Spark 二次排序
示例数据:
+---+------+------+
|key|value1|value2|
+---+------+------+
| k1| a11| b13|
| k1| a12| b11|
| k1| a13| b12|
| k1| a14| b1|
| k2| a11| b21|
| k2| a12| b22|
| k2| a23| b23|
| k2| a24| b24|
+---+------+------+
如果让同一个key
的数据聚在一起,并且按 value1
排序,怎么实现?
小规模
rdd.map{
case (key, value1, value2) =>
key -> List((value1, value2))
}.reduceByKey(_ ++ _).mapValues(_.sortWith(_._1 < _._1))
或者
rdd.map{
case (key, value1, value2) =>
key -> (value1, value2)
}.groupByKey().map{
case (key, it) =>
it.toList.sortWith(_._1 < _._1)
}
小规模数据,这样是没什么问题,如果同一个key
下的数据很大,内存放不下,就会出问题。
Spark sortBy
可以排序,但是同一个key
的数据,可能会分布不同的partition
中。
大规模
思路就是:使用Spark的 sortBy
并且让同一个key
的数据分布在一个partition
中。
repartitionAndSortWithinPartitions
就是spark提供这样的算子,只要实现一个Partitioner
就可以了。
// key hash
class PartKeyHashPartitioner[K1: Ordering: ClassTag, K2, V](partitions: Int, rdd: RDD[((K1, K2), V)]) extends Partitioner {
val _part = new HashPartitioner(partitions)
override def numPartitions: Int = _part.numPartitions
override def getPartition(key: Any): Int = _part.getPartition(key.asInstanceOf[(K1, K2)]._1)
}
val rdd2 = rdd.map{
case (key, value1, value2) =>
(key, value1) -> value2
}
rdd2.repartitionAndSortWithinPartitions(new PartKeyHashPartitioner(128, rdd2))
如果想让key也排序,再把partitioner
改为排序的就可以了。
- class PartKeyHashPartitioner[K1: Ordering: ClassTag, K2, V](partitions: Int, rdd: RDD[((K1, K2), V)]) extends Partitioner {
- val _part = new HashPartitioner(partitions)
+ class PartKeyPartitioner[K1: Ordering: ClassTag, K2, V](partitions: Int, rdd: RDD[((K1, K2), V)], ascending: Boolean = true) extends Partitioner {
+ val _part = new RangePartitioner[K1, V](partitions, rdd.map(v => v._1._1 -> v._2), ascending)
override def numPartitions: Int = _part.numPartitions
override def getPartition(key: Any): Int = _part.getPartition(key.asInstanceOf[(K1, K2)]._1)
}