博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark学习记录(三)核心API模块介绍
阅读量:7074 次
发布时间:2019-06-28

本文共 2710 字,大约阅读时间需要 9 分钟。

hot3.png

spark

-------------
基于hadoop的mr,扩展MR模型高效使用MR模型,内存型集群计算,提高app处理速度。

spark特点

-------------
速度:在内存中存储中间结果。
支持多种语言。Scala、Java、Python
内置了80+的算子.
高级分析:MR,SQL/ Streamming /mllib / graph

RDD:

----------------
是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点
进行计算。可以包含任何java,scala,python和自定义类型。

RDD是只读的记录分区集合。RDD具有容错机制。

创建RDD方式,一、并行化一个现有集合。

hadoop 花费90%时间用户rw。、

内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。

内部包含5个主要属性

-----------------------
1.分区列表
2.针对每个split的计算函数。
3.对其他rdd的依赖列表
4.可选,如果是KeyValueRDD的话,可以带分区类。
5.可选,首选块位置列表(hdfs block location);

 

RDD变换

rdd的变换方法都是lazy执行的

------------------
返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。

map() //对每个元素进行变换,应用变换函数
//(T)=>V

mapPartitions() //对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。

image2018-10-11_17-49-18.png?version=1&modificationDate=1539251353000&api=v2

//针对每个数据分区进行操作,入参是分区数据的Iterator,map() 针对分区中的每个元素进行操作。

mapPartitions()  //Iterator<T> => Iterator<U>

注:最好设置每个分区都对应有一个线程。

filter() //过滤器,(T)=>Boolean

flatMap() //压扁,T => TraversableOnce[U]

 

//同mapPartitions方法一样都是针对分区处理,只不过这个方法可以获取到分区索引

mapPartitionsWithIndex(func)  //(Int, Iterator<T>) => Iterator<U>

image2018-10-11_18-4-58.png?version=1&modificationDate=1539252293000&api=v2

 

//采样返回采样的RDD子集。

//withReplacement 元素是否可以多次采样.
//fraction : 期望采样数量.[0,1]
//表示一个种子,根据这个seed随机抽取,一般都只用到前两个参数
sample(withReplacement, fraction, seed)

作用:在数据倾斜的时候,我们那么多数据如果想知道那个key倾斜了,就需要我们采样获取这些key,出现次数陊的key就是导致数据倾斜的key。如果这些key数据不是很重要的话,可以过滤掉,这样就解决了数据倾斜。

image2018-10-11_18-39-57.png?version=1&modificationDate=1539254392000&api=v2

 

 

union() //类似于mysql union操作。

image2018-10-11_18-48-13.png?version=1&modificationDate=1539254888000&api=v2

 

intersection //交集,提取两个rdd中都含有的元素。

image2018-10-11_18-52-15.png?version=1&modificationDate=1539255130000&api=v2

distinct([numTasks])) //去重,去除重复的元素。

image2018-10-11_18-55-16.png?version=1&modificationDate=1539255311000&api=v2

 

groupByKey() //(K,V) => (K,Iterable<V>)  使用前需要构造出对偶的RDD

image2018-10-11_19-26-15.png?version=1&modificationDate=1539257170000&api=v2

reduceByKey(*) //按key聚合。注意他是一个RDD变换方法,不是action

 

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key进行聚合,这个函数逻辑较为复杂请看aggregateByKey函数的专题

image2018-10-11_20-1-50.png?version=1&modificationDate=1539259306000&api=v2

 

sortByKey //根据映射的Key进行排序,但是只能根据Key排序

image2018-10-12_10-51-52.png?version=1&modificationDate=1539312705000&api=v2

 

sortBy //比sortByKey更加灵活强大的排序,可根据元组中任意字段排序

image2018-10-12_11-12-36.png?version=1&modificationDate=1539313949000&api=v2

image2018-10-12_11-13-4.png?version=1&modificationDate=1539313977000&api=v2

 

join(otherDataset, [numTasks]) //横向连接,有两种数据(K,V)和(K,W),链接后返回(K,(V,W)),两个元组一一对应的

image2018-10-12_11-22-30.png?version=1&modificationDate=1539314543000&api=v2

image2018-10-12_11-22-46.png?version=1&modificationDate=1539314560000&api=v2

 

cogroup //协分组,(K,V)和(K,W)分组后返回(K,(V,W)),注意协分组不是一一对应的分组后需要(此处注意与join的区别)

QQ%E6%88%AA%E5%9B%BE20181012113912.jpg?version=1&modificationDate=1539315570000&api=v2

image2018-10-12_11-39-49.png?version=1&modificationDate=1539315583000&api=v2

cartesian(otherDataset) //笛卡尔积,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]

QQ%E6%88%AA%E5%9B%BE20181012114617.jpg?version=1&modificationDate=1539315987000&api=v2

 

pipe //将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD

coalesce(numPartitions) //减少分区

repartition //再分区

repartitionAndSortWithinPartitions(partitioner)//再分区并在分区内进行排序

 

RDD Action

Spack的中的方法都是懒的,,只有遇到了action类型的方法才会真正的执行

------------------
collect() //收集rdd元素形成数组.
count() //统计rdd元素的个数
reduce() //聚合,返回一个值。
first //取出第一个元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path) //保存到文件
saveAsSequenceFile(path) //保存成序列文件  sc.sequenceFile读取序列文件

saveAsObjectFile(path) (Java and Scala)

countByKey()                       //按照key统计有几个value 

 

数据倾斜

------------------------------

由于大量相同的Key,在reduce合并计算的过程中,大量相同的Key被分配到了同一个集群节点,导致集群中这个节点计算压力非常大。

本例采用的解决方案是,在map截断将Key先接上一个随机数打散,然后在reduce计算后,再次map还原key,然后进行最终reduce。

QQ%E6%88%AA%E5%9B%BE20181012161028.jpg?version=1&modificationDate=1539331835000&api=v2

Spark WebUI 上面代码运行的DAG 有效无环图,我们可以清楚地看到每一次的reduce聚合都会重新划分阶段

image2018-10-15_10-10-40.png?version=1&modificationDate=1539569429000&api=v2

转载于:https://my.oschina.net/u/3687664/blog/2876019

你可能感兴趣的文章
Android FM模块学习之四源码解析(一)
查看>>
人生最重要的三种能力,不是读书能学来的
查看>>
JDK中文方框乱码问题
查看>>
关于技术
查看>>
adb学习2
查看>>
面试之STAR法则详解
查看>>
inno安装
查看>>
禁用缓存的设置
查看>>
Idea14解决JSP/JS文件需要重启问题解决
查看>>
对IoC DI的理解
查看>>
tactic remove project
查看>>
实现汉字转拼音
查看>>
太吃鸡了
查看>>
Smart2.0开发指南——开发工具
查看>>
oracle——06表查询中需要注意的一些问题
查看>>
大白话讲Zookeeper能做什么?(一):命名服务与配置管理
查看>>
java httpclient使用socks5代理(二)使用socks5代理服务
查看>>
java实现多线程的三种方式
查看>>
汇编 输入输出字符串(最简单版)
查看>>
Submit a form with Ajax 发送邮件参考
查看>>