parthchandra opened a new issue, #4288: URL: https://github.com/apache/datafusion-comet/issues/4288
### What is the problem the feature request solves?

# TimeType Support in Comet: Comprehensive Implementation Plan

## Background

Spark 4.1 introduces `TimeType` (gated behind `spark.sql.timeType.enabled`).

- Internal representation: `Long` (nanoseconds since midnight, 0 to 86,399,999,999,999)
- Arrow mapping: `Time64(Nanosecond)` / `TimeNanoVector`
- Physical type: same as `LongType` (8 bytes, fixed-width)
- External type: `java.time.LocalTime`

Most TimeType expressions in Spark are `RuntimeReplaceable` and resolve to `StaticInvoke(DateTimeUtils, ...)` or `Invoke(ToTimeParser, "parse", ...)` nodes in the execution plan. Comet intercepts these in the shim's `StaticInvoke` / `Invoke` match arms.

---

## Current State

https://github.com/apache/datafusion-comet/pull/4256 implements `make_time` and `to_time`.

### Already implemented

| Component | File | Notes |
|---|---|---|
| Protobuf type ID | `native/proto/src/proto/types.proto` | `TIME = 17` |
| `isTimeType` shim | `common/.../CometTypeShim.scala` | 3.x returns false, 4.x uses reflection |
| `supportedDataType` | `QueryPlanSerde.scala:363` | Includes TimeType |
| `serializeDataType` | `QueryPlanSerde.scala:399` | Maps TimeType to 17 |
| Rust deserialization | `native/core/src/execution/serde.rs:99` | `Time -> Time64(Nanosecond)` |
| Columnar-to-row | `native/core/src/execution/columnar_to_row.rs` | Full `Time64Nano` support |
| Planner literal | `native/core/src/execution/planner.rs:347` | `ScalarValue::Time64Nanosecond` |
| `to_time` / `try_to_time` | Rust + shim | Default (no-format) parsing only |
| `make_time` | Rust + shim | `StaticInvoke(DateTimeUtils, "makeTime", ...)` |
| Hash exclusion | `hash.scala:129` | `isTimeType(dt)` correctly rejects |

### Known correct exclusions

| Item | Why |
|---|---|
| Hash (Murmur3, XxHash64) | Rust hash doesn't handle Time64. Correctly rejected in `hash.scala`. Spark hashes it as a long; could be added later. |
| Arithmetic (Add/Subtract) | TimeType uses dedicated `TimeAddInterval`/`SubtractTimes`, not generic arithmetic. `arithmetic.scala` correctly excludes it. |

---

## Gap Analysis

#### `to_time` with format patterns (fallback)

**File**: `spark/src/main/spark-4.1/.../CometExprShim.scala:157-163`

Currently `parser.fmt.isEmpty` gates the match, so `to_time('12.10.05', 'HH.mm.ss')` falls back to Spark. The SQL test has `query expect_fallback(invoke is not supported)`.

**Status**: Correct behavior for now. Tracked for Phase 3.

---

### 1: Infrastructure (shuffle, sort, min/max)

These are the core infrastructure pieces that make TimeType columns usable in real-world queries. Without them, any query that shuffles, sorts, or aggregates a time column falls back to Spark. Since TimeType is physically a fixed-width i64 (the same as LongType/TimestampType), all of these should be straightforward additions to existing type lists.

#### 1a. Shuffle (native)

**File**: `spark/.../CometShuffleExchangeExec.scala:367-380`

`supportedSerializableDataType` does not include TimeType. Add it alongside the existing `TimestampType | TimestampNTZType` case.

The Rust shuffle path also needs checking:

- `native/shuffle/src/spark_unsafe/row.rs` - needs Time64 handling

#### 1b. Shuffle (JVM columnar)

**File**: `spark/.../CometShuffleExchangeExec.scala:490-506`

The same `supportedSerializableDataType` function (different scope). Add TimeType.

#### 1c. Sort

**File**: `spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:833-843`

`supportedScalarSortElementType` does not include TimeType. Add it alongside `DateType | TimestampType | TimestampNTZType`. The Rust sort implementation already handles i64 types, so no native changes should be needed.

#### 1d. Min/Max aggregates

**File**: `spark/src/main/scala/org/apache/comet/serde/aggregates.scala:742-751`

`minMaxDataTypeSupported` has `DateType | TimestampType` but not TimeType. Add it.
The Rust min/max implementation works on any orderable Arrow type, so Time64(Nanosecond) should work without native changes.

#### 1e. Hash functions (optional)

**File**: `spark/src/main/scala/org/apache/comet/serde/hash.scala:129-131`

Currently correctly excluded. Spark hashes TimeType as its underlying long value (`genHashLong`). This could be enabled by removing the `isTimeType` rejection, since the Rust hash implementation already handles i64 types. However, it needs verification that the Rust hash produces results identical to Spark's `genHashLong` for the same long values. Low priority.

---

### 2: Time extraction and conversion functions

These are the dedicated TimeType expressions from `timeExpressions.scala`. All are `RuntimeReplaceable` and resolve to `StaticInvoke(DateTimeUtils, methodName, ...)`. They follow the same shim pattern as `make_time`.

#### 2a. Time extraction (from TimeType to Int/Decimal)

| SQL | Spark expression | StaticInvoke method | Return type |
|---|---|---|---|
| `extract(HOUR FROM t)` | `HoursOfTime` | `getHoursOfTime` | IntegerType |
| `extract(MINUTE FROM t)` | `MinutesOfTime` | `getMinutesOfTime` | IntegerType |
| `extract(SECOND FROM t)` | `SecondsOfTime` | `getSecondsOfTime` | IntegerType |
| `extract(SECOND FROM t)` (with fraction) | `SecondsOfTimeWithFraction` | `getSecondsOfTimeWithFraction` | DecimalType |

**Shim pattern**: Match `StaticInvoke` where `staticObject == classOf[DateTimeUtils.type]` and `functionName` is one of the above.

**Rust implementation**: Simple arithmetic on nanoseconds since midnight:

- hours = nanos / NANOS_PER_HOUR
- minutes = (nanos % NANOS_PER_HOUR) / NANOS_PER_MINUTE
- seconds = (nanos % NANOS_PER_MINUTE) / NANOS_PER_SECOND
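The extraction arithmetic can be sketched in plain Rust. This is a minimal sketch over an `i64` nanoseconds-of-day value; the function and constant names are illustrative, not Comet's actual identifiers:

```rust
const NANOS_PER_SECOND: i64 = 1_000_000_000;
const NANOS_PER_MINUTE: i64 = 60 * NANOS_PER_SECOND;
const NANOS_PER_HOUR: i64 = 60 * NANOS_PER_MINUTE;

/// Hour of day (0-23) from nanoseconds since midnight.
fn hours_of_time(nanos: i64) -> i32 {
    (nanos / NANOS_PER_HOUR) as i32
}

/// Minute of hour (0-59).
fn minutes_of_time(nanos: i64) -> i32 {
    ((nanos % NANOS_PER_HOUR) / NANOS_PER_MINUTE) as i32
}

/// Whole second of minute (0-59).
fn seconds_of_time(nanos: i64) -> i32 {
    ((nanos % NANOS_PER_MINUTE) / NANOS_PER_SECOND) as i32
}

fn main() {
    // 12:34:56.789 expressed as nanoseconds since midnight.
    let t = 12 * NANOS_PER_HOUR + 34 * NANOS_PER_MINUTE + 56 * NANOS_PER_SECOND + 789_000_000;
    assert_eq!(hours_of_time(t), 12);
    assert_eq!(minutes_of_time(t), 34);
    assert_eq!(seconds_of_time(t), 56);
}
```

In a real kernel these would be applied element-wise over a `Time64NanosecondArray`; the scalar arithmetic itself is all there is to each function.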
#### 2b. Time construction (from numeric to TimeType)

| SQL | Spark expression | StaticInvoke method | Input type |
|---|---|---|---|
| `time_from_seconds(n)` | `TimeFromSeconds` | `timeFromSeconds` | NumericType |
| `time_from_millis(n)` | `TimeFromMillis` | `timeFromMillis` | IntegralType |
| `time_from_micros(n)` | `TimeFromMicros` | `timeFromMicros` | IntegralType |

**Rust implementation**: Multiply the input by the appropriate scale factor and validate the range [0, 86_399_999_999_999].

#### 2c. Time deconstruction (from TimeType to numeric)

| SQL | Spark expression | StaticInvoke method | Return type |
|---|---|---|---|
| `time_to_seconds(t)` | `TimeToSeconds` | `timeToSeconds` | LongType |
| `time_to_millis(t)` | `TimeToMillis` | `timeToMillis` | LongType |
| `time_to_micros(t)` | `TimeToMicros` | `timeToMicros` | LongType |

**Rust implementation**: Divide the nanoseconds by the appropriate scale factor.

#### 2d. current_time

| SQL | Spark expression | Notes |
|---|---|---|
| `current_time()` | `CurrentTime` | NOT RuntimeReplaceable. It is a `UnaryExpression` with `foldable = true`, so the optimizer constant-folds it to a literal. Comet would receive it as a Time literal (handled by Phase 0a). |

**Likely no serde work needed** if Phase 0a (literal serialization) is done.

---

### 3: Time arithmetic and formatting

#### 3a. Time arithmetic

| SQL | Spark expression | Notes |
|---|---|---|
| `time_col + interval` | `TimeAddInterval` | RuntimeReplaceable -> StaticInvoke. Returns TimeType. |
| `time1 - time2` | `SubtractTimes` | RuntimeReplaceable -> StaticInvoke. Returns DayTimeIntervalType. |
| `time_trunc('HOUR', t)` | `TimeTrunc` | RuntimeReplaceable. Truncates to the specified unit. |
| `time_diff(unit, t1, t2)` | `TimeDiff` | RuntimeReplaceable. Returns Long. |

**Rust**: Arithmetic on nanosecond values. `TimeTrunc` needs modular arithmetic. `SubtractTimes` returns a `DayTimeIntervalType`, which needs its own support.
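A minimal sketch of the nanosecond arithmetic behind `TimeTrunc` and `SubtractTimes`. The function names and the unit set are illustrative assumptions; Spark's exact accepted unit strings and its DayTimeIntervalType representation (microseconds internally) must be matched in a real implementation:

```rust
const NANOS_PER_SECOND: i64 = 1_000_000_000;
const NANOS_PER_MINUTE: i64 = 60 * NANOS_PER_SECOND;
const NANOS_PER_HOUR: i64 = 60 * NANOS_PER_MINUTE;

/// Truncate a time (nanoseconds since midnight) down to the given unit
/// via modular arithmetic. Returns None for an unrecognized unit; the
/// unit names here are a hypothetical subset.
fn time_trunc(unit: &str, nanos: i64) -> Option<i64> {
    let step = match unit.to_ascii_uppercase().as_str() {
        "HOUR" => NANOS_PER_HOUR,
        "MINUTE" => NANOS_PER_MINUTE,
        "SECOND" => NANOS_PER_SECOND,
        "MILLISECOND" => 1_000_000,
        "MICROSECOND" => 1_000,
        _ => return None,
    };
    Some(nanos - nanos % step)
}

/// t1 - t2 as a raw signed nanosecond difference. Converting this into
/// DayTimeIntervalType's internal representation is a separate concern.
fn subtract_times(t1: i64, t2: i64) -> i64 {
    t1 - t2
}

fn main() {
    let t = 12 * NANOS_PER_HOUR + 34 * NANOS_PER_MINUTE + 56 * NANOS_PER_SECOND;
    // Truncating 12:34:56 to the hour yields 12:00:00.
    assert_eq!(time_trunc("HOUR", t), Some(12 * NANOS_PER_HOUR));
    assert_eq!(time_trunc("FORTNIGHT", t), None);
    assert_eq!(subtract_times(t, 12 * NANOS_PER_HOUR), 34 * NANOS_PER_MINUTE + 56 * NANOS_PER_SECOND);
}
```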
#### 3b. to_time with format patterns

**Reference**: `/Users/parth/work/claude/comet/to_time_format_patterns.md`

Currently falls back to Spark when a format pattern is provided.

**Implementation steps**:

1. **Rust** (`native/spark-expr/src/datetime_funcs/to_time.rs`): Implement a format-pattern-based time parser supporting Spark's [Datetime Patterns](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) for time fields: `HH`, `hh`, `kk`, `KK`, `mm`, `ss`, `S` (fractional), `a` (AM/PM), literal characters, and quoted strings. This is a significant implementation effort. Spark delegates to `java.time.format.DateTimeFormatter` via `TimeFormatter.parse()`, and the Rust implementation must produce identical results for all supported patterns. Key challenges:
   - Pattern parsing (interpreting the format string itself)
   - Lenient vs strict parsing modes
   - Locale-dependent AM/PM tokens
   - Edge cases with padding and optional fields
2. **Scala shim** (`CometExprShim.scala`, spark-4.1 and spark-4.2): Remove the `parser.fmt.isEmpty` guard. Pass the format string as an additional argument to the `to_time` scalar function.
3. **Protobuf/serde**: Pass the format string as a string literal argument appended to the ScalarFunc args list.
4. **Tests**: Add SQL test cases with explicit format patterns.

**Complexity**: High. Consider whether the limited set of time-only patterns (no date fields) makes this tractable compared to full datetime format pattern support.

---

### 4: Cast paths

#### 4a. Cast String -> TimeType

**File**: `spark/.../CometCast.scala`

Spark's `Cast` expression can cast strings to TimeType using `DateTimeUtils.stringToTime()`. This is distinct from `to_time()`, which goes through `Invoke(ToTimeParser, "parse", ...)`.

**Implementation**: Reuse the Rust `to_time` parser. Add a `String -> Time64` path in the Rust cast implementation and register it in `CometCast.isSupported`.
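For intuition, a simplified default-format parse can be sketched as follows. This is an assumption-laden sketch of parsing `HH:mm:ss` with an optional fractional part into nanoseconds since midnight; Spark's `DateTimeUtils.stringToTime()` accepts more variants, which a real cast path must match exactly:

```rust
/// Parse "HH:mm:ss" with an optional ".fraction" suffix into
/// nanoseconds since midnight. Simplified sketch: real Spark
/// string-to-time parsing accepts additional forms.
fn parse_time_to_nanos(s: &str) -> Option<i64> {
    let (hms, frac) = match s.split_once('.') {
        Some((a, b)) => (a, Some(b)),
        None => (s, None),
    };
    let mut parts = hms.split(':');
    // u32 parsing rejects negative components.
    let h = parts.next()?.parse::<u32>().ok()? as i64;
    let m = parts.next()?.parse::<u32>().ok()? as i64;
    let sec = parts.next()?.parse::<u32>().ok()? as i64;
    if parts.next().is_some() || h > 23 || m > 59 || sec > 59 {
        return None;
    }
    // Fraction: up to 9 digits, right-padded to nanosecond precision.
    let frac_nanos: i64 = match frac {
        Some(f) if !f.is_empty() && f.len() <= 9 && f.bytes().all(|b| b.is_ascii_digit()) => {
            f.parse::<i64>().ok()? * 10_i64.pow(9 - f.len() as u32)
        }
        Some(_) => return None,
        None => 0,
    };
    Some(((h * 60 + m) * 60 + sec) * 1_000_000_000 + frac_nanos)
}

fn main() {
    assert_eq!(parse_time_to_nanos("12:34:56.789"), Some(45_296_789_000_000));
    assert_eq!(parse_time_to_nanos("00:00:00"), Some(0));
    assert_eq!(parse_time_to_nanos("24:00:00"), None);
}
```

Because the cast path and `to_time()` share this parsing core, reusing one Rust parser for both (as proposed above) keeps the two code paths consistent.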
#### 4b. Cast TimeType -> String

Display formatting. The Rust implementation needs to format nanoseconds as `HH:mm:ss[.SSSSSS]`, dropping trailing fractional zeros.

#### 4c. Cast TimeType -> TimeType (precision change)

Truncation via `DateTimeUtils.truncateTimeToPrecision()`. Simple modular arithmetic on the nanosecond value.

#### 4d. Cast TimeType -> Decimal

Convert nanoseconds to a decimal value.

#### 4e. Cast TimeType -> Byte/Short

Spark supports this, but it is unlikely to be commonly used.

---

### 5: File format support (future)

| Format | Spark mapping | Effort |
|---|---|---|
| Parquet | INT64 with time logical annotation (MICROS) | Medium - needs Parquet reader changes |
| Arrow IPC | Time64(Nanosecond) | Should work if Arrow handles it |

---

## Priority Summary

| Phase | Scope | Effort | Impact |
|---|---|---|---|
| **0** | Current PR fixes (literal serde, hash exclusion) | Small | Correctness |
| **1** | Shuffle + Sort + Min/Max | Small-Medium | Unblocks real queries |
| **2** | Extraction + conversion functions (10 functions) | Medium | Feature completeness |
| **3a** | Time arithmetic (4 functions) | Medium | Feature completeness |
| **3b** | `to_time` format patterns | High | Removes last fallback for `to_time` |
| **4** | Cast paths | Medium | Feature completeness |
| **5** | Parquet I/O | Large | End-to-end file support |

## Expression Inventory

Complete list of Spark TimeType expressions and their Comet status:

| SQL function | Spark class | Resolves to | Comet status |
|---|---|---|---|
| `to_time(str)` | `ToTime` | `Invoke(ToTimeParser, "parse")` | **Done** (default format) |
| `try_to_time(str)` | via `TryToTimeExpressionBuilder` | `TryEval(Invoke(...))` | **Done** (default format) |
| `to_time(str, fmt)` | `ToTime` | `Invoke(ToTimeParser, "parse")` | Falls back (Phase 3b) |
| `try_to_time(str, fmt)` | via builder | `TryEval(Invoke(...))` | Falls back (Phase 3b) |
| `make_time(h, m, s)` | `MakeTime` | `StaticInvoke(makeTime)` | **Done** |
| `current_time()` | `CurrentTime` | Folds to literal | Needs literal serde (Phase 0a) |
| `extract(HOUR FROM t)` | `HoursOfTime` | `StaticInvoke(getHoursOfTime)` | Phase 2a |
| `extract(MINUTE FROM t)` | `MinutesOfTime` | `StaticInvoke(getMinutesOfTime)` | Phase 2a |
| `extract(SECOND FROM t)` | `SecondsOfTime` | `StaticInvoke(getSecondsOfTime)` | Phase 2a |
| `time_from_seconds(n)` | `TimeFromSeconds` | `StaticInvoke(timeFromSeconds)` | Phase 2b |
| `time_from_millis(n)` | `TimeFromMillis` | `StaticInvoke(timeFromMillis)` | Phase 2b |
| `time_from_micros(n)` | `TimeFromMicros` | `StaticInvoke(timeFromMicros)` | Phase 2b |
| `time_to_seconds(t)` | `TimeToSeconds` | `StaticInvoke(timeToSeconds)` | Phase 2c |
| `time_to_millis(t)` | `TimeToMillis` | `StaticInvoke(timeToMillis)` | Phase 2c |
| `time_to_micros(t)` | `TimeToMicros` | `StaticInvoke(timeToMicros)` | Phase 2c |
| `t + interval` | `TimeAddInterval` | `StaticInvoke(...)` | Phase 3a |
| `t1 - t2` | `SubtractTimes` | `StaticInvoke(...)` | Phase 3a |
| `time_trunc(unit, t)` | `TimeTrunc` | RuntimeReplaceable | Phase 3a |
| `time_diff(unit, t1, t2)` | `TimeDiff` | RuntimeReplaceable | Phase 3a |
| `CAST(str AS TIME)` | `Cast` | Direct | Phase 4a |
| `CAST(t AS STRING)` | `Cast` | Direct | Phase 4b |
| `CAST(t AS TIME(p))` | `Cast` | Direct | Phase 4c |
| `CAST(t AS DECIMAL)` | `Cast` | Direct | Phase 4d |

## Infrastructure Gaps (non-expression)

| Component | File | Line(s) | Status | Phase |
|---|---|---|---|---|
| Literal serialization | `literals.scala` | 76-104 | **Gap** | 0a |
| Shuffle (native) | `CometShuffleExchangeExec.scala` | 367-380 | **Gap** | 1a |
| Shuffle (JVM) | `CometShuffleExchangeExec.scala` | 490-506 | **Gap** | 1b |
| Shuffle row conversion | `native/shuffle/.../row.rs` | - | **Gap** | 1a |
| Sort element type | `QueryPlanSerde.scala` | 833-843 | **Gap** | 1c |
| Min/Max aggregate | `aggregates.scala` | 742-751 | **Gap** | 1d |
| Hash functions | `hash.scala` | 129-131 | Correctly excluded | 1e (optional) |
| Cast support matrix | `CometCast.scala` | 35-49 | **Gap** | 4 |
| `supportedDataType` | `QueryPlanSerde.scala` | 363 | **Done** | - |
| `serializeDataType` | `QueryPlanSerde.scala` | 399 | **Done** | - |
| Protobuf type | `types.proto` | 62 | **Done** | - |
| Rust deserialization | `serde.rs` | 99 | **Done** | - |
| Columnar-to-row | `columnar_to_row.rs` | multiple | **Done** | - |
| Planner literal | `planner.rs` | 347 | **Done** | - |

### Describe the potential solution

_No response_

### Additional context

_No response_
