【小贪】大数据处理:Pyspark, Pandas对比及常用语法

近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:

  • ✅数据库常用:MySQL, Hive SQL, Spark SQL
  • ✅大数据处理常用:Pyspark, Pandas
  • ⚪ 图像处理常用:OpenCV, matplotlib
  • ⚪ 机器学习常用:SciPy, Sklearn
  • ⚪ 深度学习常用:Pytorch, numpy
  • ⚪ 常用数据结构语法糖:itertools, collections
  • ⚪ 常用命令: Shell, Git, Vim

以下整理错误或者缺少的部分欢迎指正!!!

大数据处理常用:Pyspark, Pandas

性能对比

PysparkPandas
运行环境分布式计算集群(Hadoop/Apache Spark集群)单个计算机
数据规模亿级大规模百万级小规模
优势分布式计算->并行处理,处理速度快API简单->数据处理简单
延迟机制lazy execution, 执行动作之前不执行任务eager execution, 任务立即被执行
内存缓存persist()/cache()将转换的RDDs保存在内存单机缓存
DataFrame可变性不可变,修改则返回一个新的DataFrame可变
可扩展性
列名允许重复×

常用语法对比

# 头文件
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType  # 或者直接导入*
import pandas as pd


# 创建SparkSession对象
spark = SparkSession.builder \
    .appName("username") \
    .getOrCreate()


# 创建空表
schema = StructType([
                StructField('id', LongType()),
                StructField('type', StringType()),
            ])  # spark需要指定列名和类型
spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema)
pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2])


# 根据现有数据创建
data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("birth_year", IntegerType(), True)
])
spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"])
spark_df = spark.createDataFrame(data, schema)
pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"])


# 读取csv文件
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
pandas_df = pd.read_csv("data.csv", sep="\t")  # read_excel
# 保存数据到csv
spark_df.write.csv('data.csv', header=True)
pandas_df.to_csv("data.csv", index=False)

# 读取hive表数据
spark_df = spark.sql('select * from tab')
# 保存数据到hive表
spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name')


# 相互转换
spark_df = SQLContext.createDataFrame(pandas_df)
pandas_df = spark_df.toPandas()


# 转换数据类型
spark_df = spark_df.withColumn("A", col("age").cast(StringType))
pandas_df["A"] = pandas_df['A'].astype("int")


# 重置索引
spark_df = spark_df.withColumn("id", monotonically_increasing_id())  # 生成一个增长的id列
pandas_df.reset_index()


# 切片
pandas_df['a':'c']  # a-c三行
pandas_df.iloc[1:3, 0:2]  # 1-2行,0-1列。左闭右开
pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列
pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列


# 选择列
spark_df.select('A', 'B')
pandas_df[['A', 'B']]

# 删除列
spark_df.drop('A', 'B')
pandas_df.drop(['A', 'B'], axis=1, inplace=True)  # inplace表示是否创建新对象

# 新增列,设置列值
spark_df = spark_df.withColumn('name', F.lit(0))
pandas_df['name'] = 0

# 修改列值
spark_df.withColumn('name', 1)
pandas_df['name'] = 1
# 使用函数修改列值
spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code))

# 修改列名
spark_df.withColumnRenamed('old_name', 'new_name')
pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True)


# 显示数据
spark_df.limit(10) # 前10行
spark_df.show/take(10)  # collect()返回全部数据
spark_df/pandas_df.first/head/tail(10)


# 表格遍历
saprk_df.collect()[:10]
spark_df.foreach(lambda row: print(row['c1'], row['c2']))
for i, row in pandas_df.iterrows():
    print(row["c1"], row["c2"])


# 排序
spark/pandas_df.sort()  # 按列值排序
pandas_df.sort_index()  # 按轴排序
pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True)  # 指定列升序/降序排序


# 过滤
spark_df.filter(df['col_name'] > 1)     # spark_df.where(df['col_name'] > 1)
pandas_df[pandas_df['col_name'] > 1]
pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)]


# 去重
spark_df.select('col_name').distinct()
spark_df_filter = spark_df.drop_duplicates(["col_name"])
pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True)

# 缺失数据处理
spark_df.na.fill()
spark_df.na.drop(subset=['A', "B"])  # 同dropna
pandas_df.fillna()
pandas_df.dropna(subset=['A', "B"], how="any", inplace=True)

# 空值过滤 filter=choose
spark_df.filter(~(F.isnull(spark_df.d)))
spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull()))   # 选出列值不为空的行  isnan()=isNull()<->isNOtnan()
pandas_df[pandas_df['A'].isna()]  # 选出列值为空的行
pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行


# 统计
spark/pandas_df.count()  # spark返回总行数,pandas返回列非空总数
spark/pandas_df.describe() # 描述列的count, mean, min, max...

# 计算某一列均值
average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0]
average_value = pandas_df["col_name"].mean()


# 表合并
# 按行合并,相当于追加
spark_df = spark_df.unionAll(spark_df1)
pandas_df = pd.concat([df_up, df_down], axis=0)
# 按列合并
spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id)  # df1.id==spark_df.id也可写成['id](当且仅当列名相同)
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer")  


