spark
------------- 基于hadoop的mr,扩展MR模型高效使用MR模型,内存型集群计算,提高app处理速度。spark特点
------------- 速度:在内存中存储中间结果。 支持多种语言。Scala、Java、Python 内置了80+的算子. 高级分析:MR,SQL/ Streamming /mllib / graphRDD:
---------------- 是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点 进行计算。可以包含任何java,scala,python和自定义类型。RDD是只读的记录分区集合。RDD具有容错机制。
创建RDD方式,一、并行化一个现有集合。
hadoop 花费90%时间用户rw。、
内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。内部包含5个主要属性
----------------------- 1.分区列表 2.针对每个split的计算函数。 3.对其他rdd的依赖列表 4.可选,如果是KeyValueRDD的话,可以带分区类。 5.可选,首选块位置列表(hdfs block location);
RDD变换
rdd的变换方法都是lazy执行的
------------------ 返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。 map() //对每个元素进行变换,应用变换函数 //(T)=>VmapPartitions() //对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。
//针对每个数据分区进行操作,入参是分区数据的Iterator,map() 针对分区中的每个元素进行操作。
mapPartitions() //Iterator<T> => Iterator<U>
注:最好设置每个分区都对应有一个线程。
filter() //过滤器,(T)=>Boolean
flatMap() //压扁,T => TraversableOnce[U]
//同mapPartitions方法一样都是针对分区处理,只不过这个方法可以获取到分区索引
mapPartitionsWithIndex(func) //(Int, Iterator<T>) => Iterator<U>
//采样返回采样的RDD子集。
//withReplacement 元素是否可以多次采样. //fraction : 期望采样数量.[0,1] //表示一个种子,根据这个seed随机抽取,一般都只用到前两个参数 sample(withReplacement, fraction, seed)作用:在数据倾斜的时候,我们那么多数据如果想知道那个key倾斜了,就需要我们采样获取这些key,出现次数陊的key就是导致数据倾斜的key。如果这些key数据不是很重要的话,可以过滤掉,这样就解决了数据倾斜。
union() //类似于mysql union操作。
intersection //交集,提取两个rdd中都含有的元素。
distinct([numTasks])) //去重,去除重复的元素。
groupByKey() //(K,V) => (K,Iterable<V>) 使用前需要构造出对偶的RDD
reduceByKey(*) //按key聚合。注意他是一个RDD变换方法,不是action
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key进行聚合,这个函数逻辑较为复杂请看aggregateByKey函数的专题
sortByKey //根据映射的Key进行排序,但是只能根据Key排序
sortBy //比sortByKey更加灵活强大的排序,可根据元组中任意字段排序
join(otherDataset, [numTasks]) //横向连接,有两种数据(K,V)和(K,W),链接后返回(K,(V,W)),两个元组一一对应的
cogroup //协分组,(K,V)和(K,W)分组后返回(K,(V,W)),注意协分组不是一一对应的分组后需要(此处注意与join的区别)
cartesian(otherDataset) //笛卡尔积,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]
pipe //将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD
coalesce(numPartitions) //减少分区 repartition //再分区 repartitionAndSortWithinPartitions(partitioner)//再分区并在分区内进行排序
RDD Action
Spack的中的方法都是懒的,,只有遇到了action类型的方法才会真正的执行
------------------ collect() //收集rdd元素形成数组. count() //统计rdd元素的个数 reduce() //聚合,返回一个值。 first //取出第一个元素take(1) take // takeSample (withReplacement,num, [seed]) takeOrdered(n, [ordering]) saveAsTextFile(path) //保存到文件 saveAsSequenceFile(path) //保存成序列文件 sc.sequenceFile读取序列文件saveAsObjectFile(path) (Java and Scala)
countByKey() //按照key统计有几个value
数据倾斜
------------------------------
由于大量相同的Key,在reduce合并计算的过程中,大量相同的Key被分配到了同一个集群节点,导致集群中这个节点计算压力非常大。
本例采用的解决方案是,在map截断将Key先接上一个随机数打散,然后在reduce计算后,再次map还原key,然后进行最终reduce。
Spark WebUI 上面代码运行的DAG 有效无环图,我们可以清楚地看到每一次的reduce聚合都会重新划分阶段