Marc Arndt created SPARK-27853: ---------------------------------- Summary: Allow for custom Partitioning implementations Key: SPARK-27853 URL: https://issues.apache.org/jira/browse/SPARK-27853 Project: Spark Issue Type: Improvement Components: Optimizer, SQL Affects Versions: 2.4.3 Reporter: Marc Arndt
When partitioning a Dataset Spark uses the physical plan element ShuffleExchangeExec together with a Partitioning instance. I find myself in situation where I need to provide my own partitioning criteria, that decides to which partition each InternalRow should belong. According to the Spark API I would expect to be able to provide my custom partitioning criteria as a custom implementation of the Partitioning interface. Sadly after implementing a custom Partitioning implementation you will receive a "Exchange not implemented for $newPartitioning" error message, because of the following code inside the ShuffleExchangeExec#prepareShuffleDependency method: {code:scala} val part: Partitioner = newPartitioning match { case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) case HashPartitioning(_, n) => new Partitioner { override def numPartitions: Int = n // For HashPartitioning, the partitioning key is already a valid partition ID, as we use // `HashPartitioning.partitionIdExpression` to produce partitioning key. override def getPartition(key: Any): Int = key.asInstanceOf[Int] } case RangePartitioning(sortingExpressions, numPartitions) => // Internally, RangePartitioner runs a job on the RDD that samples keys to compute // partition bounds. To get accurate samples, we need to copy the mutable keys. val rddForSampling = rdd.mapPartitionsInternal { iter => val mutablePair = new MutablePair[InternalRow, Null]() iter.map(row => mutablePair.update(row.copy(), null)) } implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) new RangePartitioner( numPartitions, rddForSampling, ascending = true, samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition) case SinglePartition => new Partitioner { override def numPartitions: Int = 1 override def getPartition(key: Any): Int = 0 } case _ => sys.error(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. } def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match { case RoundRobinPartitioning(numPartitions) => // Distributes elements evenly across output partitions, starting from a random partition. var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) (row: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions position += 1 position } case h: HashPartitioning => val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) row => projection(row).getInt(0) case RangePartitioning(_, _) | SinglePartition => identity case _ => sys.error(s"Exchange not implemented for $newPartitioning") } {code} The code in the above code snippet matches the given Partitioning instance "newPartitioning" against a set of given hardcoded Partitioning types. When adding a new Partitioning implementation the pattern matching won't be able to find a pattern for it and therefore will use the fallback case: {code:java} case _ => sys.error(s"Exchange not implemented for $newPartitioning") {code} and throw an exception. To be able to provide custom partition behaviour I would suggest to change the implementation in ShuffleExchangeExec to be able to work with an arbitrary Partitioning implementation. For the Partition creation I would imagine that this can be done in a nice way inside the Partitioning classes via a Partitioning#createPartitioner method. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org