# 聚合函数
spark_df_collect = spark_df.groupBy('number').agg(
    F.collect_set('province').alias('set_province'),
    F.first('city').alias('set_city'),
    F.collect_list('district').alias('set_district'),
    F.max('report_user').alias('set_report_user'),
    F.min('first_type').alias('set_first_type'))
# 分组聚合
spark_df.groupBy('A').agg(F.avg('B'), F.min('B'))
spark/pandas_df.groupby('A').avg('B')

# 根据函数分组聚合
def func(x):
    return pd.DataFrame({
        "A": x["A"].tolist()[0],
        "B": sum(x["B"])}, index=[0])
pandas_df_result = pandas_df.groupby(["A"]).apply(func)


# spark udf函数和pandas apply函数
def func1(a, b):
    return a + b
spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b))  # spark_df['a']或F.col("a")))
def func2(x,y):
    return 1 if x > np.mean(y) else 0
pandas_df['A'].apply(func2, args=(pandas_df['B'],))
pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1)


# spark创建临时表
spark_df.createOrReplaceTempView('tmp_table')  # 用sql API
res1 = spark.sql('select * from tmp_table')
spark_df.registerTempTable('tmp_table') # 用dataframe API
res2 = spark.table('tmp_table') 

其他常用设置

class SparkUtils:
    def __init__(self):
        self.spark = None

    def get_spark(self):
        if self.spark is None:
            self.spark = SparkSession.builder.appName("username") \
                .enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \
                .config("spark.sql.broadcastTimeout", "3600") \
                .config("spark.driver.memory", "200g") \
                .config("spark.executor.memory", "40g") \
                .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
                .config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
                .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name",
                        "bdp-docker.jd.com:5000/wise_mart_bag:latest") \
                .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name",
                        "bdp-docker.jd.com:5000/wise_mart_bag:latest") \
                .getOrCreate()
        self.spark.sql('SET hive.exec.dynamic.partition=true')
        self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict')
        return self.spark

spark = SparkUtils()

# 生成dataframe
spark_data = spark.sql("""
    select 
      id, 
      username
    from 
      tab1
    where 
      status in (1, 2, 3)
      and dt = '{}'
  """.format(date))

# pandas常用显示设置
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width',1000)
pd.set_option('display.max_colwidth',1000)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/559416.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp全局监听分享朋友圈或朋友

把大象装进冰箱需要几步&#xff1a; 1、创建shart.js文件 export default{data(){return {//设置默认的分享参数//如果页面不设置share&#xff0c;就触发这个默认的分享share:{title:标题,path:/pages/index/index,imageUrl:图片,desc:描述,content:内容}}},onLoad(){let ro…

Android的一些总结

先打开自定义的app显示欢迎->消失 打开桌面应用程序->在桌面应用程序中也要能一键启动打开视频播放的app 桌面应用程序广播接收者进行监听&#xff0c;然后打开服务/activity是可行的。 ########################## 日志&#xff0c;调试&#xff1a; Usb 无线 串口…

机器学习预测汽车油耗效率 MPG

流程 数据获取导入需要的包引入文件,查看内容划分训练集和测试集调用模型查看准确率 数据获取 链接&#xff1a;https://pan.baidu.com/s/1KeIJykbcVpsfEk0xjhiICA?pwd30oe 提取码&#xff1a;30oe --来自百度网盘超级会员V1的分享导入需要的包 import pandas as pd imp…

华为认证实验配置(10): 实现VLAN间通信

传统交换二层组网中&#xff0c;默认所有网络都处于同一个广播域&#xff0c;这带了诸多问题。VLAN技术的提出&#xff0c;满足了二层组网隔离广播域需求&#xff0c;使得属于不同VLAN的网络无法互访&#xff0c;但不同VLAN之间又存在着相互访问的需求 重点&#xff1a;使用路…

【人工智能】机器学习算法综述及常见算法详解

目录 推荐 1、机器学习算法简介 1.1 机器学习算法包含的两个步骤 1.2 机器学习算法的分类 2、线性回归算法 2.1 线性回归的假设是什么&#xff1f; 2.2 如何确定线性回归模型的拟合优度&#xff1f; 2.3 如何处理线性回归中的异常值&#xff1f; 3、逻辑回归算法 3.1 …

公园高速公路景区校园IP网络广播音柱SIP音柱

公园高速公路景区校园IP网络广播音柱SIP音柱 适用于学校、车站、教堂、工厂、仓库、公园停车场及露天市场高速公路等场所播放录制语音文件或背景音乐节目&#xff0c;专业一体化音箱设计&#xff0c;高强度防水设计&#xff0c;符合IP54防护等认证&#xff0c;数字化产品&…

.net6项目模板

1.集成log4net 安装依赖包&#xff1a; 安装扩展依赖即可&#xff0c;已经包含了log4net依赖&#xff1a; Microsoft.Extensions.Logging.Log4Net.AspNetCore 添加日志配置文件&#xff1a; 日志配置文件属性设置为始终复制&#xff1a; 注入服务&#xff1a; #region 注入…

