This is an automated email from the ASF dual-hosted git repository.
anishshri-db pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new bb4e75db39ce [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
bb4e75db39ce is described below
commit bb4e75db39ce7e606bd0abf777126427b8073657
Author: ericm-db <[email protected]>
AuthorDate: Fri May 22 17:18:40 2026 -0700
[SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
### What changes were proposed in this pull request?
This PR adds the ability to name streaming sinks via the `name()` method on
`DataStreamWriter`, laying the groundwork for sink evolution capability. This
is analogous to the existing source evolution support
(`DataStreamReader.name()`).
**Changes:**
- Add `name(sinkName)` method to `DataStreamWriter` (abstract method in the API,
classic implementation, and a Spark Connect stub that throws `UnsupportedOperationException`)
- Add `sinkName: Option[String]` field to `WriteToStream` and
`userSpecifiedSinkName: Option[String]` to `WriteToStreamStatement` plan nodes
- Add `spark.sql.streaming.queryEvolution.enableSinkEvolution` internal
config to `SQLConf`
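- Add sink name validation via a new `StreamingNameValidator`, shared with
`DataStreamReader.validateSourceName`: names may contain only ASCII letters,
digits, and underscores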
- Add enforcement in `MicroBatchExecution`: when sink evolution is
enabled, sinks must be explicitly named
- Add `MicroBatchExecution.DEFAULT_SINK_NAME` (`"sink-0"`), used whenever
sink evolution is disabled, for backward compatibility
- Thread `sinkName` through `StreamingQueryManager` and
`ResolveWriteToStream`
- Add error conditions: `INVALID_SINK_NAME`,
`UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Add `QueryCompilationErrors.invalidStreamingSinkNameError`
- Add `StreamingSinkEvolutionSuite` with tests for validation and
enforcement
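The validation rule above can be illustrated without Spark. The following is a minimal standalone sketch of the logic in `StreamingNameValidator.validate`, assuming the behavior shown in the diff below: null or empty names fail a `require` check, and names outside `[a-zA-Z0-9_]` are rejected with an error built by a caller-supplied function. `NameValidationError` and `NameValidatorSketch` are hypothetical names used only so the snippet runs on its own; the real code throws `AnalysisException`.

```scala
import scala.util.matching.Regex

// Hypothetical stand-in for org.apache.spark.sql.AnalysisException so that
// this sketch compiles and runs without Spark on the classpath.
final class NameValidationError(val errorClass: String, val params: Map[String, String])
    extends Exception(s"[$errorClass] $params")

object NameValidatorSketch {
  // Same character class as StreamingNameValidator: ASCII letters, digits, '_'.
  private val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r

  // Null or empty names fail `require` (IllegalArgumentException); names with
  // any other character are rejected with the error built by `onInvalid`.
  def validate(name: String, entityKind: String)(onInvalid: String => Exception): Unit = {
    require(name != null, s"$entityKind name cannot be null")
    require(name.nonEmpty, s"$entityKind name cannot be empty")
    if (!validNamePattern.pattern.matcher(name).matches()) {
      throw onInvalid(name)
    }
  }

  // Sources and sinks share the rule and differ only in the error they raise.
  def validateSourceName(sourceName: String): Unit =
    validate(sourceName, "Source") { invalid =>
      new NameValidationError(
        "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
        Map("sourceName" -> invalid))
    }

  def validateSinkName(sinkName: String): Unit =
    validate(sinkName, "Sink") { invalid =>
      new NameValidationError(
        "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
        Map("sinkName" -> invalid))
    }
}
```

For example, `validateSinkName("my_sink")` returns normally, while `validateSinkName("my-sink")` throws an error carrying `sinkName -> my-sink`, which is what the hyphen test in `StreamingSinkEvolutionSuite` checks against the real implementation.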
All new APIs are `private[sql]` or `internal()`, so the `name()` method is
not yet publicly callable. It will be opened up once commit log support for
persisting sink metadata is added in a follow-up PR.
### Why are the changes needed?
Currently, streaming queries have no mechanism for sink evolution. If a
user wants to change the sink of a streaming query while preserving the
checkpoint, there is no way to track which sink was used historically. This PR
introduces the naming API as the first step toward full sink evolution support,
where sinks can be added, removed, or replaced while maintaining checkpoint
integrity.
This mirrors the existing source evolution support added via
`DataStreamReader.name()` and
`spark.sql.streaming.queryEvolution.enableSourceEvolution`.
### Does this PR introduce _any_ user-facing change?
No. All new APIs are `private[sql]` and the config is `internal()`. No
user-facing changes until the feature is fully implemented with commit log
support in a follow-up PR.
### How was this patch tested?
Added `StreamingSinkEvolutionSuite` with 8 test cases covering:
- Invalid sink name validation (hyphen, space, special characters)
- Valid sink name patterns (alphanumeric, underscore, digits)
- Enforcement: unnamed sink with evolution enabled throws
`UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Enforcement: unnamed sink without evolution enabled succeeds (backward
compatibility)
- Named sink with evolution enabled succeeds
- Continuing with the same sink name across restarts works
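The three enforcement cases in the list above reduce to a single decision that `MicroBatchExecution` makes when it is constructed. Below is a minimal standalone sketch of that decision, assuming the behavior in the `MicroBatchExecution` hunk of the diff. `UnnamedSinkError` and `SinkNameResolutionSketch` are hypothetical names; the real code throws a `SparkException` with condition `STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`.

```scala
// Hypothetical stand-in for the SparkException that MicroBatchExecution throws
// with condition STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT.
final class UnnamedSinkError
    extends IllegalStateException(
      "Streaming sink must be named when " +
        "spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled.")

object SinkNameResolutionSketch {
  // Same value as MicroBatchExecution.DEFAULT_SINK_NAME in this commit.
  val DefaultSinkName = "sink-0"

  // enableSinkEvolution: value of spark.sql.streaming.queryEvolution.enableSinkEvolution.
  // userSinkName: the name passed to DataStreamWriter.name(), if any.
  def resolve(enableSinkEvolution: Boolean, userSinkName: Option[String]): String =
    if (enableSinkEvolution) {
      // Enforcement: a name is mandatory once the flag is on.
      userSinkName.getOrElse(throw new UnnamedSinkError)
    } else {
      // Backward compatibility: any user-supplied name is ignored.
      DefaultSinkName
    }
}
```

As in the diff, a name passed to `name()` is ignored while the flag is off, and the default `"sink-0"` is used instead. Because `"sink-0"` contains a hyphen, it would fail `validateSinkName`, so no user-assigned name can equal the default.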
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes #55672 from ericm-db/sink-evolution-api.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
(cherry picked from commit 2039927a4df6ef61bce6e66d98e1a19cba52548f)
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 10 +
project/MimaExcludes.scala | 5 +-
.../spark/sql/streaming/DataStreamReader.scala | 15 +-
.../spark/sql/streaming/DataStreamWriter.scala | 33 +++-
.../sql/streaming/StreamingNameValidator.scala | 50 +++++
.../sql/catalyst/streaming/WriteToStream.scala | 1 +
.../streaming/WriteToStreamStatement.scala | 2 +
.../spark/sql/errors/QueryCompilationErrors.scala | 6 +
.../org/apache/spark/sql/internal/SQLConf.scala | 12 ++
.../spark/sql/connect/DataStreamWriter.scala | 5 +
.../spark/sql/classic/DataStreamWriter.scala | 10 +
.../spark/sql/classic/StreamingQueryManager.scala | 4 +
.../streaming/runtime/MicroBatchExecution.scala | 24 ++-
.../streaming/runtime/ResolveWriteToStream.scala | 1 +
.../apache/spark/sql/streaming/StreamTest.scala | 1 +
.../test/StreamingSinkEvolutionSuite.scala | 201 +++++++++++++++++++++
.../configs-without-binding-policy-exceptions | 1 +
17 files changed, 369 insertions(+), 12 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 3dcab86b04a5..f18d3d275282 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6987,6 +6987,11 @@
"Duplicate streaming source names detected: <names>. Each streaming source must have a unique name."
]
},
+ "INVALID_SINK_NAME" : {
+ "message" : [
+ "Invalid streaming sink name: '<sinkName>'. Sink names must only contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores ('_')."
+ ]
+ },
"INVALID_SOURCE_NAME" : {
"message" : [
"Invalid streaming source name '<sourceName>'. Source names must only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_)."
@@ -6997,6 +7002,11 @@
"Streaming source naming is not supported. Source name '<name>' was provided but the feature is disabled. Please enable the feature by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to true."
]
},
+ "UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT" : {
+ "message" : [
+ "Streaming sink must be named when spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled. Use the name() method on DataStreamWriter to assign a name to the streaming sink."
+ ]
+ },
"UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : {
"message" : [
"All streaming sources must be named when spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed sources found: <sourceInfo>. Use the name() method to assign names to all streaming sources."
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 25e60a4fecd7..e92ef67080fb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -92,7 +92,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"),
// [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs"),
+
+ // [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamWriter.name")
)
// Default exclude rules
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b90a7c910f9c..726b10d6416a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.streaming
import scala.jdk.CollectionConverters._
-import scala.util.matching.Regex
import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoders}
@@ -356,18 +355,16 @@ abstract class DataStreamReader {
*
* @param sourceName
* the source name to validate
+ * @throws AnalysisException
+ * if the source name contains invalid characters
* @throws IllegalArgumentException
- * if the source name is null, empty, or contains invalid characters
+ * if the source name is null or empty
*/
private[sql] def validateSourceName(sourceName: String): Unit = {
- require(sourceName != null, "Source name cannot be null")
- require(sourceName.nonEmpty, "Source name cannot be empty")
-
- val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r
- if (!validNamePattern.pattern.matcher(sourceName).matches()) {
- throw new AnalysisException(
+ StreamingNameValidator.validate(sourceName, "Source") { invalid =>
+ new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
- messageParameters = Map("sourceName" -> sourceName))
+ messageParameters = Map("sourceName" -> invalid))
}
}
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index cb5ecc728c44..8f98466d1f17 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeoutException
import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
-import org.apache.spark.sql.{Dataset, ForeachWriter, WriteConfigMethods}
+import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter,
WriteConfigMethods}
/**
* Interface used to write a streaming `Dataset` to external storage systems
(e.g. file systems,
@@ -90,6 +90,19 @@ abstract class DataStreamWriter[T] extends
WriteConfigMethods[DataStreamWriter[T
*/
def queryName(queryName: String): this.type
+ /**
+ * Assigns a name to this streaming sink for sink evolution capability. When sinks are named,
+ * they can be tracked in checkpoint metadata, enabling query evolution.
+ *
+ * If not specified, sinks are automatically assigned a default name based on their position in
+ * the query, which maintains backward compatibility.
+ *
+ * @param sinkName
+ * the unique name for this sink (alphanumeric and underscore only)
+ * @since 4.1.0
+ */
+ private[sql] def name(sinkName: String): this.type
+
/**
* Specifies the underlying output data source.
*
@@ -217,6 +230,24 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery
+ /**
+ * Validates that a streaming sink name only contains alphanumeric characters and underscores.
+ *
+ * @param sinkName
+ * the sink name to validate
+ * @throws AnalysisException
+ * if the sink name contains invalid characters
+ * @throws IllegalArgumentException
+ * if the sink name is null or empty
+ */
+ private[sql] def validateSinkName(sinkName: String): Unit = {
+ StreamingNameValidator.validate(sinkName, "Sink") { invalid =>
+ new AnalysisException(
+ errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+ messageParameters = Map("sinkName" -> invalid))
+ }
+ }
+
///////////////////////////////////////////////////////////////////////////////////////
// Covariant Overrides
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala
new file mode 100644
index 000000000000..9b8700844310
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.util.matching.Regex
+
+import org.apache.spark.sql.AnalysisException
+
+/**
+ * Shared validation for user-assigned streaming source and sink names. Names must be non-null,
+ * non-empty, and contain only alphanumeric characters and underscores.
+ */
+private[sql] object StreamingNameValidator {
+ private val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r
+
+ /**
+ * Validates the given streaming entity name. Throws an `IllegalArgumentException` if the name
+ * is null or empty, and invokes `onInvalid` to build the `AnalysisException` to throw if the
+ * name does not match the allowed character set.
+ *
+ * @param name
+ * the source/sink name to validate
+ * @param entityKind
+ * a human-readable label (e.g. "Source", "Sink") used in null/empty messages
+ * @param onInvalid
+ * builds the AnalysisException to throw when `name` has invalid characters
+ */
+ def validate(name: String, entityKind: String)(onInvalid: String => AnalysisException): Unit = {
+ require(name != null, s"$entityKind name cannot be null")
+ require(name.nonEmpty, s"$entityKind name cannot be empty")
+ if (!validNamePattern.pattern.matcher(name).matches()) {
+ throw onInvalid(name)
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
index 884a4165d077..6e0583f77835 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.streaming.OutputMode
*/
case class WriteToStream(
name: String,
+ sinkName: Option[String],
resolvedCheckpointLocation: String,
sink: Table,
outputMode: OutputMode,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
index 7015d0dd3b2c..61e64a526aed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
* rule [[ResolveStreamWrite]].
*
* @param userSpecifiedName Query name optionally specified by the user.
+ * @param userSpecifiedSinkName Sink name optionally specified by the user for sink evolution.
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
* @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
* has not specified one. If false, then error will be thrown.
@@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
*/
case class WriteToStreamStatement(
userSpecifiedName: Option[String],
+ userSpecifiedSinkName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
useTempCheckpointLocation: Boolean,
recoverFromCheckpointLocation: Boolean,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9b899867a9e3..d5a9cc723bc3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2530,6 +2530,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("sourceName" -> sourceName))
}
+ def invalidStreamingSinkNameError(sinkName: String): Throwable = {
+ new AnalysisException(
+ errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+ messageParameters = Map("sinkName" -> sinkName))
+ }
+
def duplicateStreamingSourceNamesError(duplicateNames: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8ab725350448..6a2f9ad17b8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3200,6 +3200,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val ENABLE_STREAMING_SINK_EVOLUTION =
+ buildConf("spark.sql.streaming.queryEvolution.enableSinkEvolution")
+ .internal()
+ .doc("When true, streaming sinks can be named using the name() API on DataStreamWriter. " +
+ "This enables sink evolution capability where sinks can be changed while maintaining " +
+ "a historical record of all sinks used in the checkpoint.")
+ .version("4.1.0")
+ .booleanConf
+ .createWithDefault(false)
+
val STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART =
buildConf("spark.sql.streaming.checkUnfinishedRepartitionOnRestart")
.internal()
@@ -7800,6 +7810,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def enableStreamingSourceEvolution: Boolean = getConf(ENABLE_STREAMING_SOURCE_EVOLUTION)
+ def enableStreamingSinkEvolution: Boolean = getConf(ENABLE_STREAMING_SINK_EVOLUTION)
+
def streamingCheckUnfinishedRepartitionOnRestart: Boolean =
getConf(STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART)
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
index ffa11b5d7ab0..bac41acc83f0 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
@@ -82,6 +82,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
this
}
+ /** @inheritdoc */
+ private[sql] def name(sinkName: String): this.type = {
+ throw new UnsupportedOperationException("Sink naming is not supported in Spark Connect")
+ }
+
/** @inheritdoc */
def format(source: String): this.type = {
sinkBuilder.setFormat(source)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala
index 3e2760594038..75c3fc3e356e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala
@@ -83,6 +83,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
this
}
+ /** @inheritdoc */
+ private[sql] def name(sinkName: String): this.type = {
+ validateSinkName(sinkName)
+ this.sinkName = Some(sinkName)
+ this
+ }
+
/** @inheritdoc */
def format(source: String): this.type = {
this.source = source
@@ -312,6 +319,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
ds.sparkSession.sessionState.streamingQueryManager.startQuery(
newOptions.get("queryName"),
+ sinkName,
newOptions.get("checkpointLocation"),
ds,
newOptions.originalMap,
@@ -444,6 +452,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
private var partitioningColumns: Option[Seq[String]] = None
private var clusteringColumns: Option[Seq[String]] = None
+
+ private var sinkName: Option[String] = None
}
object DataStreamWriter {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
index 72ae3b21d662..fff8d32a0709 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
@@ -176,6 +176,7 @@ class StreamingQueryManager private[sql] (
// scalastyle:off argcount
private def createQuery(
userSpecifiedName: Option[String],
+ userSpecifiedSinkName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: Dataset[_],
extraOptions: Map[String, String],
@@ -207,6 +208,7 @@ class StreamingQueryManager private[sql] (
val dataStreamWritePlan = WriteToStreamStatement(
userSpecifiedName,
+ userSpecifiedSinkName,
userSpecifiedCheckpointLocation,
useTempCheckpointLocation,
recoverFromCheckpointLocation,
@@ -277,6 +279,7 @@ class StreamingQueryManager private[sql] (
@throws[TimeoutException]
private[sql] def startQuery(
userSpecifiedName: Option[String],
+ userSpecifiedSinkName: Option[String] = None,
userSpecifiedCheckpointLocation: Option[String],
df: Dataset[_],
extraOptions: Map[String, String],
@@ -290,6 +293,7 @@ class StreamingQueryManager private[sql] (
catalogTable: Option[CatalogTable] = None): StreamingQuery = {
val query = createQuery(
userSpecifiedName,
+ userSpecifiedSinkName,
userSpecifiedCheckpointLocation,
df,
extraOptions,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index e6d0666aca25..d4fae034e797 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkIllegalStateException}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -112,6 +112,22 @@ class MicroBatchExecution(
override protected def sourceToIdMap: Map[SparkDataStream, String] =
sourceIdMap.map(_.swap)
+ // Sink name for commit log support
+ // If sink evolution is enabled, use user-provided sinkName (or error if not provided)
+ // Otherwise, always use DEFAULT_SINK_NAME for backward compatibility
+ private val sinkName: String = {
+ if (sparkSession.sessionState.conf.enableStreamingSinkEvolution) {
+ plan.sinkName.getOrElse {
+ throw new SparkException(
+ errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT",
+ messageParameters = Map.empty,
+ cause = null)
+ }
+ } else {
+ MicroBatchExecution.DEFAULT_SINK_NAME
+ }
+ }
+
@volatile protected[sql] var triggerExecutor: TriggerExecutor = _
protected def getTrigger(): TriggerExecutor = {
@@ -1466,6 +1482,12 @@ class MicroBatchExecution(
object MicroBatchExecution {
val BATCH_ID_KEY = "streaming.sql.batchId"
+
+ /**
+ * Default sink name used when sink evolution is disabled or no explicit name is provided.
+ * This maintains backward compatibility with existing streaming queries.
+ */
+ private[sql] val DEFAULT_SINK_NAME = "sink-0"
}
case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends LeafNode {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
index ff0d71d0f075..0be430591dbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
@@ -66,6 +66,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
WriteToStream(
s.userSpecifiedName.orNull,
+ s.userSpecifiedSinkName,
resolvedCheckpointLocation,
s.sink,
s.outputMode,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 0e33b271522d..a6067aaf189e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -598,6 +598,7 @@ trait StreamTest extends SharedSparkSession with TimeLimits {
sparkSession
.streams
.startQuery(
+ None,
None,
Some(metadataRoot),
stream,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala
new file mode 100644
index 000000000000..a242faabaf92
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import org.scalatest.{BeforeAndAfterEach, Tag}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.util.Utils
+
+/**
+ * Test suite for streaming sink evolution features including:
+ * - Sink naming via DataStreamWriter.name()
+ * - Sink name validation
+ * - Sink evolution enforcement
+ */
+class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach {
+ import testImplicits._
+
+ private def newMetadataDir =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+ override def afterEach(): Unit = {
+ spark.streams.active.foreach(_.stop())
+ super.afterEach()
+ }
+
+ // =========================
+ // Sink Name Validation Tests
+ // =========================
+
+ testWithSinkEvolution("invalid sink name - contains hyphen") {
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ checkError(
+ exception = intercept[AnalysisException] {
+ input.toDF().writeStream
+ .format("noop")
+ .name("my-sink")
+ .option("checkpointLocation", newMetadataDir)
+ .start()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+ parameters = Map("sinkName" -> "my-sink"))
+ }
+
+ testWithSinkEvolution("invalid sink name - contains space") {
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ checkError(
+ exception = intercept[AnalysisException] {
+ input.toDF().writeStream
+ .format("noop")
+ .name("my sink")
+ .option("checkpointLocation", newMetadataDir)
+ .start()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+ parameters = Map("sinkName" -> "my sink"))
+ }
+
+ testWithSinkEvolution("invalid sink name - contains special characters") {
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ checkError(
+ exception = intercept[AnalysisException] {
+ input.toDF().writeStream
+ .format("noop")
+ .name("my.sink@123!")
+ .option("checkpointLocation", newMetadataDir)
+ .start()
+ },
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+ parameters = Map("sinkName" -> "my.sink@123!"))
+ }
+
+ testWithSinkEvolution("valid sink names - various patterns") {
+ Seq("mySink", "my_sink", "MySink123", "_private", "sink_123_test", "123sink")
+ .foreach { sinkName =>
+ val checkpointDir = newMetadataDir
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ val q = input.toDF().writeStream
+ .format("noop")
+ .name(sinkName)
+ .option("checkpointLocation", checkpointDir)
+ .start()
+ q.processAllAvailable()
+ q.stop()
+ }
+ }
+
+ // ===========================
+ // Sink Evolution Enforcement
+ // ===========================
+
+ testWithSinkEvolution("unnamed sink with sink evolution enabled throws error") {
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ val exception = intercept[SparkException] {
+ val q = input.toDF().writeStream
+ .format("noop")
+ // No .name() call - sink is unnamed
+ .option("checkpointLocation", newMetadataDir)
+ .start()
+ q.processAllAvailable()
+ q.stop()
+ }
+
+ checkError(
+ exception = exception,
+ condition = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT",
+ parameters = Map.empty)
+ }
+
+ test("unnamed sink without sink evolution enabled uses default name") {
+ withSQLConf(
+ SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") {
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ // Should succeed - no name required when sink evolution is disabled
+ val q = input.toDF().writeStream
+ .format("noop")
+ .option("checkpointLocation", newMetadataDir)
+ .start()
+ q.processAllAvailable()
+ q.stop()
+ }
+ }
+
+ testWithSinkEvolution("named sink succeeds with sink evolution enabled") {
+ val input = MemoryStream[Int]
+ input.addData(1, 2, 3)
+ val q = input.toDF().writeStream
+ .format("noop")
+ .name("my_sink")
+ .option("checkpointLocation", newMetadataDir)
+ .start()
+ q.processAllAvailable()
+ q.stop()
+ }
+
+ testWithSinkEvolution("continuing with same sink name works") {
+ val checkpointDir = newMetadataDir
+ val input = MemoryStream[Int]
+
+ // Start with my_sink
+ input.addData(1, 2, 3)
+ val q1 = input.toDF().writeStream
+ .format("noop")
+ .name("my_sink")
+ .option("checkpointLocation", checkpointDir)
+ .start()
+ q1.processAllAvailable()
+ q1.stop()
+
+ // Restart with same sink name - should work
+ input.addData(4, 5, 6)
+ val q2 = input.toDF().writeStream
+ .format("noop")
+ .name("my_sink")
+ .option("checkpointLocation", checkpointDir)
+ .start()
+ q2.processAllAvailable()
+ q2.stop()
+ }
+
+ // ==============
+ // Helper Methods
+ // ==============
+
+ /**
+ * Helper method to run tests with sink evolution enabled.
+ */
+ def testWithSinkEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = {
+ test(testName, testTags: _*) {
+ withSQLConf(
+ SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "true") {
+ testBody
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index c8dc75ae9182..de94d0127931 100644
--- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -1037,6 +1037,7 @@ spark.sql.streaming.numRecentProgressUpdates
spark.sql.streaming.offsetLog.formatVersion
spark.sql.streaming.optimizeOneRowPlan.enabled
spark.sql.streaming.pollingDelay
+spark.sql.streaming.queryEvolution.enableSinkEvolution
spark.sql.streaming.queryEvolution.enableSourceEvolution
spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint
spark.sql.streaming.realTimeMode.allowlistCheck