ericm-db opened a new pull request, #55672:
URL: https://github.com/apache/spark/pull/55672

   ### What changes were proposed in this pull request?
   
   This PR adds the ability to name streaming sinks via a `name()` method on `DataStreamWriter`, laying the groundwork for sink evolution. This is analogous to the existing source evolution support (`DataStreamReader.name()`).
   
   **Changes:**
   - Add a `name(sinkName)` method to `DataStreamWriter` (abstract method in the API, classic implementation, and Connect stub)
   - Add `sinkName: Option[String]` field to `WriteToStream` and 
`userSpecifiedSinkName: Option[String]` to `WriteToStreamStatement` plan nodes
   - Add `spark.sql.streaming.queryEvolution.enableSinkEvolution` internal 
config to `SQLConf`
   - Add sink name validation: names may contain only alphanumeric characters and underscores
   - Add enforcement in `MicroBatchExecution`: when sink evolution is enabled, every sink must be explicitly named
   - Add `MicroBatchExecution.DEFAULT_SINK_NAME` (`"sink-0"`) for backward 
compatibility
   - Thread `sinkName` through `StreamingQueryManager` and 
`ResolveWriteToStream`
   - Add error conditions: `INVALID_SINK_NAME`, 
`UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
   - Add `QueryCompilationErrors.invalidStreamingSinkNameError`
   - Add `StreamingSinkEvolutionSuite` with tests for validation and enforcement
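   The validation and enforcement rules above can be sketched as a small, self-contained Scala model. This is a sketch under stated assumptions, not the PR's implementation: `SinkNaming`, `validateSinkName`, `resolveSinkName`, and the two exception classes are hypothetical names; the pattern `[A-Za-z0-9_]+` is an assumed reading of "alphanumeric + underscore only" (which also rejects an empty name); and validation is folded into name resolution for brevity, whereas the PR may validate when `name()` is called. Only `"sink-0"` and the two error-condition names come from the PR description.

   ```scala
   // Hypothetical model of sink naming; these names are illustrative, not Spark's code.
   object SinkNaming {
     // Default name for unnamed sinks, as stated in the PR description.
     val DefaultSinkName: String = "sink-0"

     // Assumed reading of "alphanumeric + underscore only": one or more
     // ASCII letters, digits, or underscores (an empty name is rejected).
     private val ValidName = "[A-Za-z0-9_]+".r

     class InvalidSinkNameException(val name: String)
       extends IllegalArgumentException(s"[INVALID_SINK_NAME] Invalid streaming sink name: '$name'")

     class UnnamedSinkException
       extends IllegalStateException(
         "[UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT] Sinks must be named when sink evolution is enabled")

     // Returns the name unchanged if it matches the whole pattern; otherwise throws.
     def validateSinkName(name: String): String =
       if (ValidName.pattern.matcher(name).matches()) name
       else throw new InvalidSinkNameException(name)

     // Picks the name a query would run with; explicit naming is enforced
     // only when the (modeled) sink evolution flag is on.
     def resolveSinkName(userName: Option[String], sinkEvolutionEnabled: Boolean): String =
       userName match {
         case Some(n)                      => validateSinkName(n)
         case None if sinkEvolutionEnabled => throw new UnnamedSinkException
         case None                         => DefaultSinkName
       }
   }
   ```

   Under this rule the default name `"sink-0"` contains a hyphen, so it can never coincide with a valid user-specified name. With sink evolution disabled, an unnamed sink falls back to that default, which corresponds to the backward-compatible behavior described above.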
   
   All new APIs are `private[sql]` or `internal()`, so the `name()` method is not yet publicly callable. It will be opened up once commit log support for persisting sink metadata is added in a follow-up PR.
   
   ### Why are the changes needed?
   
   Currently, streaming queries have no mechanism for sink evolution. If a user wants to change the sink of a streaming query while preserving its checkpoint, there is no way to track which sink was used previously. This PR introduces the naming API as the first step toward full sink evolution support, in which sinks can be added, removed, or replaced while maintaining checkpoint integrity.
   
   This mirrors the existing source evolution support added via 
`DataStreamReader.name()` and 
`spark.sql.streaming.queryEvolution.enableSourceEvolution`.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. All new APIs are `private[sql]` and the config is `internal()`. There will be no user-facing changes until the feature is fully implemented, including commit log support, in a follow-up PR.
   
   ### How was this patch tested?
   
   Added `StreamingSinkEvolutionSuite` with 7 test cases covering:
   - Invalid sink name validation (hyphen, space, special characters)
   - Valid sink name patterns (alphanumeric, underscore, digits)
   - Enforcement: unnamed sink with evolution enabled throws 
`UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
   - Enforcement: unnamed sink without evolution enabled succeeds (backward 
compatibility)
   - Named sink with evolution enabled succeeds
   - Restarting a query with the same sink name succeeds
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-6)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

