andygrove commented on code in PR #2758:
URL: https://github.com/apache/datafusion-comet/pull/2758#discussion_r2542974847


##########
docs/source/contributor-guide/adding_a_new_operator.md:
##########
@@ -0,0 +1,356 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Adding a New Operator
+
+This guide explains how to add support for a new Spark physical operator in Apache DataFusion Comet.
+
+## Overview
+
+`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to implementing Comet operators depending on where they execute and how they integrate with the native execution engine.
+
+### Types of Comet Operators
+
+#### 1. Comet Native Operators
+
+These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. For native operators, `CometExecRule` delegates to `QueryPlanSerde.operator2Proto` to:
+
+- Check whether the operator is enabled via configuration
+- Validate whether the operator is supported
+- Tag the operator with fallback reasons if conversion fails
+- Serialize the operator to protobuf for native execution
+
+Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMergeJoinExec`
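+
+As a rough illustration of the four steps above, the following self-contained Scala sketch models the flow with stand-in types. `SparkOp`, `ProtoOp`, `toProto`, and the `fallbackReasons` map are hypothetical names invented for this example, not Comet APIs; the real logic lives in `QueryPlanSerde.operator2Proto` and works on Spark plan nodes and generated protobuf classes. The sketch also assumes that a disabled operator is recorded with a fallback reason.
+
+```scala
+import scala.collection.mutable
+
+object ConversionFlowSketch {
+  // Stand-ins for a Spark physical operator and its protobuf form (hypothetical).
+  final case class SparkOp(name: String, usesUnsupportedFeature: Boolean = false)
+  final case class ProtoOp(name: String)
+
+  // Fallback reasons keyed by operator name; the real code tags the plan node instead.
+  val fallbackReasons: mutable.Map[String, String] = mutable.Map.empty
+
+  def toProto(op: SparkOp, enabled: Map[String, Boolean]): Option[ProtoOp] = {
+    if (!enabled.getOrElse(op.name, true)) {
+      // Step 1: configuration check (operators missing from the map count as enabled).
+      fallbackReasons(op.name) = s"${op.name} is disabled by configuration"
+      None
+    } else if (op.usesUnsupportedFeature) {
+      // Steps 2 and 3: validation failed, so record the reason and fall back.
+      fallbackReasons(op.name) = s"${op.name} uses an unsupported feature"
+      None
+    } else {
+      // Step 4: serialize to the protobuf stand-in.
+      Some(ProtoOp(op.name))
+    }
+  }
+}
+```
+
+With `enabled = Map("SortExec" -> false)`, calling `toProto(SparkOp("FilterExec"), enabled)` yields `Some(ProtoOp("FilterExec"))`, while `toProto(SparkOp("SortExec"), enabled)` yields `None` and leaves an entry for `SortExec` in `fallbackReasons`.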
+
+#### 2. Comet JVM Operators
+
+These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen in `CometExecRule` rather than `QueryPlanSerde`, because they don't need protobuf serialization.
+
+Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
+
+#### 3. Comet Sinks
+
+Some operators serve as "sinks" or entry points for native execution, meaning they can be leaf nodes that feed data into native execution blocks.
+
+Examples: `CometScanExec`, `CometBatchScanExec`, `UnionExec`, `CometSparkToColumnarExec`
+
+## Implementing a Native Operator
+
+This section focuses on adding a native operator, which is the most common and complex case.
+
+### Step 1: Define the Protobuf Message
+
+First, add the operator definition to `native/proto/src/proto/operator.proto`.
+
+#### Add to the Operator Message
+
+Add your new operator to the `oneof op_struct` in the main `Operator` message:
+
+```proto
+message Operator {
+  repeated Operator children = 1;
+  uint32 plan_id = 2;
+
+  oneof op_struct {
+    Scan scan = 100;
+    Projection projection = 101;
+    Filter filter = 102;
+    // ... existing operators ...
+    YourNewOperator your_new_operator = 112;  // Choose next available number
+  }
+}
+```
+
+#### Define the Operator Message
+
+Create a message for your operator with the necessary fields:
+
+```proto
+message YourNewOperator {
+  // Fields specific to your operator
+  repeated spark.spark_expression.Expr expressions = 1;
+  // Add other configuration fields as needed
+}
+```
+
+For reference, see existing operators like `Filter` (simple), `HashAggregate` (complex), or `Sort` (with ordering).
+
+### Step 2: Create a CometOperatorSerde Implementation
+
+Create a new Scala file in `spark/src/main/scala/org/apache/comet/serde/operator/` (e.g., `CometYourOperator.scala`) that extends `CometOperatorSerde[T]` where `T` is the Spark operator type.
+
+The `CometOperatorSerde` trait provides three key methods:
+
+- `enabledConfig: Option[ConfigEntry[Boolean]]` - Configuration to enable/disable this operator
+- `getSupportLevel(operator: T): SupportLevel` - Determines if the operator is supported
+- `convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]` - Converts to protobuf
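+
+The shape of these three methods can be sketched without any Spark or Comet dependencies. In the simplified model below, `ConfigEntry`, `Operator`, `SupportLevel`, `OperatorSerde`, `LimitOp`, and `LimitSerde` are all stand-ins written for this example; the `builder` parameter is omitted and `Compatible` is modeled as a plain object, so treat this as an illustration of the contract rather than the actual trait definition.
+
+```scala
+object SerdeShapeSketch {
+  // Stand-ins for Spark's ConfigEntry and the generated protobuf Operator class.
+  final case class ConfigEntry[T](key: String, defaultValue: T)
+  final case class Operator(kind: String, children: Seq[Operator])
+
+  sealed trait SupportLevel
+  case object Compatible extends SupportLevel
+  final case class Unsupported(reason: String) extends SupportLevel
+
+  // Simplified contract: one config switch, one support check, one conversion.
+  trait OperatorSerde[T] {
+    def enabledConfig: Option[ConfigEntry[Boolean]]
+    def getSupportLevel(operator: T): SupportLevel = Compatible
+    def convert(op: T, childOp: Operator*): Option[Operator]
+  }
+
+  // Hypothetical Spark-side operator used only for this example.
+  final case class LimitOp(n: Int)
+
+  object LimitSerde extends OperatorSerde[LimitOp] {
+    override def enabledConfig: Option[ConfigEntry[Boolean]] =
+      Some(ConfigEntry("example.limit.enabled", true))
+
+    override def getSupportLevel(operator: LimitOp): SupportLevel =
+      if (operator.n < 0) Unsupported("negative limit") else Compatible
+
+    // Conversion only succeeds when at least one converted child is available.
+    override def convert(op: LimitOp, childOp: Operator*): Option[Operator] =
+      if (childOp.nonEmpty) Some(Operator(s"Limit(${op.n})", childOp)) else None
+  }
+}
+```
+
+Returning `None` from `convert` is how the sketch signals that the operator could not be serialized, which matches the pattern used by the real examples in this guide.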
+
+#### Simple Example (Filter)
+
+```scala
+object CometFilter extends CometOperatorSerde[FilterExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_FILTER_ENABLED)
+
+  override def convert(
+      op: FilterExec,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    val cond = exprToProto(op.condition, op.child.output)
+
+    if (cond.isDefined && childOp.nonEmpty) {
+      val filterBuilder = OperatorOuterClass.Filter
+        .newBuilder()
+        .setPredicate(cond.get)
+      Some(builder.setFilter(filterBuilder).build())
+    } else {
+      withInfo(op, op.condition, op.child)
+      None
+    }
+  }
+}
+```
+
+#### More Complex Example (Project)
+
+```scala
+object CometProject extends CometOperatorSerde[ProjectExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_PROJECT_ENABLED)
+
+  override def convert(
+      op: ProjectExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
+    val exprs = op.projectList.map(exprToProto(_, op.child.output))
+
+    if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
+      val projectBuilder = OperatorOuterClass.Projection
+        .newBuilder()
+        .addAllProjectList(exprs.map(_.get).asJava)
+      Some(builder.setProjection(projectBuilder).build())
+    } else {
+      withInfo(op, op.projectList: _*)
+      None
+    }
+  }
+}
+```
+
+#### Using getSupportLevel
+
+Override `getSupportLevel` to control operator support based on specific conditions:
+
+```scala
+override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
+  if (operator.hasUnsupportedFeature) {
+    // Check for unsupported features
+    Unsupported(Some("Feature X is not supported"))
+  } else if (operator.hasKnownDifferences) {
+    // Check for incompatible behavior
+    Incompatible(Some("Known differences in edge case Y"))
+  } else {
+    Compatible()
+  }
+}
+```
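+
+One way to picture how a caller could act on the returned level is the sketch below. The three case classes mirror the constructor calls used in the example above, but the `decide` helper and the `allowIncompatible` flag are hypothetical names introduced here, and the assumption that incompatible operators are only converted after an explicit opt-in should be checked against the Comet source.
+
+```scala
+object SupportLevelSketch {
+  // Stand-in ADT shaped after the constructor calls shown above.
+  sealed trait SupportLevel
+  final case class Compatible(notes: Option[String] = None) extends SupportLevel
+  final case class Incompatible(notes: Option[String] = None) extends SupportLevel
+  final case class Unsupported(notes: Option[String] = None) extends SupportLevel
+
+  // Right(()) means conversion may proceed; Left(reason) means fall back to Spark.
+  // `allowIncompatible` is a hypothetical opt-in flag, not a real Comet setting name.
+  def decide(level: SupportLevel, allowIncompatible: Boolean): Either[String, Unit] =
+    level match {
+      case Compatible(_) => Right(())
+      case Incompatible(_) if allowIncompatible => Right(())
+      case Incompatible(notes) => Left(notes.getOrElse("incompatible with Spark"))
+      case Unsupported(notes) => Left(notes.getOrElse("unsupported"))
+    }
+}
+```
+
+Modeling the outcome as an `Either` keeps the fallback reason next to the decision, so the caller can attach it to the plan node when conversion is skipped.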
+
+Support levels:
+
+- **`Compatible()`** - Fully compatible with Spark (default)

Review Comment:
   I added a brief note to address this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