Spring Boot 实现接口幂等性的 4 种方案

一、什么是幂等性 幂等是一个数学与计算机学概念&#xff0c;在数学中某一元运算为幂等时&#xff0c;其作用在任一元素两次后会和其作用一次的结果相同。 在计算机中编程中&#xff0c;一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数或幂…

微信小程序开发之多图片上传+.NET WebAPI后端服务保存图片资源

前言&#xff1a; 最近开发的一个微信小程序项目需要做一个同时选中三张&#xff08;或者是多张&#xff09;图片一起上传到服务端&#xff0c;服务端保存图片资源并保存的功能。发现在微信小程序开发中会有很多场景会使用到多图片上传并保存到的功能&#xff0c;所以我把自己总…

高频前端面试题汇总之Vue篇

1. Vue的基本原理 当一个Vue实例创建时&#xff0c;Vue会遍历data中的属性&#xff0c;用 Object.defineProperty&#xff08;vue3.0使用proxy &#xff09;将它们转为 getter/setter&#xff0c;并且在内部追踪相关依赖&#xff0c;在属性被访问和修改时通知变化。 每个组件实…

Stable Diffusion 模型分享:ChilloutMix(真实、亚洲面孔)chilloutmix_NiPrunedFp32Fix

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里&#xff0c;订阅后可阅读专栏内所有文章。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八 下载地址 模型介绍 相信近来吸引大家想一试 Stable Diffusion 图像生…

【EI会议征稿】2024年先进机械电子、电气工程与自动化国际学术会议(ICAMEEA 2024)

2024 International Conference on Advanced Mechatronic, Electrical Engineering and Automation ●会议简介 2024年先进机械电子、电气工程与自动化国际学术会议&#xff08;ICAMEEA 2024&#xff09;将汇聚全球机械电子、电气工程与自动化领域的专家学者&#xff0c;共同…

洗眼镜什么牌子的超声波清洗机好用?全网一致好评四大品牌

眼镜作为我们日常佩戴的必备单品&#xff0c;你是否真正关注过它的清洁度&#xff1f;眼镜不清洗&#xff0c;不仅影响视力&#xff0c;还可能对眼睛造成不可逆的伤害。因此&#xff0c;眼镜一定要经常清洗&#xff0c;而超声波清洗机则是你洗眼镜的最佳选择。在市面上&#xf…

新项目应该选mongodb还是postgresql?

文章目录 MongoDBPostgreSQL大数据处理时的优势对比实际使用经验 选择MongoDB还是PostgreSQL作为新项目的数据库&#xff0c;主要取决于项目的具体需求、数据模型、应用场景以及团队熟悉程度等因素。下面将从几个关键角度对两者进行对比分析。 MongoDB 数据模型&#xff1a;Mo…

蓝桥杯竞赛类型:Web应用开发 全程详解

既然大家准备报名蓝桥杯&#xff0c;那么对蓝桥杯就应该有一定的了解了。没有了解也没关系&#xff0c;简单来说&#xff0c;蓝桥杯就是一个计算机竞赛&#xff0c;竞赛类型大多是使用各种语言写算法&#xff0c;当然还有本文的主体——Web应用开发。对蓝桥杯有了基本了解之后&…

一个完全用rust写的开源操作系统-Starry

1. Starry Starry是2023年全国大学生计算机系统能力大赛操作系统设计赛-内核实现赛的二等奖作品。Starry是在组件化OS的arceos的基础上&#xff0c;进行二次开发的操作系统内核&#xff0c;使用宏内核架构&#xff0c;能够运行Linux应用的内核。 原始的操作系统大赛的仓库为 …

vue快速入门(三十四)组件data定义方法

注释很详细&#xff0c;直接上代码 上一篇 新增内容 数据绑定方法照常数据定义方法需要作为函数返回值 源码 MyTest.vue <template><div><h1>我的功德&#xff1a;{{merits}} </h1><button click"meritsnum1">功德加一</button>…

C++实战——日期类的实现

日期类的实现 前言一、日期类概念实现运用场景 二、日期类的具体实现代码构造函数拷贝构造函数获取日期&#xff08;内联函数&#xff09;赋值加等减等加减小于小于等于大于大于等于相等不相等前置后置前置- -后置- -关于类里重载的比较运算符为什么要加外部const示例 Date.hDa…

常见UI组件(二)

一、文本输入 1.1 概述 TextInput为文本输入组件&#xff0c;用于接收用户输入的文本内容 1.2 参数 Entry Component struct Index {build() {Column({space : 50}) {TextInput({placeholder:请输入用户名}).width(70%)TextInput({text:当前内容}).width(70%)}.width(100%).…

90天精通Psim仿真--经典实战教程--第10天 Simcode DSP28335 LED控制

PSIM (Power Simulation) 是一款电力电子和电机控制仿真软件,而DSP28335是德州仪器(TI)的一款数字信号处理器(DSP)。如果你想要在PSIM的SimCoder环境中为DSP28335生成LED闪烁的代码,遵循以下步骤: 打开PSIM并创建模型: 首先,在PSIM中创建一个电路模型,该模型应包括DS…
最新文章