Spark 二次排序

示例数据:

+---+------+------+
|key|value1|value2|
+---+------+------+
| k1|   a11|   b13|
| k1|   a12|   b11|
| k1|   a13|   b12|
| k1|   a14|    b1|
| k2|   a11|   b21|
| k2|   a12|   b22|
| k2|   a23|   b23|
| k2|   a24|   b24|
+---+------+------+

如果让同一个key的数据聚在一起,并且按 value1 排序,怎么实现?

小规模

rdd.map{
  case (key, value1, value2) =>
    key -> List((value1, value2))
}.reduceByKey(_ ++ _).mapValues(_.sortWith(_._1 < _._1))

或者

rdd.map{
  case (key, value1, value2) =>
    key -> (value1, value2)
}.groupByKey().map{
  case (key, it) =>
    it.toList.sortWith(_._1 < _._1)
}

小规模数据,这样是没什么问题,如果同一个key下的数据很大,内存放不下,就会出问题。

Spark sortBy 可以排序,但是同一个key的数据,可能会分布不同的partition中。

大规模

思路就是:使用Spark的 sortBy并且让同一个key的数据分布在一个partition中。

repartitionAndSortWithinPartitions 就是spark提供这样的算子,只要实现一个Partitioner就可以了。


// key hash
class PartKeyHashPartitioner[K1: Ordering: ClassTag, K2, V](partitions: Int, rdd: RDD[((K1, K2), V)]) extends Partitioner {
  val _part = new HashPartitioner(partitions)

  override def numPartitions: Int = _part.numPartitions

  override def getPartition(key: Any): Int = _part.getPartition(key.asInstanceOf[(K1, K2)]._1)
}

val rdd2 = rdd.map{
  case (key, value1, value2) =>
    (key, value1) -> value2
}

rdd2.repartitionAndSortWithinPartitions(new PartKeyHashPartitioner(128, rdd2))

如果想让key也排序,再把partitioner改为排序的就可以了。

-    class PartKeyHashPartitioner[K1: Ordering: ClassTag, K2, V](partitions: Int, rdd: RDD[((K1, K2), V)]) extends Partitioner {
-      val _part = new HashPartitioner(partitions)
+    class PartKeyPartitioner[K1: Ordering: ClassTag, K2, V](partitions: Int, rdd: RDD[((K1, K2), V)], ascending: Boolean = true) extends Partitioner {
+      val _part = new RangePartitioner[K1, V](partitions, rdd.map(v => v._1._1 -> v._2), ascending)

      override def numPartitions: Int = _part.numPartitions

      override def getPartition(key: Any): Int = _part.getPartition(key.asInstanceOf[(K1, K2)]._1)
    }