andygrove opened a new issue, #3104:
URL: https://github.com/apache/datafusion-comet/issues/3104

   ## What is the problem the feature request solves?
   
   > **Note:** This issue was generated with AI assistance. The specification 
details have been extracted from Spark documentation and may need verification.
   
   Comet does not currently support the Spark `current_batch_timestamp` 
function, causing queries using this function to fall back to Spark's JVM 
execution instead of running natively on DataFusion.
   
   The `CurrentBatchTimestamp` expression represents a timestamp value that 
remains constant for the duration of a streaming batch. It is designed to 
prevent the optimizer from pushing it below stateful operators, and it allows 
`IncrementalExecution` to substitute it with a literal value during streaming 
query execution.
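   
   The literal substitution described above can be pictured with a toy 
   expression tree. This is a minimal sketch, not Spark's or Comet's actual 
   code: the `Expr` enum, its variants, and `substitute_batch_timestamp` are 
   hypothetical names chosen for illustration only.
   
   ```rust
   // Toy expression tree (hypothetical; not Spark's or Comet's real types).
   #[derive(Debug, Clone, PartialEq)]
   enum Expr {
       // Placeholder whose value is fixed for one streaming batch.
       CurrentBatchTimestamp,
       // Timestamp literal in microseconds since the Unix epoch.
       TimestampLiteral(i64),
       // Reference to an input column by name.
       Column(String),
       // Binary comparison, included so the tree has some depth.
       LessThan(Box<Expr>, Box<Expr>),
   }
   
   // Walk the tree and replace every placeholder with this batch's literal.
   fn substitute_batch_timestamp(expr: &Expr, batch_micros: i64) -> Expr {
       match expr {
           Expr::CurrentBatchTimestamp => Expr::TimestampLiteral(batch_micros),
           Expr::LessThan(left, right) => Expr::LessThan(
               Box::new(substitute_batch_timestamp(left, batch_micros)),
               Box::new(substitute_batch_timestamp(right, batch_micros)),
           ),
           // Literals and column references are left unchanged.
           other => other.clone(),
       }
   }
   ```
   
   If the substitution has already happened by the time a plan reaches Comet, 
   the native side would only ever see a literal. Whether that holds for 
   every streaming plan is something this sketch assumes rather than shows.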
   
   Supporting this expression would allow more Spark workloads to benefit from 
Comet's native acceleration.
   
   ## Describe the potential solution
   
   ### Spark Specification
   
   **Syntax:**
   ```sql
   current_batch_timestamp()
   ```
   
   **Arguments:** The SQL form takes no arguments. The table below lists the 
parameters of the `CurrentBatchTimestamp` expression itself.
   
   | Parameter | Type | Description |
   |-----------|------|-------------|
   | timestampMs | Long | The timestamp value in milliseconds |
   | dataType | DataType | The target data type (TimestampType, TimestampNTZType, or DateType) |
   | timeZoneId | Option[String] | Optional timezone identifier for timezone-aware conversions |
   
   **Return Type:** Returns one of the following data types based on the 
configured `dataType` parameter:
   
   - `TimestampType` - Returns timestamp in microseconds
   - `TimestampNTZType` - Returns timezone-naive timestamp in microseconds  
   - `DateType` - Returns date as days since epoch
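   
   The unit conversions implied by the list above can be sketched as follows. 
   This is an illustration under stated assumptions, not Spark's 
   implementation: the function names are hypothetical, and the time zone is 
   modelled as a fixed UTC offset in seconds, which ignores daylight-saving 
   rules. `TimestampNTZType` is not covered.
   
   ```rust
   const MICROS_PER_MILLI: i64 = 1_000;
   const MICROS_PER_SECOND: i64 = 1_000_000;
   const MICROS_PER_DAY: i64 = 86_400 * MICROS_PER_SECOND;
   
   // TimestampType: milliseconds since the epoch become microseconds.
   fn millis_to_micros(timestamp_ms: i64) -> i64 {
       timestamp_ms * MICROS_PER_MILLI
   }
   
   // DateType: days since the epoch, as seen at a fixed UTC offset.
   // The offset parameter is an assumption; real zones need a tz database.
   fn millis_to_days(timestamp_ms: i64, utc_offset_seconds: i64) -> i32 {
       let local_micros =
           millis_to_micros(timestamp_ms) + utc_offset_seconds * MICROS_PER_SECOND;
       // div_euclid rounds toward negative infinity, so instants before
       // 1970-01-01 map to negative day numbers instead of day 0.
       local_micros.div_euclid(MICROS_PER_DAY) as i32
   }
   ```
   
   Floor division matters for pre-epoch values: plain `/` would truncate 
   toward zero and place one millisecond before the epoch on day 0 rather 
   than day -1.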
   
   **Supported Data Types:**
   This expression supports conversion to the following output data types:
   
   - TimestampType (with timezone)
   - TimestampNTZType (timezone-naive)
   - DateType
   
   **Edge Cases:**
   - Null handling: The expression is non-nullable (`nullable = false`)
   - Empty input: The configured timestamp value is returned regardless of 
input row content
   - Timezone conversion: When no `timeZoneId` is provided, conversions default 
to the system time zone
   - Batch consistency: The same timestamp value is returned for every row 
within a single batch
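   
   The null-handling and batch-consistency points above amount to producing a 
   constant, non-nullable column. A minimal sketch, assuming a hypothetical 
   `BatchTimestamp` holder and a plain `Vec<i64>` in place of a real columnar 
   array type:
   
   ```rust
   // Hypothetical holder for the value fixed when a batch starts.
   struct BatchTimestamp {
       micros: i64,
   }
   
   impl BatchTimestamp {
       // Produce one value per input row. The element type is i64 rather
       // than Option<i64> because the expression is non-nullable.
       fn evaluate(&self, num_rows: usize) -> Vec<i64> {
           // Row contents are never inspected; only the row count matters.
           vec![self.micros; num_rows]
       }
   }
   ```
   
   A real implementation would more likely return a scalar and let the engine 
   broadcast it, but the observable result is the same: every row in the 
   batch sees one value.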
   
   **Examples:**
   ```sql
   -- Returns current batch timestamp as timestamp type
   SELECT current_batch_timestamp()
   
   -- Can be used in streaming queries to get batch processing time
   SELECT id, value, current_batch_timestamp() as batch_time FROM 
streaming_table
   ```
   
   ```scala
   // DataFrame API usage in streaming context
   import org.apache.spark.sql.functions._
   
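   val streamingDF = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     // The Kafka source requires a topic selection; "events" is a placeholder
     .option("subscribe", "events")
     .load()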
   
   val withBatchTime = streamingDF.select(
     col("value"),
     expr("current_batch_timestamp()").as("batch_time")
   )
   ```
   
   ### Implementation Approach
   
   See the [Comet guide on adding new 
expressions](https://datafusion.apache.org/comet/contributor-guide/adding_a_new_expression.html)
 for detailed instructions.
   
   1. **Scala Serde**: Add expression handler in 
`spark/src/main/scala/org/apache/comet/serde/`
   2. **Register**: Add to appropriate map in `QueryPlanSerde.scala`
   3. **Protobuf**: Add message type in `native/proto/src/proto/expr.proto` if 
needed
   4. **Rust**: Implement in `native/spark-expr/src/` (check if DataFusion has 
built-in support first)
   
   
   ## Additional context
   
   **Difficulty:** Medium
   **Spark Expression Class:** 
`org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp`
   
   **Related:**
   - `CurrentTimestamp` - Returns current system timestamp
   - `Literal` - Static literal values
   - `TimeZoneAwareExpression` - Base trait for timezone-aware expressions
   - `Nondeterministic` - Trait for expressions that return different values 
across evaluations
   
   ---
   *This issue was auto-generated from Spark reference documentation.*
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

