This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 80bef433f docs: Add new section to contributor guide, explaining how
to add a new operator (#2758)
80bef433f is described below
commit 80bef433ff6e4a60bbc3e30450638d7db45ec17f
Author: Andy Grove <[email protected]>
AuthorDate: Wed Nov 19 12:16:17 2025 -0700
docs: Add new section to contributor guide, explaining how to add a new
operator (#2758)
---
.../contributor-guide/adding_a_new_operator.md | 597 +++++++++++++++++++++
docs/source/contributor-guide/index.md | 1 +
2 files changed, 598 insertions(+)
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md
b/docs/source/contributor-guide/adding_a_new_operator.md
new file mode 100644
index 000000000..4317943aa
--- /dev/null
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -0,0 +1,597 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Adding a New Operator
+
+This guide explains how to add support for a new Spark physical operator in
Apache DataFusion Comet.
+
+## Overview
+
+`CometExecRule` is responsible for replacing Spark operators with Comet
operators. There are different approaches to
+implementing Comet operators depending on where they execute and how they
integrate with the native execution engine.
+
+### Types of Comet Operators
+
+`CometExecRule` maintains two distinct maps of operators, plus a third category of JVM operators that it handles directly:
+
+#### 1. Native Operators (`nativeExecs` map)
+
+These operators run entirely in native Rust code and are the primary way to
accelerate Spark workloads. Native
+operators are registered in the `nativeExecs` map in `CometExecRule.scala`.
+
+Key characteristics of native operators:
+
+- They are converted to their corresponding native protobuf representation
+- They execute as DataFusion operators in the native engine
+- The `CometOperatorSerde` implementation handles enable/disable checks,
support validation, and protobuf serialization
+
+Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`,
`SortMergeJoinExec`, `ExpandExec`, `WindowExec`
+
+#### 2. Sink Operators (`sinks` map)
+
+Sink operators serve as entry points (data sources) for native execution
blocks. They are registered in the `sinks`
+map in `CometExecRule.scala`.
+
+Key characteristics of sinks:
+
+- They become `ScanExec` operators in the native plan (see `operator2Proto` in
`CometExecRule.scala`)
+- They can be leaf nodes that feed data into native execution blocks
+- They are wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` during
plan transformation
+- Examples include operators that bring data from various sources into native
execution
+
+Examples: `UnionExec`, `CoalesceExec`, `CollectLimitExec`,
`TakeOrderedAndProjectExec`
+
+Special sinks (not in the `sinks` map but also treated as sinks):
+
+- `CometScanExec` - File scans
+- `CometSparkToColumnarExec` - Conversion from Spark row format
+- `ShuffleExchangeExec` / `BroadcastExchangeExec` - Exchange operators
+
+#### 3. Comet JVM Operators
+
+These operators run in the JVM but are part of the Comet execution path. For
JVM operators, all checks happen
+in `CometExecRule` rather than using `CometOperatorSerde`, because they don't
need protobuf serialization.
+
+Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
+
+### Choosing the Right Operator Type
+
+When adding a new operator, choose based on these criteria:
+
+**Use Native Operators when:**
+
+- The operator transforms data (e.g., project, filter, sort, aggregate, join)
+- The operator has a direct DataFusion equivalent or custom implementation
+- The operator consumes native child operators and produces native output
+- The operator is in the middle of an execution pipeline
+
+**Use Sink Operators when:**
+
+- The operator serves as a data source for native execution (becomes a
`ScanExec`)
+- The operator brings data from non-native sources (e.g., `UnionExec`
combining multiple inputs)
+- The operator is typically a leaf or near-leaf node in the execution tree
+- The operator needs special handling to interface with the native engine
+
+**Implementation Note for Sinks:**
+
+Sink operators are handled specially in `CometExecRule.operator2Proto`.
Instead of converting to their own operator
+type, they are converted to `ScanExec` in the native plan. This allows them to
serve as entry points for native
+execution blocks. The original Spark operator is wrapped with
`CometScanWrapper` or `CometSinkPlaceHolder`, which
+manages the boundary between JVM and native execution.
+
+## Implementing a Native Operator
+
+This section focuses on adding a native operator, which is the most common and
complex case.
+
+### Step 1: Define the Protobuf Message
+
+First, add the operator definition to `native/proto/src/proto/operator.proto`.
+
+#### Add to the Operator Message
+
+Add your new operator to the `oneof op_struct` in the main `Operator` message:
+
+```proto
+message Operator {
+  repeated Operator children = 1;
+  uint32 plan_id = 2;
+
+  oneof op_struct {
+    Scan scan = 100;
+    Projection projection = 101;
+    Filter filter = 102;
+    // ... existing operators ...
+    YourNewOperator your_new_operator = 112; // Choose next available number
+  }
+}
+```
+
+#### Define the Operator Message
+
+Create a message for your operator with the necessary fields:
+
+```proto
+message YourNewOperator {
+ // Fields specific to your operator
+ repeated spark.spark_expression.Expr expressions = 1;
+ // Add other configuration fields as needed
+}
+```
+
+For reference, see existing operators like `Filter` (simple), `HashAggregate`
(complex), or `Sort` (with ordering).
+
+### Step 2: Create a CometOperatorSerde Implementation
+
+Create a new Scala file in `spark/src/main/scala/org/apache/comet/serde/operator/` (e.g., `CometYourOperator.scala`) containing an object that extends `CometOperatorSerde[T]`, where `T` is the Spark operator type.
+
+The `CometOperatorSerde` trait provides several key methods:
+
+- `enabledConfig: Option[ConfigEntry[Boolean]]` - Configuration to
enable/disable this operator
+- `getSupportLevel(operator: T): SupportLevel` - Determines if the operator is
supported
+- `convert(op: T, builder: Operator.Builder, childOp: Operator*):
Option[Operator]` - Converts to protobuf
+- `createExec(nativeOp: Operator, op: T): CometNativeExec` - Creates the Comet
execution operator wrapper
+
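+In outline, the trait looks roughly like this (a simplified sketch based on the method list above, not the exact definition in `CometOperatorSerde.scala`):
+
+```scala
+trait CometOperatorSerde[T <: SparkPlan] {
+
+  /** Config that enables or disables conversion of this operator (None means always considered). */
+  def enabledConfig: Option[ConfigEntry[Boolean]]
+
+  /** Whether the Spark operator is compatible, incompatible, or unsupported. */
+  def getSupportLevel(operator: T): SupportLevel = Compatible()
+
+  /** Convert the operator (and already-converted children) to protobuf, or return None to fall back. */
+  def convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]
+
+  /** Wrap the serialized native operator in a Comet execution node. */
+  def createExec(nativeOp: Operator, op: T): CometNativeExec
+}
+```
+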
+The validation workflow in `CometExecRule.isOperatorEnabled`:
+
+1. Checks if the operator is enabled via `enabledConfig`
+2. Calls `getSupportLevel()` to determine compatibility
+3. Handles Compatible/Incompatible/Unsupported cases with appropriate fallback
messages
+
+#### Simple Example (Filter)
+
+```scala
+object CometFilterExec extends CometOperatorSerde[FilterExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_FILTER_ENABLED)
+
+  override def convert(
+      op: FilterExec,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    val cond = exprToProto(op.condition, op.child.output)
+
+    if (cond.isDefined && childOp.nonEmpty) {
+      val filterBuilder = OperatorOuterClass.Filter
+        .newBuilder()
+        .setPredicate(cond.get)
+      Some(builder.setFilter(filterBuilder).build())
+    } else {
+      withInfo(op, op.condition, op.child)
+      None
+    }
+  }
+
+  override def createExec(nativeOp: Operator, op: FilterExec): CometNativeExec = {
+    CometFilterExec(nativeOp, op, op.output, op.condition, op.child, SerializedPlan(None))
+  }
+}
+
+case class CometFilterExec(
+    override val nativeOp: Operator,
+    override val originalPlan: SparkPlan,
+    override val output: Seq[Attribute],
+    condition: Expression,
+    child: SparkPlan,
+    override val serializedPlanOpt: SerializedPlan)
+    extends CometUnaryExec {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    this.copy(child = newChild)
+}
+```
+
+#### More Complex Example (Project)
+
+```scala
+object CometProjectExec extends CometOperatorSerde[ProjectExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_PROJECT_ENABLED)
+
+  override def convert(
+      op: ProjectExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
+    val exprs = op.projectList.map(exprToProto(_, op.child.output))
+
+    if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
+      val projectBuilder = OperatorOuterClass.Projection
+        .newBuilder()
+        .addAllProjectList(exprs.map(_.get).asJava)
+      Some(builder.setProjection(projectBuilder).build())
+    } else {
+      withInfo(op, op.projectList: _*)
+      None
+    }
+  }
+
+  override def createExec(nativeOp: Operator, op: ProjectExec): CometNativeExec = {
+    CometProjectExec(nativeOp, op, op.output, op.projectList, op.child, SerializedPlan(None))
+  }
+}
+
+case class CometProjectExec(
+    override val nativeOp: Operator,
+    override val originalPlan: SparkPlan,
+    override val output: Seq[Attribute],
+    projectList: Seq[NamedExpression],
+    child: SparkPlan,
+    override val serializedPlanOpt: SerializedPlan)
+    extends CometUnaryExec
+    with PartitioningPreservingUnaryExecNode {
+
+  override def producedAttributes: AttributeSet = outputSet
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    this.copy(child = newChild)
+}
+```
+
+#### Using getSupportLevel
+
+Override `getSupportLevel` to control operator support based on specific
conditions:
+
+```scala
+override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
+  // Check for unsupported features
+  if (operator.hasUnsupportedFeature) {
+    return Unsupported(Some("Feature X is not supported"))
+  }
+
+  // Check for incompatible behavior
+  if (operator.hasKnownDifferences) {
+    return Incompatible(Some("Known differences in edge case Y"))
+  }
+
+  Compatible()
+}
+```
+
+Support levels:
+
+- **`Compatible()`** - Fully compatible with Spark (default)
+- **`Incompatible()`** - Supported but may differ; requires explicit opt-in
+- **`Unsupported()`** - Not supported under current conditions
+
+Note that Comet will treat an operator as incompatible if any of the child
expressions are incompatible.
+
+### Step 3: Register the Operator
+
+Add your operator to the appropriate map in `CometExecRule.scala`:
+
+#### For Native Operators
+
+Add to the `nativeExecs` map (`CometExecRule.scala`):
+
+```scala
+val nativeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
+  Map(
+    classOf[ProjectExec] -> CometProjectExec,
+    classOf[FilterExec] -> CometFilterExec,
+    // ... existing operators ...
+    classOf[YourOperatorExec] -> CometYourOperator,
+  )
+```
+
+#### For Sink Operators
+
+If your operator is a sink (becomes a `ScanExec` in the native plan), add to
the `sinks` map (`CometExecRule.scala`):
+
+```scala
+val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
+  Map(
+    classOf[CoalesceExec] -> CometCoalesceExec,
+    classOf[UnionExec] -> CometUnionExec,
+    // ... existing operators ...
+    classOf[YourSinkOperatorExec] -> CometYourSinkOperator,
+  )
+```
+
+Note: The `allExecs` map automatically combines both `nativeExecs` and
`sinks`, so you only need to add to one of the two maps.
+
+### Step 4: Add Configuration Entry
+
+Add a configuration entry in
`common/src/main/scala/org/apache/comet/CometConf.scala`:
+
+```scala
+val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] =
+  conf("spark.comet.exec.yourOperator.enabled")
+    .doc("Whether to enable your operator in Comet")
+    .booleanConf
+    .createWithDefault(true)
+```
+
+Run `make` to update the user guide. The new configuration option will be
added to `docs/source/user-guide/latest/configs.md`.
+
+### Step 5: Implement the Native Operator in Rust
+
+#### Update the Planner
+
+In `native/core/src/execution/planner.rs`, add a match case in the operator
deserialization logic to handle your new protobuf message:
+
+```rust
+use datafusion_comet_proto::spark_operator::operator::OpStruct;
+
+// In the create_plan or similar method:
+match op.op_struct.as_ref() {
+    Some(OpStruct::Scan(scan)) => {
+        // ... existing cases ...
+    }
+    Some(OpStruct::YourNewOperator(your_op)) => {
+        create_your_operator_exec(your_op, children, session_ctx)
+    }
+    // ... other cases ...
+}
+```
+
+#### Implement the Operator
+
+Create the operator implementation, either in an existing file or a new file
in `native/core/src/execution/operators/`:
+
+```rust
+use datafusion::physical_plan::{ExecutionPlan, ...};
+use datafusion_comet_proto::spark_operator::YourNewOperator;
+
+pub fn create_your_operator_exec(
+    op: &YourNewOperator,
+    children: Vec<Arc<dyn ExecutionPlan>>,
+    session_ctx: &SessionContext,
+) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
+    // Deserialize expressions and configuration
+    // Create and return the execution plan
+
+    // Option 1: Use existing DataFusion operator
+    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))
+
+    // Option 2: Implement custom operator (see ExpandExec for example)
+    // Ok(Arc::new(YourCustomExec::new(...)))
+}
+```
+
+For custom operators, you'll need to implement the `ExecutionPlan` trait. See
`native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.
+
+### Step 6: Add Tests
+
+#### Scala Integration Tests
+
+Add tests in `spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala`
or a related test suite:
+
+```scala
+test("your operator") {
+ withTable("test_table") {
+ sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
+ sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
+
+ // Test query that uses your operator
+ checkSparkAnswerAndOperator(
+ "SELECT * FROM test_table WHERE col1 > 1"
+ )
+ }
+}
+```
+
+The `checkSparkAnswerAndOperator` helper verifies:
+
+1. Results match Spark's native execution
+2. Your operator is actually being used (not falling back)
+
+#### Rust Unit Tests
+
+Add unit tests in your Rust implementation file:
+
+```rust
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_your_operator() {
+        // Test operator creation and execution
+    }
+}
+```
+
+### Step 7: Update Documentation
+
+Add your operator to the supported operators list in
`docs/source/user-guide/latest/compatibility.md` or similar documentation.
+
+## Implementing a Sink Operator
+
+Sink operators are converted to `ScanExec` in the native plan and serve as entry points for native execution. The implementation is simpler than for native operators because sink operators extend the `CometSink` base class, which provides the conversion logic.
+
+### Step 1: Create a CometOperatorSerde Implementation
+
+Create a new Scala file in `spark/src/main/scala/org/apache/spark/sql/comet/`
(e.g., `CometYourSinkOperator.scala`):
+
+```scala
+import org.apache.comet.serde.operator.CometSink
+
+object CometYourSinkOperator extends CometSink[YourSinkExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_YOUR_SINK_ENABLED)
+
+  // Optional: Override if the data produced is FFI safe
+  override def isFfiSafe: Boolean = false
+
+  override def createExec(
+      nativeOp: OperatorOuterClass.Operator,
+      op: YourSinkExec): CometNativeExec = {
+    CometSinkPlaceHolder(
+      nativeOp,
+      op,
+      CometYourSinkExec(op, op.output, /* other parameters */, op.child))
+  }
+
+  // Optional: Override getSupportLevel if you need custom validation beyond data types
+  override def getSupportLevel(operator: YourSinkExec): SupportLevel = {
+    // CometSink base class already checks data types in convert()
+    // Add any additional validation here
+    Compatible()
+  }
+}
+
+/**
+ * Comet implementation of YourSinkExec that supports columnar processing
+ */
+case class CometYourSinkExec(
+    override val originalPlan: SparkPlan,
+    override val output: Seq[Attribute],
+    /* other parameters */,
+    child: SparkPlan)
+    extends CometExec
+    with UnaryExecNode {
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    // Implement columnar execution logic
+    val rdd = child.executeColumnar()
+    // Apply your sink operator's logic
+    rdd
+  }
+
+  override def outputPartitioning: Partitioning = {
+    // Define output partitioning
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    this.copy(child = newChild)
+}
+```
+
+**Key Points:**
+
+- Extend `CometSink[T]` which provides the `convert()` method that transforms
the operator to `ScanExec`
+- The `CometSink.convert()` method (in `CometSink.scala`) automatically
handles:
+ - Data type validation
+ - Conversion to `ScanExec` in the native plan
+ - Setting FFI safety flags
+- You must implement `createExec()` to wrap the operator appropriately
+- You typically need to create a corresponding `CometYourSinkExec` class that
implements columnar execution
+
+### Step 2: Register the Sink
+
+Add your sink to the `sinks` map in `CometExecRule.scala`:
+
+```scala
+val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
+  Map(
+    classOf[CoalesceExec] -> CometCoalesceExec,
+    classOf[UnionExec] -> CometUnionExec,
+    classOf[YourSinkExec] -> CometYourSinkOperator,
+  )
+```
+
+### Step 3: Add Configuration
+
+Add a configuration entry in `CometConf.scala`:
+
+```scala
+val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =
+  conf("spark.comet.exec.yourSink.enabled")
+    .doc("Whether to enable your sink operator in Comet")
+    .booleanConf
+    .createWithDefault(true)
+```
+
+### Step 4: Add Tests
+
+Test that your sink operator correctly feeds data into native execution:
+
+```scala
+test("your sink operator") {
+ withTable("test_table") {
+ sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
+ sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
+
+ // Test query that uses your sink operator followed by native operators
+ checkSparkAnswerAndOperator(
+ "SELECT col1 + 1 FROM (/* query that produces YourSinkExec */)"
+ )
+ }
+}
+```
+
+**Important Notes for Sinks:**
+
+- Sinks extend the `CometSink` base class, which provides the `convert()`
method implementation
+- The `CometSink.convert()` method automatically handles conversion to
`ScanExec` in the native plan
+- You don't need to add protobuf definitions for sink operators - they use the
standard `Scan` message
+- You don't need a Rust implementation for sinks - they become standard
`ScanExec` operators that read from the JVM
+- Sink implementations should provide a columnar-compatible execution class
(e.g., `CometCoalesceExec`)
+- The `createExec()` method wraps the operator with `CometSinkPlaceHolder` to
manage the JVM-to-native boundary
+- See `CometCoalesceExec.scala` or `CometUnionExec` in
`spark/src/main/scala/org/apache/spark/sql/comet/` for reference implementations
+
+## Implementing a JVM Operator
+
+For operators that run in the JVM:
+
+1. Create a new operator class extending appropriate Spark base classes in
`spark/src/main/scala/org/apache/comet/`
+2. Add matching logic in `CometExecRule.scala` to transform the Spark operator
+3. No protobuf or Rust implementation needed
+
+Example pattern from `CometExecRule.scala`:
+
+```scala
+case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
+ CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+```
+
+## Common Patterns and Helpers
+
+### Expression Conversion
+
+Use `QueryPlanSerde.exprToProto` to convert Spark expressions to protobuf:
+
+```scala
+val protoExpr = exprToProto(sparkExpr, inputSchema)
+```
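+
+Conversion returns `None` when the expression cannot be converted, so the result should be checked before building the operator's protobuf message. A minimal sketch of the pattern, reusing the hypothetical names from above:
+
+```scala
+exprToProto(sparkExpr, inputSchema) match {
+  case Some(protoExpr) =>
+    // use protoExpr when populating the operator builder
+  case None =>
+    // expression not supported: record the reason and fall back to Spark
+    withInfo(op, sparkExpr)
+}
+```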
+
+### Handling Fallback
+
+Use `withInfo` to tag operators with fallback reasons:
+
+```scala
+if (!canConvert) {
+ withInfo(op, "Reason for fallback", childNodes: _*)
+ return None
+}
+```
+
+### Child Operator Validation
+
+Always check that child operators were successfully converted:
+
+```scala
+if (childOp.isEmpty) {
+ // Cannot convert if children failed
+ return None
+}
+```
+
+## Debugging Tips
+
+1. **Enable verbose logging**: Set `spark.comet.explain.format=verbose` to see
detailed plan transformations
+2. **Check fallback reasons**: Set
`spark.comet.logFallbackReasons.enabled=true` to log why operators fall back to
Spark
+3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized
operators
+4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan
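+
+For example, the first two settings can be enabled on the Spark session before re-running the query under test (a usage sketch; only the config keys listed above come from Comet):
+
+```scala
+spark.conf.set("spark.comet.explain.format", "verbose")
+spark.conf.set("spark.comet.logFallbackReasons.enabled", "true")
+
+spark.sql("SELECT * FROM test_table WHERE col1 > 1").show()
+```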
diff --git a/docs/source/contributor-guide/index.md
b/docs/source/contributor-guide/index.md
index eb79f7ab5..7b0385094 100644
--- a/docs/source/contributor-guide/index.md
+++ b/docs/source/contributor-guide/index.md
@@ -30,6 +30,7 @@ Parquet Scans <parquet_scans>
Development Guide <development>
Debugging Guide <debugging>
Benchmarking Guide <benchmarking>
+Adding a New Operator <adding_a_new_operator>
Adding a New Expression <adding_a_new_expression>
Tracing <tracing>
Profiling Native Code <profiling_native_code>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]