mbutrovich commented on code in PR #3349:
URL: https://github.com/apache/datafusion-comet/pull/3349#discussion_r2776659705
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -59,6 +58,126 @@ import
org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregat
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto,
supportedSortType}
import org.apache.comet.serde.operator.CometSink
+/**
+ * Trait for injecting per-partition planning data into operator nodes.
+ *
+ * Implementations handle specific operator types (e.g., Iceberg scans, Delta
+ * scans). The registry in the companion object calls the methods in this
+ * order for each operator it visits: `canInject`, then `getKey`, then
+ * `inject` (only when planning data was found for the key).
+ */
+private[comet] trait PlanDataInjector {
+
+ /**
+ * Check if this injector can handle the given operator.
+ *
+ * @param op the operator node to test
+ * @return true if this injector applies to `op`
+ */
+ def canInject(op: Operator): Boolean
+
+ /**
+ * Extract the key used to look up planning data for this operator.
+ *
+ * @param op the operator node to derive a key from
+ * @return the lookup key, or `None` when the operator has no key; the
+ *         registry skips injection for such an operator
+ */
+ def getKey(op: Operator): Option[String]
+
+ /**
+ * Inject common + partition data into the operator node.
+ *
+ * @param op the operator node to inject into
+ * @param commonBytes the common planning data looked up under the operator's key
+ * @param partitionBytes the partition planning data looked up under the same key
+ * @return the operator carrying the injected data
+ */
+ def inject(op: Operator, commonBytes: Array[Byte], partitionBytes:
Array[Byte]): Operator
+}
+
+/**
+ * Registry and utilities for injecting per-partition planning data into
+ * operator trees.
+ *
+ * Holds the list of known injectors, walks an operator tree applying them
+ * (`injectPlanData`), and turns an operator into a byte array
+ * (`serializeOperator`).
+ */
+private[comet] object PlanDataInjector {
+
+ // Registry of injectors for different operator types; consulted in this order for every node
+ private val injectors: Seq[PlanDataInjector] = Seq(
+ IcebergPlanDataInjector
+ // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
+ )
+
+ /**
+ * Injects planning data into an Operator tree by finding nodes that need
+ * injection and applying the appropriate injector.
+ *
+ * Supports joins over multiple tables by matching each operator with its
+ * corresponding data based on a key (e.g., metadata_location for Iceberg).
+ *
+ * The tree is rebuilt top-down: the current node is processed first, then its
+ * child list is cleared and repopulated with the recursively processed
+ * children of the original `op`. If more than one registered injector
+ * accepts `op`, each is applied to the original `op` and the builder keeps
+ * only the result of the last one.
+ *
+ * @param op root of the operator (sub)tree to process
+ * @param commonByKey common planning data, indexed by injector key
+ * @param partitionByKey partition planning data, indexed by injector key
+ * @return a rebuilt operator tree with planning data injected where an
+ *         injector applied; nodes without a key are kept unchanged apart
+ *         from their rebuilt children
+ * @throws CometRuntimeException if an operator yields a key that is missing
+ *         from `commonByKey` or from `partitionByKey`
+ */
+ def injectPlanData(
+ op: Operator,
+ commonByKey: Map[String, Array[Byte]],
+ partitionByKey: Map[String, Array[Byte]]): Operator = {
+ val builder = op.toBuilder
+
+ // Try each registered injector; only those whose canInject accepts this operator run
+ for (injector <- injectors if injector.canInject(op)) {
+ injector.getKey(op) match {
+ case Some(key) =>
+ (commonByKey.get(key), partitionByKey.get(key)) match {
+ case (Some(commonBytes), Some(partitionBytes)) =>
+ val injectedOp = injector.inject(op, commonBytes, partitionBytes)
+ // Replace the builder's contents with the injected operator's fields
+ builder.clear()
+ builder.mergeFrom(injectedOp)
+ case _ => // key present on the operator but absent from at least one of the two maps
+ throw new CometRuntimeException(s"Missing planning data for key:
$key")
+ }
+ case None => // No key, skip injection
+ }
+ }
+
+ // Recursively process children: drop whatever child list the builder holds and re-add
+ // the processed versions of the original operator's children
+ builder.clearChildren()
+ op.getChildrenList.asScala.foreach { child =>
+ builder.addChildren(injectPlanData(child, commonByKey, partitionByKey))
+ }
+
+ builder.build()
+ }
+ /**
+ * Serializes an operator into a freshly allocated byte array sized to
+ * `op.getSerializedSize`.
+ *
+ * @param op the operator to serialize
+ * @return the bytes written by `op.writeTo`
+ */
+ def serializeOperator(op: Operator): Array[Byte] = {
+ val size = op.getSerializedSize
+ val bytes = new Array[Byte](size)
+ val codedOutput = CodedOutputStream.newInstance(bytes) // writes directly into `bytes`
+ op.writeTo(codedOutput)
+ codedOutput.checkNoSpaceLeft() // NOTE(review): presumably protobuf's check that exactly `size` bytes were written - confirm
+ bytes
+ }
+}
+
+/**
+ * Injector for Iceberg scan operators.
+ */
+private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
+ import java.nio.ByteBuffer
+ import java.util.concurrent.ConcurrentHashMap
+
+ // Cache parsed IcebergScanCommon by content to avoid repeated
deserialization
+ // ByteBuffer wrapper provides content-based equality and hashCode
+ // TODO: This is a static singleton on the executor, should we cap the size
(proper LRU cache?)
Review Comment:
The TODO was mostly for discussion purposes. I'm going to sleep on it and see
if I can tie the cache's lifecycle to some object in the query execution, so I
don't have to worry about a static singleton.
If I don't figure it out by the end of the weekend, I'll open an issue and
replace the TODO with a link to it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]