mbutrovich commented on code in PR #2758:
URL: https://github.com/apache/datafusion-comet/pull/2758#discussion_r2519309345


##########
docs/source/contributor-guide/adding_a_new_operator.md:
##########
@@ -0,0 +1,356 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Adding a New Operator
+
+This guide explains how to add support for a new Spark physical operator in Apache DataFusion Comet.
+
+## Overview
+
+`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to implementing Comet operators depending on where they execute and how they integrate with the native execution engine.
+
+### Types of Comet Operators
+
+#### 1. Comet Native Operators
+
+These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. For native operators, `CometExecRule` delegates to `QueryPlanSerde.operator2Proto` to do the following (sketched below):
+
+- Check if the operator is enabled or disabled via configuration
+- Validate whether the operator can be supported
+- Tag the operator with fallback reasons if conversion fails
+- Serialize the operator to protobuf for native execution
+
+Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMergeJoinExec`
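+
+As a rough illustration of that flow, here is a hedged sketch in plain Scala. The helpers `isEnabled` and `tagFallback` are illustrative stand-ins rather than real Comet APIs, and the per-operator logic actually lives in a `CometOperatorSerde` implementation (covered in Step 2 below):
+
+```scala
+// Hypothetical sketch of the native conversion flow described above;
+// `isEnabled` and `tagFallback` are stand-ins, not real Comet APIs.
+def tryConvertToNative[T <: SparkPlan](
+    op: T,
+    serde: CometOperatorSerde[T],
+    children: Operator*): Option[Operator] = {
+  if (!serde.enabledConfig.forall(isEnabled)) {
+    // Operator is disabled by configuration: record why we fell back
+    tagFallback(op, "operator disabled by config")
+    None
+  } else {
+    serde.getSupportLevel(op) match {
+      case Unsupported(reason) =>
+        tagFallback(op, reason.getOrElse("unsupported"))
+        None
+      case _ =>
+        // convert returns None if any child or expression fails to serialize
+        serde.convert(op, Operator.newBuilder(), children: _*)
+    }
+  }
+}
+```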
+
+#### 2. Comet JVM Operators
+
+These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen in `CometExecRule` rather than `QueryPlanSerde`, because they don't need protobuf serialization.
+
+Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
+
+#### 3. Comet Sinks
+
+Some operators serve as "sinks" or entry points for native execution, meaning they can be leaf nodes that feed data into native execution blocks.
+
+Examples: `CometScanExec`, `CometBatchScanExec`, `UnionExec`, `CometSparkToColumnarExec`
+
+## Implementing a Native Operator
+
+This section focuses on adding a native operator, which is the most common and complex case.
+
+### Step 1: Define the Protobuf Message
+
+First, add the operator definition to `native/proto/src/proto/operator.proto`.
+
+#### Add to the Operator Message
+
+Add your new operator to the `oneof op_struct` in the main `Operator` message:
+
+```proto
+message Operator {
+  repeated Operator children = 1;
+  uint32 plan_id = 2;
+
+  oneof op_struct {
+    Scan scan = 100;
+    Projection projection = 101;
+    Filter filter = 102;
+    // ... existing operators ...
+    YourNewOperator your_new_operator = 112;  // Choose next available number
+  }
+}
+```
+
+#### Define the Operator Message
+
+Create a message for your operator with the necessary fields:
+
+```proto
+message YourNewOperator {
+  // Fields specific to your operator
+  repeated spark.spark_expression.Expr expressions = 1;
+  // Add other configuration fields as needed
+}
+```
+
+For reference, see existing operators like `Filter` (simple), `HashAggregate` (complex), or `Sort` (with ordering).
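+
+For instance, a simple operator message may need only a single field. The sketch below is consistent with the `Filter` conversion example later in this guide, which sets a `predicate` on the `Filter` builder; check `operator.proto` for the actual definition:
+
+```proto
+// Sketch of a minimal operator message (see operator.proto for the
+// real Filter definition).
+message Filter {
+  spark.spark_expression.Expr predicate = 1;
+}
+```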
+
+### Step 2: Create a CometOperatorSerde Implementation
+
+Create a new Scala file in `spark/src/main/scala/org/apache/comet/serde/operator/` (e.g., `CometYourOperator.scala`) that extends `CometOperatorSerde[T]`, where `T` is the Spark operator type.
+
+The `CometOperatorSerde` trait provides three key methods:
+
+- `enabledConfig: Option[ConfigEntry[Boolean]]` - Configuration to enable/disable this operator
+- `getSupportLevel(operator: T): SupportLevel` - Determines whether the operator is supported
+- `convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]` - Converts the operator to protobuf
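+
+Put together, a simplified sketch of the trait shape implied by these methods (the type bound on `T` is an assumption for illustration; see the actual definition in the Comet source):
+
+```scala
+// Simplified sketch of CometOperatorSerde, inferred from the methods above.
+trait CometOperatorSerde[T <: SparkPlan] {
+
+  // Config entry gating this operator, or None if it is always eligible
+  def enabledConfig: Option[ConfigEntry[Boolean]]
+
+  // Whether this specific operator instance is supported (Compatible() by default)
+  def getSupportLevel(operator: T): SupportLevel = Compatible()
+
+  // Serialize to protobuf, returning None to fall back to Spark
+  def convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]
+}
+```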
+
+#### Simple Example (Filter)
+
+```scala
+object CometFilter extends CometOperatorSerde[FilterExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_FILTER_ENABLED)
+
+  override def convert(
+      op: FilterExec,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
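+    // Serialize the filter predicate against the child plan's output attributes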
+    val cond = exprToProto(op.condition, op.child.output)
+
+    if (cond.isDefined && childOp.nonEmpty) {
+      val filterBuilder = OperatorOuterClass.Filter
+        .newBuilder()
+        .setPredicate(cond.get)
+      Some(builder.setFilter(filterBuilder).build())
+    } else {
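+      // Conversion failed: tag the operator with the fallback reason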
+      withInfo(op, op.condition, op.child)
+      None
+    }
+  }
+}
+```
+
+#### More Complex Example (Project)
+
+```scala
+object CometProject extends CometOperatorSerde[ProjectExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_EXEC_PROJECT_ENABLED)
+
+  override def convert(
+      op: ProjectExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
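+    // Convert each projection expression; all must succeed or we fall back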
+    val exprs = op.projectList.map(exprToProto(_, op.child.output))
+
+    if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
+      val projectBuilder = OperatorOuterClass.Projection
+        .newBuilder()
+        .addAllProjectList(exprs.map(_.get).asJava)
+      Some(builder.setProjection(projectBuilder).build())
+    } else {
+      withInfo(op, op.projectList: _*)
+      None
+    }
+  }
+}
+```
+
+#### Using getSupportLevel
+
+Override `getSupportLevel` to control operator support based on specific conditions:
+
+```scala
+override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
+  // Check for unsupported features
+  if (operator.hasUnsupportedFeature) {
+    return Unsupported(Some("Feature X is not supported"))
+  }
+
+  // Check for incompatible behavior
+  if (operator.hasKnownDifferences) {
+    return Incompatible(Some("Known differences in edge case Y"))
+  }
+
+  Compatible()
+}
+```
+
+Support levels:
+
+- **`Compatible()`** - Fully compatible with Spark (default)

Review Comment:
   There's a composability challenge here: how should we classify an operator where we know certain expressions can yield different results (e.g., min or max aggregates over certain types; I'm thinking of -0.0 and +0.0)? Is that to be handled under expression compatibility, or does an operator that could contain such an expression inherit the expression's possible incompatibilities? The latter seems wrong as I type it out. I'm just raising the issue, and perhaps asking for a clarification in the doc that a compatible operator could still be incompatible given certain expressions.



##########
docs/source/contributor-guide/adding_a_new_operator.md:
##########
@@ -0,0 +1,356 @@
+#### 3. Comet Sinks
+
+Some operators serve as "sinks" or entry points for native execution, meaning they can be leaf nodes that feed data into native execution blocks.

Review Comment:
   I find "entry points" confusing because, depending on whether you're talking about push- or pull-based execution, it can mean different things. We know how Spark, Comet, and DataFusion work, but for new contributors it might be confusing. I suggest "data sources."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
