This is an automated email from the ASF dual-hosted git repository.

anishshri-db pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new bb4e75db39ce [SPARK-56719][SS] Add DataStreamWriter.name() API for 
sink evolution
bb4e75db39ce is described below

commit bb4e75db39ce7e606bd0abf777126427b8073657
Author: ericm-db <[email protected]>
AuthorDate: Fri May 22 17:18:40 2026 -0700

    [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
    
    ### What changes were proposed in this pull request?
    
    This PR adds the ability to name streaming sinks via the `name()` method on 
`DataStreamWriter`, laying the groundwork for sink evolution capability. This 
is analogous to the existing source evolution support 
(`DataStreamReader.name()`).
    
    **Changes:**
    - Add `name(sinkName)` method to `DataStreamWriter` (API abstract method, 
classic implementation, Connect stub)
    - Add `sinkName: Option[String]` field to `WriteToStream` and 
`userSpecifiedSinkName: Option[String]` to `WriteToStreamStatement` plan nodes
    - Add `spark.sql.streaming.queryEvolution.enableSinkEvolution` internal 
config to `SQLConf`
    - Add sink name validation: names may contain only ASCII letters, digits, 
and underscores
    - Add enforcement in `MicroBatchExecution`: when sink evolution is 
enabled, sinks must be explicitly named
    - Add `MicroBatchExecution.DEFAULT_SINK_NAME` (`"sink-0"`) for backward 
compatibility
    - Thread `sinkName` through `StreamingQueryManager` and 
`ResolveWriteToStream`
    - Add error conditions: `INVALID_SINK_NAME`, 
`UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
    - Add `QueryCompilationErrors.invalidStreamingSinkNameError`
    - Add `StreamingSinkEvolutionSuite` with tests for validation and 
enforcement
    
    All new APIs are `private[sql]` or `internal()`, so the `name()` method is 
not yet publicly callable. It will be opened up once commit log support for 
persisting sink metadata is added in a follow-up PR.
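    
    As a rough illustration of the validation rule listed above, the following
    standalone Scala sketch mirrors the null, empty, and character-set checks
    without depending on Spark. The object name `SinkNameSketch`, the `check`
    method, and its `Either` return type are hypothetical and exist only for
    this example; the actual change throws `IllegalArgumentException` or
    `AnalysisException` rather than returning a value.

```scala
import scala.util.matching.Regex

// Standalone sketch of the sink naming rule; this is NOT the Spark implementation.
// SinkNameSketch and check are hypothetical names used only for illustration.
object SinkNameSketch {
  // Character set described in the commit: ASCII letters, digits, and underscores.
  private val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r

  /** Returns Right(name) when the name is acceptable, Left(reason) otherwise. */
  def check(name: String): Either[String, String] = {
    if (name == null) {
      Left("Sink name cannot be null")
    } else if (name.isEmpty) {
      Left("Sink name cannot be empty")
    } else if (!validNamePattern.pattern.matcher(name).matches()) {
      Left(s"Invalid streaming sink name: '$name'")
    } else {
      Right(name)
    }
  }
}
```

    Under these assumptions, `SinkNameSketch.check("my_sink")` yields
    `Right("my_sink")`, while `SinkNameSketch.check("my-sink")` yields a `Left`.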
    
    ### Why are the changes needed?
    
    Currently, streaming queries have no mechanism for sink evolution. If a 
user wants to change the sink of a streaming query while preserving the 
checkpoint, there is no way to track which sink was used historically. This PR 
introduces the naming API as the first step toward full sink evolution support, 
where sinks can be added, removed, or replaced while maintaining checkpoint 
integrity.
    
    This mirrors the existing source evolution support added via 
`DataStreamReader.name()` and 
`spark.sql.streaming.queryEvolution.enableSourceEvolution`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. All new APIs are `private[sql]` and the config is `internal()`. There 
are no user-facing changes until the feature is fully implemented with commit 
log support in a follow-up PR.
    
    ### How was this patch tested?
    
    Added `StreamingSinkEvolutionSuite` with 8 test cases covering:
    - Invalid sink name validation (hyphen, space, special characters)
    - Valid sink name patterns (alphanumeric, underscore, digits)
    - Enforcement: unnamed sink with evolution enabled throws 
`UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
    - Enforcement: unnamed sink without evolution enabled succeeds (backward 
compatibility)
    - Named sink with evolution enabled succeeds
    - Continuing with the same sink name across restarts works
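    
    The enforcement cases above can be sketched as a small pure function,
    assuming the behaviour shown in the `MicroBatchExecution` change in this
    commit: with sink evolution enabled, a missing name is an error; with it
    disabled, the default name is always used. `SinkNameResolutionSketch`,
    `resolve`, and the use of `IllegalStateException` are hypothetical
    stand-ins; the actual code throws a `SparkException` carrying the
    `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT` condition.

```scala
// Standalone sketch of how the effective sink name is chosen; NOT the Spark code.
// SinkNameResolutionSketch and resolve are hypothetical names for illustration.
object SinkNameResolutionSketch {
  // Same value as MicroBatchExecution.DEFAULT_SINK_NAME in this commit.
  val DefaultSinkName: String = "sink-0"

  /**
   * Picks the effective sink name.
   *
   * @param sinkEvolutionEnabled stand-in for the enableSinkEvolution config value
   * @param userSinkName         name passed via name(), if any
   */
  def resolve(sinkEvolutionEnabled: Boolean, userSinkName: Option[String]): String = {
    if (sinkEvolutionEnabled) {
      // With enforcement on, an unnamed sink is rejected.
      userSinkName.getOrElse {
        throw new IllegalStateException("UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT")
      }
    } else {
      // With the feature off, the default is used even if a name was supplied.
      DefaultSinkName
    }
  }
}
```

    In this sketch the default name is returned whenever the flag is off,
    which is what keeps existing unnamed queries working unchanged.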
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-6)
    
    Closes #55672 from ericm-db/sink-evolution-api.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
    (cherry picked from commit 2039927a4df6ef61bce6e66d98e1a19cba52548f)
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  10 +
 project/MimaExcludes.scala                         |   5 +-
 .../spark/sql/streaming/DataStreamReader.scala     |  15 +-
 .../spark/sql/streaming/DataStreamWriter.scala     |  33 +++-
 .../sql/streaming/StreamingNameValidator.scala     |  50 +++++
 .../sql/catalyst/streaming/WriteToStream.scala     |   1 +
 .../streaming/WriteToStreamStatement.scala         |   2 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   6 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 ++
 .../spark/sql/connect/DataStreamWriter.scala       |   5 +
 .../spark/sql/classic/DataStreamWriter.scala       |  10 +
 .../spark/sql/classic/StreamingQueryManager.scala  |   4 +
 .../streaming/runtime/MicroBatchExecution.scala    |  24 ++-
 .../streaming/runtime/ResolveWriteToStream.scala   |   1 +
 .../apache/spark/sql/streaming/StreamTest.scala    |   1 +
 .../test/StreamingSinkEvolutionSuite.scala         | 201 +++++++++++++++++++++
 .../configs-without-binding-policy-exceptions      |   1 +
 17 files changed, 369 insertions(+), 12 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 3dcab86b04a5..f18d3d275282 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6987,6 +6987,11 @@
           "Duplicate streaming source names detected: <names>. Each streaming 
source must have a unique name."
         ]
       },
+      "INVALID_SINK_NAME" : {
+        "message" : [
+          "Invalid streaming sink name: '<sinkName>'. Sink names must only 
contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores 
('_')."
+        ]
+      },
       "INVALID_SOURCE_NAME" : {
         "message" : [
           "Invalid streaming source name '<sourceName>'. Source names must 
only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_)."
@@ -6997,6 +7002,11 @@
           "Streaming source naming is not supported. Source name '<name>' was 
provided but the feature is disabled. Please enable the feature by setting 
spark.sql.streaming.queryEvolution.enableSourceEvolution to true."
         ]
       },
+      "UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT" : {
+        "message" : [
+          "Streaming sink must be named when 
spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled. Use the 
name() method on DataStreamWriter to assign a name to the streaming sink."
+        ]
+      },
       "UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : {
         "message" : [
           "All streaming sources must be named when 
spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed 
sources found: <sourceInfo>. Use the name() method to assign names to all 
streaming sources."
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 25e60a4fecd7..e92ef67080fb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -92,7 +92,10 @@ object MimaExcludes {
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"),
 
     // [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of 
system view
-    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs")
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs"),
+
+    // [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamWriter.name")
   )
 
   // Default exclude rules
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b90a7c910f9c..726b10d6416a 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.streaming
 
 import scala.jdk.CollectionConverters._
-import scala.util.matching.Regex
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoders}
@@ -356,18 +355,16 @@ abstract class DataStreamReader {
    *
    * @param sourceName
    *   the source name to validate
+   * @throws AnalysisException
+   *   if the source name contains invalid characters
    * @throws IllegalArgumentException
-   *   if the source name is null, empty, or contains invalid characters
+   *   if the source name is null or empty
    */
   private[sql] def validateSourceName(sourceName: String): Unit = {
-    require(sourceName != null, "Source name cannot be null")
-    require(sourceName.nonEmpty, "Source name cannot be empty")
-
-    val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r
-    if (!validNamePattern.pattern.matcher(sourceName).matches()) {
-      throw new AnalysisException(
+    StreamingNameValidator.validate(sourceName, "Source") { invalid =>
+      new AnalysisException(
         errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
-        messageParameters = Map("sourceName" -> sourceName))
+        messageParameters = Map("sourceName" -> invalid))
     }
   }
 
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index cb5ecc728c44..8f98466d1f17 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeoutException
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
-import org.apache.spark.sql.{Dataset, ForeachWriter, WriteConfigMethods}
+import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter, 
WriteConfigMethods}
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems 
(e.g. file systems,
@@ -90,6 +90,19 @@ abstract class DataStreamWriter[T] extends 
WriteConfigMethods[DataStreamWriter[T
    */
   def queryName(queryName: String): this.type
 
+  /**
+   * Assigns a name to this streaming sink for sink evolution capability. When 
sinks are named,
+   * they can be tracked in checkpoint metadata, enabling query evolution.
+   *
+   * If not specified, sinks are automatically assigned a default name based 
on their position in
+   * the query, which maintains backward compatibility.
+   *
+   * @param sinkName
+   *   the unique name for this sink (alphanumeric and underscore only)
+   * @since 4.1.0
+   */
+  private[sql] def name(sinkName: String): this.type
+
   /**
    * Specifies the underlying output data source.
    *
@@ -217,6 +230,24 @@ abstract class DataStreamWriter[T] extends 
WriteConfigMethods[DataStreamWriter[T
   @throws[TimeoutException]
   def toTable(tableName: String): StreamingQuery
 
+  /**
+   * Validates that a streaming sink name only contains alphanumeric 
characters and underscores.
+   *
+   * @param sinkName
+   *   the sink name to validate
+   * @throws AnalysisException
+   *   if the sink name contains invalid characters
+   * @throws IllegalArgumentException
+   *   if the sink name is null or empty
+   */
+  private[sql] def validateSinkName(sinkName: String): Unit = {
+    StreamingNameValidator.validate(sinkName, "Sink") { invalid =>
+      new AnalysisException(
+        errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+        messageParameters = Map("sinkName" -> invalid))
+    }
+  }
+
   
///////////////////////////////////////////////////////////////////////////////////////
   // Covariant Overrides
   
///////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala
new file mode 100644
index 000000000000..9b8700844310
--- /dev/null
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.util.matching.Regex
+
+import org.apache.spark.sql.AnalysisException
+
+/**
+ * Shared validation for user-assigned streaming source and sink names. Names 
must be non-null,
+ * non-empty, and contain only alphanumeric characters and underscores.
+ */
+private[sql] object StreamingNameValidator {
+  private val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r
+
+  /**
+   * Validates the given streaming entity name. Throws an 
`IllegalArgumentException` if the name
+   * is null or empty, and invokes `onInvalid` to build the 
`AnalysisException` to throw if the
+   * name does not match the allowed character set.
+   *
+   * @param name
+   *   the source/sink name to validate
+   * @param entityKind
+   *   a human-readable label (e.g. "Source", "Sink") used in null/empty 
messages
+   * @param onInvalid
+   *   builds the AnalysisException to throw when `name` has invalid characters
+   */
+  def validate(name: String, entityKind: String)(onInvalid: String => 
AnalysisException): Unit = {
+    require(name != null, s"$entityKind name cannot be null")
+    require(name.nonEmpty, s"$entityKind name cannot be empty")
+    if (!validNamePattern.pattern.matcher(name).matches()) {
+      throw onInvalid(name)
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
index 884a4165d077..6e0583f77835 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.streaming.OutputMode
  */
 case class WriteToStream(
     name: String,
+    sinkName: Option[String],
     resolvedCheckpointLocation: String,
     sink: Table,
     outputMode: OutputMode,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
index 7015d0dd3b2c..61e64a526aed 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
  * rule [[ResolveStreamWrite]].
  *
  * @param userSpecifiedName  Query name optionally specified by the user.
+ * @param userSpecifiedSinkName  Sink name optionally specified by the user 
for sink evolution.
  * @param userSpecifiedCheckpointLocation  Checkpoint location optionally 
specified by the user.
  * @param useTempCheckpointLocation  Whether to use a temporary checkpoint 
location when the user
  *                                   has not specified one. If false, then 
error will be thrown.
@@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
  */
 case class WriteToStreamStatement(
     userSpecifiedName: Option[String],
+    userSpecifiedSinkName: Option[String],
     userSpecifiedCheckpointLocation: Option[String],
     useTempCheckpointLocation: Boolean,
     recoverFromCheckpointLocation: Boolean,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9b899867a9e3..d5a9cc723bc3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2530,6 +2530,12 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("sourceName" -> sourceName))
   }
 
+  def invalidStreamingSinkNameError(sinkName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+      messageParameters = Map("sinkName" -> sinkName))
+  }
+
   def duplicateStreamingSourceNamesError(duplicateNames: Seq[String]): 
Throwable = {
     new AnalysisException(
       errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8ab725350448..6a2f9ad17b8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3200,6 +3200,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ENABLE_STREAMING_SINK_EVOLUTION =
+    buildConf("spark.sql.streaming.queryEvolution.enableSinkEvolution")
+      .internal()
+      .doc("When true, streaming sinks can be named using the name() API on 
DataStreamWriter. " +
+        "This enables sink evolution capability where sinks can be changed 
while maintaining " +
+        "a historical record of all sinks used in the checkpoint.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART =
     buildConf("spark.sql.streaming.checkUnfinishedRepartitionOnRestart")
       .internal()
@@ -7800,6 +7810,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def enableStreamingSourceEvolution: Boolean = 
getConf(ENABLE_STREAMING_SOURCE_EVOLUTION)
 
+  def enableStreamingSinkEvolution: Boolean = 
getConf(ENABLE_STREAMING_SINK_EVOLUTION)
+
   def streamingCheckUnfinishedRepartitionOnRestart: Boolean =
     getConf(STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART)
 
diff --git 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
index ffa11b5d7ab0..bac41acc83f0 100644
--- 
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
+++ 
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala
@@ -82,6 +82,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
     this
   }
 
+  /** @inheritdoc */
+  private[sql] def name(sinkName: String): this.type = {
+    throw new UnsupportedOperationException("Sink naming is not supported in 
Spark Connect")
+  }
+
   /** @inheritdoc */
   def format(source: String): this.type = {
     sinkBuilder.setFormat(source)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala
index 3e2760594038..75c3fc3e356e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala
@@ -83,6 +83,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) 
extends streaming.D
     this
   }
 
+  /** @inheritdoc */
+  private[sql] def name(sinkName: String): this.type = {
+    validateSinkName(sinkName)
+    this.sinkName = Some(sinkName)
+    this
+  }
+
   /** @inheritdoc */
   def format(source: String): this.type = {
     this.source = source
@@ -312,6 +319,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) extends streaming.D
 
     ds.sparkSession.sessionState.streamingQueryManager.startQuery(
       newOptions.get("queryName"),
+      sinkName,
       newOptions.get("checkpointLocation"),
       ds,
       newOptions.originalMap,
@@ -444,6 +452,8 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) extends streaming.D
   private var partitioningColumns: Option[Seq[String]] = None
 
   private var clusteringColumns: Option[Seq[String]] = None
+
+  private var sinkName: Option[String] = None
 }
 
 object DataStreamWriter {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
index 72ae3b21d662..fff8d32a0709 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
@@ -176,6 +176,7 @@ class StreamingQueryManager private[sql] (
   // scalastyle:off argcount
   private def createQuery(
       userSpecifiedName: Option[String],
+      userSpecifiedSinkName: Option[String],
       userSpecifiedCheckpointLocation: Option[String],
       df: Dataset[_],
       extraOptions: Map[String, String],
@@ -207,6 +208,7 @@ class StreamingQueryManager private[sql] (
 
     val dataStreamWritePlan = WriteToStreamStatement(
       userSpecifiedName,
+      userSpecifiedSinkName,
       userSpecifiedCheckpointLocation,
       useTempCheckpointLocation,
       recoverFromCheckpointLocation,
@@ -277,6 +279,7 @@ class StreamingQueryManager private[sql] (
   @throws[TimeoutException]
   private[sql] def startQuery(
       userSpecifiedName: Option[String],
+      userSpecifiedSinkName: Option[String] = None,
       userSpecifiedCheckpointLocation: Option[String],
       df: Dataset[_],
       extraOptions: Map[String, String],
@@ -290,6 +293,7 @@ class StreamingQueryManager private[sql] (
       catalogTable: Option[CatalogTable] = None): StreamingQuery = {
     val query = createQuery(
       userSpecifiedName,
+      userSpecifiedSinkName,
       userSpecifiedCheckpointLocation,
       df,
       extraOptions,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index e6d0666aca25..d4fae034e797 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkIllegalArgumentException, 
SparkIllegalStateException}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException, 
SparkIllegalStateException}
 import org.apache.spark.internal.LogKeys
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -112,6 +112,22 @@ class MicroBatchExecution(
 
   override protected def sourceToIdMap: Map[SparkDataStream, String] = 
sourceIdMap.map(_.swap)
 
+  // Sink name for commit log support
+  // If sink evolution is enabled, use user-provided sinkName (or error if not 
provided)
+  // Otherwise, always use DEFAULT_SINK_NAME for backward compatibility
+  private val sinkName: String = {
+    if (sparkSession.sessionState.conf.enableStreamingSinkEvolution) {
+      plan.sinkName.getOrElse {
+        throw new SparkException(
+          errorClass = 
"STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT",
+          messageParameters = Map.empty,
+          cause = null)
+      }
+    } else {
+      MicroBatchExecution.DEFAULT_SINK_NAME
+    }
+  }
+
   @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
 
   protected def getTrigger(): TriggerExecutor = {
@@ -1466,6 +1482,12 @@ class MicroBatchExecution(
 
 object MicroBatchExecution {
   val BATCH_ID_KEY = "streaming.sql.batchId"
+
+  /**
+   * Default sink name used when sink evolution is disabled or no explicit 
name is provided.
+   * This maintains backward compatibility with existing streaming queries.
+   */
+  private[sql] val DEFAULT_SINK_NAME = "sink-0"
 }
 
 case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends 
LeafNode {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
index ff0d71d0f075..0be430591dbd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala
@@ -66,6 +66,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
 
       WriteToStream(
         s.userSpecifiedName.orNull,
+        s.userSpecifiedSinkName,
         resolvedCheckpointLocation,
         s.sink,
         s.outputMode,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 0e33b271522d..a6067aaf189e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -598,6 +598,7 @@ trait StreamTest extends SharedSparkSession with TimeLimits 
{
             sparkSession
               .streams
               .startQuery(
+                None,
                 None,
                 Some(metadataRoot),
                 stream,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala
new file mode 100644
index 000000000000..a242faabaf92
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import org.scalatest.{BeforeAndAfterEach, Tag}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.util.Utils
+
+/**
+ * Test suite for streaming sink evolution features including:
+ * - Sink naming via DataStreamWriter.name()
+ * - Sink name validation
+ * - Sink evolution enforcement
+ */
+class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach {
+  import testImplicits._
+
+  private def newMetadataDir =
+    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  override def afterEach(): Unit = {
+    spark.streams.active.foreach(_.stop())
+    super.afterEach()
+  }
+
+  // =========================
+  // Sink Name Validation Tests
+  // =========================
+
+  testWithSinkEvolution("invalid sink name - contains hyphen") {
+    val input = MemoryStream[Int]
+    input.addData(1, 2, 3)
+    checkError(
+      exception = intercept[AnalysisException] {
+        input.toDF().writeStream
+          .format("noop")
+          .name("my-sink")
+          .option("checkpointLocation", newMetadataDir)
+          .start()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+      parameters = Map("sinkName" -> "my-sink"))
+  }
+
+  testWithSinkEvolution("invalid sink name - contains space") {
+    val input = MemoryStream[Int]
+    input.addData(1, 2, 3)
+    checkError(
+      exception = intercept[AnalysisException] {
+        input.toDF().writeStream
+          .format("noop")
+          .name("my sink")
+          .option("checkpointLocation", newMetadataDir)
+          .start()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+      parameters = Map("sinkName" -> "my sink"))
+  }
+
+  testWithSinkEvolution("invalid sink name - contains special characters") {
+    val input = MemoryStream[Int]
+    input.addData(1, 2, 3)
+    checkError(
+      exception = intercept[AnalysisException] {
+        input.toDF().writeStream
+          .format("noop")
+          .name("my.sink@123!")
+          .option("checkpointLocation", newMetadataDir)
+          .start()
+      },
+      condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
+      parameters = Map("sinkName" -> "my.sink@123!"))
+  }
+
+  testWithSinkEvolution("valid sink names - various patterns") {
+    Seq("mySink", "my_sink", "MySink123", "_private", "sink_123_test", 
"123sink")
+      .foreach { sinkName =>
+        val checkpointDir = newMetadataDir
+        val input = MemoryStream[Int]
+        input.addData(1, 2, 3)
+        val q = input.toDF().writeStream
+          .format("noop")
+          .name(sinkName)
+          .option("checkpointLocation", checkpointDir)
+          .start()
+        q.processAllAvailable()
+        q.stop()
+      }
+  }
+
+  // ===========================
+  // Sink Evolution Enforcement
+  // ===========================
+
+  testWithSinkEvolution("unnamed sink with sink evolution enabled throws 
error") {
+    val input = MemoryStream[Int]
+    input.addData(1, 2, 3)
+    val exception = intercept[SparkException] {
+      val q = input.toDF().writeStream
+        .format("noop")
+        // No .name() call - sink is unnamed
+        .option("checkpointLocation", newMetadataDir)
+        .start()
+      q.processAllAvailable()
+      q.stop()
+    }
+
+    checkError(
+      exception = exception,
+      condition = 
"STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT",
+      parameters = Map.empty)
+  }
+
+  test("unnamed sink without sink evolution enabled uses default name") {
+    withSQLConf(
+      SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") {
+      val input = MemoryStream[Int]
+      input.addData(1, 2, 3)
+      // Should succeed - no name required when sink evolution is disabled
+      val q = input.toDF().writeStream
+        .format("noop")
+        .option("checkpointLocation", newMetadataDir)
+        .start()
+      q.processAllAvailable()
+      q.stop()
+    }
+  }
+
+  testWithSinkEvolution("named sink succeeds with sink evolution enabled") {
+    val input = MemoryStream[Int]
+    input.addData(1, 2, 3)
+    val q = input.toDF().writeStream
+      .format("noop")
+      .name("my_sink")
+      .option("checkpointLocation", newMetadataDir)
+      .start()
+    q.processAllAvailable()
+    q.stop()
+  }
+
+  testWithSinkEvolution("continuing with same sink name works") {
+    val checkpointDir = newMetadataDir
+    val input = MemoryStream[Int]
+
+    // Start with my_sink
+    input.addData(1, 2, 3)
+    val q1 = input.toDF().writeStream
+      .format("noop")
+      .name("my_sink")
+      .option("checkpointLocation", checkpointDir)
+      .start()
+    q1.processAllAvailable()
+    q1.stop()
+
+    // Restart with same sink name - should work
+    input.addData(4, 5, 6)
+    val q2 = input.toDF().writeStream
+      .format("noop")
+      .name("my_sink")
+      .option("checkpointLocation", checkpointDir)
+      .start()
+    q2.processAllAvailable()
+    q2.stop()
+  }
+
+  // ==============
+  // Helper Methods
+  // ==============
+
+  /**
+   * Helper method to run tests with sink evolution enabled.
+   */
+  def testWithSinkEvolution(testName: String, testTags: Tag*)(testBody: => 
Any): Unit = {
+    test(testName, testTags: _*) {
+      withSQLConf(
+        SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "true") {
+        testBody
+      }
+    }
+  }
+}
diff --git 
a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
 
b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index c8dc75ae9182..de94d0127931 100644
--- 
a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ 
b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -1037,6 +1037,7 @@ spark.sql.streaming.numRecentProgressUpdates
 spark.sql.streaming.offsetLog.formatVersion
 spark.sql.streaming.optimizeOneRowPlan.enabled
 spark.sql.streaming.pollingDelay
+spark.sql.streaming.queryEvolution.enableSinkEvolution
 spark.sql.streaming.queryEvolution.enableSourceEvolution
 spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint
 spark.sql.streaming.realTimeMode.allowlistCheck

