This is an automated email from the ASF dual-hosted git repository.

gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new df8f7c08ee4d [SPARK-56961][SQL] Pass all options while loading 
changelog
df8f7c08ee4d is described below

commit df8f7c08ee4da04fd3d86265f0bf1e7fed6d74d4
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat May 23 10:35:20 2026 -0700

    [SPARK-56961][SQL] Pass all options while loading changelog
    
    ### What changes were proposed in this pull request?
    
    This PR passes all specified options while loading changelogs.
    
    ### Why are the changes needed?
    
    These changes are needed to make the API usable in connectors like Iceberg 
and Delta.
    
    ### Does this PR introduce _any_ user-facing change?
    
    The functionality hasn't been released yet.
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Code v2.1.145.
    
    Closes #56044 from aokolnychyi/spark-56961.
    
    Lead-authored-by: Anton Okolnychyi <[email protected]>
    Co-authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit da6e110231beea1fa1bd0d259c2b49c7ea4d5085)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../{ChangelogInfo.java => ChangelogContext.java}  | 12 ++--
 .../spark/sql/connector/catalog/TableCatalog.java  | 16 ++++--
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 ...InfoUtils.scala => ChangelogContextUtils.scala} | 18 +++---
 .../sql/catalyst/analysis/RelationChanges.scala    |  6 +-
 .../sql/catalyst/analysis/RelationResolution.scala | 10 ++--
 .../catalyst/analysis/ResolveChangelogTable.scala  | 22 +++----
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 35 ++++++-----
 .../execution/datasources/v2/ChangelogTable.scala  |  4 +-
 ...uite.scala => ChangelogContextUtilsSuite.scala} | 50 ++++++++--------
 .../sql/catalyst/parser/PlanParserSuite.scala      | 40 ++++++-------
 .../parser/StreamRelationParserSuite.scala         | 23 ++++----
 .../catalog/InMemoryChangelogCatalog.scala         | 19 +++---
 .../streaming_changes_API_with_options.explain     |  2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +-
 .../apache/spark/sql/classic/DataFrameReader.scala |  6 +-
 .../spark/sql/classic/DataStreamReader.scala       |  6 +-
 .../sql/connector/ChangelogEndToEndSuite.scala     | 28 ++++-----
 .../sql/connector/ChangelogResolutionSuite.scala   | 67 ++++++++++++++++++++--
 19 files changed, 221 insertions(+), 153 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
similarity index 89%
rename from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java
rename to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
index 04a6d055f56b..d6ea072b9634 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
@@ -20,16 +20,18 @@ package org.apache.spark.sql.connector.catalog;
 import java.util.Objects;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * Encapsulates the parameters of a Change Data Capture (CDC) query, passed 
from the
  * parser / DataFrame API to the catalog's
- * {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)} method.
+ * {@link TableCatalog#loadChangelog(Identifier, ChangelogContext, 
CaseInsensitiveStringMap)}
+ * method.
  *
  * @since 4.2.0
  */
 @Evolving
-public class ChangelogInfo {
+public class ChangelogContext {
 
   /**
    * Deduplication modes controlling how Spark post-processes raw change data.
@@ -47,7 +49,7 @@ public class ChangelogInfo {
   private final DeduplicationMode deduplicationMode;
   private final boolean computeUpdates;
 
-  public ChangelogInfo(
+  public ChangelogContext(
       ChangelogRange range,
       DeduplicationMode deduplicationMode,
       boolean computeUpdates) {
@@ -68,7 +70,7 @@ public class ChangelogInfo {
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
-    if (!(o instanceof ChangelogInfo that)) return false;
+    if (!(o instanceof ChangelogContext that)) return false;
     return computeUpdates == that.computeUpdates
         && Objects.equals(range, that.range)
         && deduplicationMode == that.deduplicationMode;
@@ -81,7 +83,7 @@ public class ChangelogInfo {
 
   @Override
   public String toString() {
-    return "ChangelogInfo{range=" + range +
+    return "ChangelogContext{range=" + range +
         ", deduplicationMode=" + deduplicationMode +
         ", computeUpdates=" + computeUpdates + "}";
   }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index a6f51342aef5..f64c34ee0e07 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -25,6 +25,7 @@ import 
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.errors.QueryCompilationErrors;
 import org.apache.spark.sql.errors.QueryExecutionErrors;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.util.ArrayList;
 import java.util.Map;
@@ -195,22 +196,25 @@ public interface TableCatalog extends CatalogPlugin {
 
   /**
    * Load a {@link Changelog} for the given table, representing the row-level 
changes within the
-   * range specified by {@code changelogInfo}.
+   * range specified by {@code context}.
    * <p>
    * The default implementation throws an analysis exception indicating that 
the catalog does
    * not support CDC. Catalogs that support CDC must override this method.
    *
    * @param ident a table identifier
-   * @param changelogInfo the CDC query parameters (range, deduplication mode, 
etc.)
+   * @param context the CDC query context (range, deduplication mode, etc.)
+   * @param options all options passed to the changelog query, including the 
CDC-recognized
+   *                keys (range, deduplication mode, etc.) that are also 
parsed into {@code context}
    * @return a Changelog instance for the requested table and range
    * @throws NoSuchTableException If the table doesn't exist
    *
    * @since 4.2.0
    */
-  default Changelog loadChangelog(Identifier ident, ChangelogInfo 
changelogInfo)
-      throws NoSuchTableException {
-    throw new UnsupportedOperationException(
-        name() + " does not support Change Data Capture (CDC)");
+  default Changelog loadChangelog(
+      Identifier ident,
+      ChangelogContext context,
+      CaseInsensitiveStringMap options) throws NoSuchTableException {
+    throw new UnsupportedOperationException(name() + " does not support Change 
Data Capture (CDC)");
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 95a856ac28e6..d123d36c23b2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1140,8 +1140,8 @@ class Analyzer(
         val timeTravelSpec = TimeTravelSpec.create(timestamp, version, 
conf.sessionLocalTimeZone)
         resolveRelation(u, timeTravelSpec).getOrElse(r)
 
-      case r @ RelationChanges(u: UnresolvedRelation, changelogInfo) =>
-        relationResolution.resolveChangelog(u, changelogInfo).getOrElse(r)
+      case r @ RelationChanges(u: UnresolvedRelation, ctx) =>
+        relationResolution.resolveChangelog(u, ctx).getOrElse(r)
 
       case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
         lookupTableOrView(identifier).map {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
similarity index 87%
rename from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala
rename to 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
index fb7ae01843d6..2b679d955f52 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
@@ -21,16 +21,16 @@ import java.lang.{Long => JLong}
 import java.util.{Locale, Optional => JOptional}
 
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.connector.catalog.ChangelogInfo
+import org.apache.spark.sql.connector.catalog.ChangelogContext
 import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, 
UnboundedRange, VersionRange}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
- * Utility methods for constructing [[ChangelogInfo]] from DataFrame API 
options.
+ * Utility methods for constructing [[ChangelogContext]] from DataFrame API 
options.
  */
-object ChangelogInfoUtils {
+object ChangelogContextUtils {
 
   private val STARTING_VERSION = "startingVersion"
   private val ENDING_VERSION = "endingVersion"
@@ -42,12 +42,12 @@ object ChangelogInfoUtils {
   private val COMPUTE_UPDATES = "computeUpdates"
 
   /**
-   * Build a [[ChangelogInfo]] from the options specified via `.option()` 
calls on
+   * Build a [[ChangelogContext]] from the options specified via `.option()` 
calls on
    * `DataFrameReader` or `DataStreamReader`.
    */
   def fromOptions(
       options: CaseInsensitiveStringMap,
-      sessionLocalTimeZone: String): ChangelogInfo = {
+      sessionLocalTimeZone: String): ChangelogContext = {
     val startVersion = Option(options.get(STARTING_VERSION))
     val endVersion = Option(options.get(ENDING_VERSION))
     val startTimestamp = Option(options.get(STARTING_TIMESTAMP))
@@ -59,9 +59,9 @@ object ChangelogInfoUtils {
     val deduplicationModeStr = Option(options.get(DEDUPLICATION_MODE))
       .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
     val deduplicationMode = deduplicationModeStr match {
-      case "none" => ChangelogInfo.DeduplicationMode.NONE
-      case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS
-      case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES
+      case "none" => ChangelogContext.DeduplicationMode.NONE
+      case "dropcarryovers" => 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
+      case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
       case other =>
         throw 
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
     }
@@ -98,7 +98,7 @@ object ChangelogInfoUtils {
       new UnboundedRange()
     }
 
-    new ChangelogInfo(range, deduplicationMode, computeUpdates)
+    new ChangelogContext(range, deduplicationMode, computeUpdates)
   }
 
   private def parseTimestamp(timestampStr: String, sessionLocalTimeZone: 
String): Long = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
index 2b4ba58d1745..84f82ffc1f2a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreePattern.{RELATION_CHANGES, 
TreePattern}
-import org.apache.spark.sql.connector.catalog.ChangelogInfo
+import org.apache.spark.sql.connector.catalog.ChangelogContext
 
 /**
  * A logical node used to query Change Data Capture (CDC) changes for a table 
relation.
@@ -33,10 +33,10 @@ import org.apache.spark.sql.connector.catalog.ChangelogInfo
  * [[UnresolvedLeafNode]]). Tree traversals like `transformUp` will not visit 
`relation`.
  *
  * @param relation the table relation (typically an [[UnresolvedRelation]])
- * @param changelogInfo the CDC query parameters (range, deduplication mode, 
etc.)
+ * @param changelogContext the CDC query context (range, deduplication mode, 
etc.)
  */
 case class RelationChanges(
     relation: LogicalPlan,
-    changelogInfo: ChangelogInfo) extends UnresolvedLeafNode {
+    changelogContext: ChangelogContext) extends UnresolvedLeafNode {
   override val nodePatterns: Seq[TreePattern] = Seq(RELATION_CHANGES)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 8769d1c4e4ff..55a7ad10790e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.{
   CatalogManager,
   CatalogPlugin,
   CatalogV2Util,
-  ChangelogInfo,
+  ChangelogContext,
   Identifier,
   LookupCatalog,
   MetadataTable,
@@ -330,19 +330,17 @@ class RelationResolution(
    * Resolve a CDC (CHANGES) query: look up the catalog, call loadChangelog(), 
wrap in
    * ChangelogTable, and return a DataSourceV2Relation.
    */
-  def resolveChangelog(
-      u: UnresolvedRelation,
-      changelogInfo: ChangelogInfo): Option[LogicalPlan] = {
+  def resolveChangelog(u: UnresolvedRelation, ctx: ChangelogContext): 
Option[LogicalPlan] = {
     expandIdentifier(u.multipartIdentifier) match {
       case CatalogAndIdentifier(catalog, ident) =>
         val tableCatalog = catalog.asTableCatalog
         val changelog = try {
-          tableCatalog.loadChangelog(ident, changelogInfo)
+          tableCatalog.loadChangelog(ident, ctx, u.options)
         } catch {
           case _: UnsupportedOperationException =>
             throw 
QueryCompilationErrors.cdcNotSupportedError(tableCatalog.name())
         }
-        val changelogTable = ChangelogTable(changelog, changelogInfo)
+        val changelogTable = ChangelogTable(changelog, ctx)
         val relation = if (u.isStreaming) {
           StreamingRelationV2(
             None, changelogTable.name, changelogTable, u.options,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
index 7db9f6fa405b..ed9333d3a27a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo}
+import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, 
DataSourceV2Relation}
 import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor}
@@ -123,7 +123,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp 
{
     case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if 
!table.resolved =>
       val changelog = table.changelog
-      val req = evaluateRequirements(changelog, table.changelogInfo)
+      val req = evaluateRequirements(changelog, table.changelogContext)
 
       val resolvedRel = rel.copy(table = table.copy(resolved = true))
       var updatedRel: LogicalPlan = resolvedRel
@@ -140,14 +140,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
         val rowIdExprs =
           
V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, 
resolvedRel)
         updatedRel = injectNetChangeComputation(
-          updatedRel, rowIdExprs, table.changelogInfo.computeUpdates())
+          updatedRel, rowIdExprs, table.changelogContext.computeUpdates())
       }
       updatedRel
 
     case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, 
_)
         if !table.resolved =>
       val changelog = table.changelog
-      val req = evaluateRequirements(changelog, table.changelogInfo)
+      val req = evaluateRequirements(changelog, table.changelogContext)
       val resolvedRel = rel.copy(table = table.copy(resolved = true))
       var updatedRel: LogicalPlan = resolvedRel
       if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) {
@@ -164,7 +164,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
         // output, so name-based resolution against `updatedRel` recovers the 
right
         // attributes regardless of any preceding wrapping.
         updatedRel = addStreamingNetChangeComputation(
-          updatedRel, changelog, table.changelogInfo.computeUpdates())
+          updatedRel, changelog, table.changelogContext.computeUpdates())
       }
       updatedRel
   }
@@ -175,7 +175,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
 
   /**
    * Captures which post-processing passes a CDC query requires, derived from 
the
-   * user-provided [[ChangelogInfo]] options and the connector-declared 
[[Changelog]]
+   * user-provided [[ChangelogContext]] and the connector-declared 
[[Changelog]]
    * capability flags.
    */
   private case class PostProcessingRequirements(
@@ -194,14 +194,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    */
   private def evaluateRequirements(
       changelog: Changelog,
-      options: ChangelogInfo): PostProcessingRequirements = {
+      context: ChangelogContext): PostProcessingRequirements = {
     val requiresCarryOverRemoval =
-      options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
+      context.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
         changelog.containsCarryoverRows()
     val requiresUpdateDetection =
-      options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
+      context.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
     val requiresNetChanges =
-      options.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.NET_CHANGES &&
+      context.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NET_CHANGES &&
         changelog.containsIntermediateChanges()
 
     // If carry-overs are surfaced and update detection is enabled without 
carry-over
@@ -209,7 +209,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     // results. Hence we throw.
     if (requiresUpdateDetection &&
         changelog.containsCarryoverRows() &&
-        options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
+        context.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NONE) {
       throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval(
         changelog.name())
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index cdb01c36d744..c5eb458e2580 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,7 +47,7 @@ import 
org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, 
DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, 
stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, 
PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
ChangelogContext, PathElement, SupportsNamespaces, TableCatalog, 
TableWritePrivilege}
 import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, 
UnboundedRange, VersionRange}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, 
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, 
YearsTransform}
@@ -2635,17 +2635,17 @@ class AstBuilder extends DataTypeAstBuilder
     withOrigin(ctx) {
       val relation = createUnresolvedRelation(ctx.identifierReference, 
Option(ctx.optionsClause))
       val options = resolveOptions(Option(ctx.optionsClause))
-      val changelogInfo = buildChangelogInfo(ctx.changesClause, options)
-      val result = RelationChanges(relation, changelogInfo)
+      val changelogContext = buildChangelogContext(ctx.changesClause, options)
+      val result = RelationChanges(relation, changelogContext)
       mayApplyAliasPlan(ctx.tableAlias, result)
     }
 
   /**
-   * Build a [[ChangelogInfo]] from a batch changesClause context and optional 
WITH options.
+   * Build a [[ChangelogContext]] from a batch changesClause context and 
optional WITH options.
    */
-  private def buildChangelogInfo(
+  private def buildChangelogContext(
       ctx: ChangesClauseContext,
-      options: CaseInsensitiveStringMap): ChangelogInfo = {
+      options: CaseInsensitiveStringMap): ChangelogContext = {
     val startExclusive = ctx.startExclusive != null
     val endExclusive = ctx.endExclusive != null
     val startInclusive = !startExclusive
@@ -2690,16 +2690,16 @@ class AstBuilder extends DataTypeAstBuilder
     }
 
     val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options)
-    new ChangelogInfo(range, deduplicationMode, computeUpdates)
+    new ChangelogContext(range, deduplicationMode, computeUpdates)
   }
 
   /**
-   * Build a [[ChangelogInfo]] from a streaming streamChangesClause context 
and optional
+   * Build a [[ChangelogContext]] from a streaming streamChangesClause context 
and optional
    * WITH options.
    */
-  private def buildStreamChangelogInfo(
+  private def buildStreamChangelogContext(
       ctx: StreamChangesClauseContext,
-      options: CaseInsensitiveStringMap): ChangelogInfo = {
+      options: CaseInsensitiveStringMap): ChangelogContext = {
     val startExclusive = ctx.startExclusive != null
     val startInclusive = !startExclusive
 
@@ -2730,7 +2730,7 @@ class AstBuilder extends DataTypeAstBuilder
     }
 
     val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options)
-    new ChangelogInfo(range, deduplicationMode, computeUpdates)
+    new ChangelogContext(range, deduplicationMode, computeUpdates)
   }
 
   /**
@@ -2738,14 +2738,13 @@ class AstBuilder extends DataTypeAstBuilder
    * Defaults: DROP_CARRYOVERS for deduplicationMode, false for computeUpdates.
    */
   private def resolveChangelogOptions(
-      options: CaseInsensitiveStringMap)
-      : (ChangelogInfo.DeduplicationMode, Boolean) = {
+      options: CaseInsensitiveStringMap): (ChangelogContext.DeduplicationMode, 
Boolean) = {
     val deduplicationModeStr = Option(options.get("deduplicationMode"))
       .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
     val deduplicationMode = deduplicationModeStr match {
-      case "none" => ChangelogInfo.DeduplicationMode.NONE
-      case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS
-      case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES
+      case "none" => ChangelogContext.DeduplicationMode.NONE
+      case "dropcarryovers" => 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
+      case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
       case other =>
         throw 
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
     }
@@ -2911,8 +2910,8 @@ class AstBuilder extends DataTypeAstBuilder
       case Some(changesCtx) =>
         // Streaming CDC: wrap in RelationChanges and NamedStreamingRelation
         val options = resolveOptions(Option(ctx.optionsClause))
-        val changelogInfo = buildStreamChangelogInfo(changesCtx, options)
-        val result = RelationChanges(relation, changelogInfo)
+        val changelogContext = buildStreamChangelogContext(changesCtx, options)
+        val result = RelationChanges(relation, changelogContext)
         val table = mayApplyAliasPlan(ctx.tableAlias, result)
         val tableWithWatermark = 
table.optionalMap(ctx.watermarkClause)(withWatermark)
         val sourceNameOpt = extractSourceName(ctx.identifiedByClause)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
index 0a341fe90687..ec45f1f37317 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.{EnumSet => JEnumSet, Set => JSet}
 
-import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, 
Column, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext, 
Column, SupportsRead, Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, 
MICRO_BATCH_READ}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -35,7 +35,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 case class ChangelogTable(
     changelog: Changelog,
-    changelogInfo: ChangelogInfo,
+    changelogContext: ChangelogContext,
     resolved: Boolean = false) extends Table with SupportsRead {
 
   // Validate that the connector returned a schema with the required CDC 
metadata columns
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala
similarity index 80%
rename from 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala
rename to 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala
index 312754fa24dd..93bab0009d67 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala
@@ -22,11 +22,11 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange}
+import org.apache.spark.sql.connector.catalog.{ChangelogContext, 
ChangelogRange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper {
+class ChangelogContextUtilsSuite extends SparkFunSuite with SQLHelper {
 
   private val testTimeZone = "UTC"
 
@@ -35,7 +35,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("version range with both start and end") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("startingVersion" -> "1", "endingVersion" -> "5"), 
testTimeZone)
     val range = info.range().asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "1")
@@ -45,7 +45,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("version range with only start") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("startingVersion" -> "10"), testTimeZone)
     val range = info.range().asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "10")
@@ -55,14 +55,14 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   test("version range - endingVersion without startingVersion throws") {
     checkError(
       intercept[AnalysisException] {
-        ChangelogInfoUtils.fromOptions(
+        ChangelogContextUtils.fromOptions(
           makeOptions("endingVersion" -> "5"), testTimeZone)
       },
       condition = "INVALID_CDC_OPTION.MISSING_STARTING_VERSION")
   }
 
   test("timestamp range with both start and end") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("startingTimestamp" -> "2026-01-01", "endingTimestamp" -> 
"2026-02-01"),
       testTimeZone)
     val range = info.range().asInstanceOf[ChangelogRange.TimestampRange]
@@ -72,7 +72,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("timestamp range with only start") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("startingTimestamp" -> "2026-01-01"), testTimeZone)
     val range = info.range().asInstanceOf[ChangelogRange.TimestampRange]
     assert(!range.endingTimestamp().isPresent)
@@ -81,7 +81,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   test("timestamp range - endingTimestamp without startingTimestamp throws") {
     checkError(
       intercept[AnalysisException] {
-        ChangelogInfoUtils.fromOptions(
+        ChangelogContextUtils.fromOptions(
           makeOptions("endingTimestamp" -> "2026-02-01"), testTimeZone)
       },
       condition = "INVALID_CDC_OPTION.MISSING_STARTING_TIMESTAMP")
@@ -90,7 +90,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   test("cannot mix version and timestamp range") {
     checkError(
       intercept[AnalysisException] {
-        ChangelogInfoUtils.fromOptions(
+        ChangelogContextUtils.fromOptions(
           makeOptions("startingVersion" -> "1", "startingTimestamp" -> 
"2026-01-01"),
           testTimeZone)
       },
@@ -98,37 +98,37 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("unbounded range when no version or timestamp specified") {
-    val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone)
+    val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone)
     assert(info.range().isInstanceOf[ChangelogRange.UnboundedRange])
   }
 
   test("deduplication mode - none") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("deduplicationMode" -> "none"), testTimeZone)
-    assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE)
+    assert(info.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE)
   }
 
   test("deduplication mode - dropCarryovers (default)") {
-    val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone)
-    assert(info.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone)
+    assert(info.deduplicationMode() == 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
   }
 
   test("deduplication mode - netChanges") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("deduplicationMode" -> "netChanges"), testTimeZone)
-    assert(info.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.NET_CHANGES)
+    assert(info.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NET_CHANGES)
   }
 
   test("deduplication mode - case insensitive") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("deduplicationMode" -> "DROPCARRYOVERS"), testTimeZone)
-    assert(info.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    assert(info.deduplicationMode() == 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
   }
 
   test("deduplication mode - invalid value throws") {
     checkError(
       intercept[AnalysisException] {
-        ChangelogInfoUtils.fromOptions(
+        ChangelogContextUtils.fromOptions(
           makeOptions("deduplicationMode" -> "invalid"), testTimeZone)
       },
       condition = "INVALID_CDC_OPTION.INVALID_DEDUPLICATION_MODE",
@@ -136,18 +136,18 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("computeUpdates option") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions("computeUpdates" -> "true"), testTimeZone)
     assert(info.computeUpdates())
   }
 
   test("computeUpdates defaults to false") {
-    val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone)
+    val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone)
     assert(!info.computeUpdates())
   }
 
   test("bound inclusivity options") {
-    val info = ChangelogInfoUtils.fromOptions(
+    val info = ChangelogContextUtils.fromOptions(
       makeOptions(
         "startingVersion" -> "1",
         "endingVersion" -> "5",
@@ -162,7 +162,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
   test("invalid timestamp throws") {
     checkError(
       intercept[AnalysisException] {
-        ChangelogInfoUtils.fromOptions(
+        ChangelogContextUtils.fromOptions(
           makeOptions("startingTimestamp" -> "not-a-timestamp"), testTimeZone)
       },
       condition = "INVALID_CDC_OPTION.INVALID_TIMESTAMP",
@@ -177,7 +177,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
     // = 2026-01-01 08:00:00 UTC = expectedUtcMicros + 8h
     val expectedPstMicros = 1767254400000000L
 
-    val utcInfo = ChangelogInfoUtils.fromOptions(
+    val utcInfo = ChangelogContextUtils.fromOptions(
       makeOptions("startingTimestamp" -> tsStr), "UTC")
     val utcRange =
       utcInfo.range().asInstanceOf[ChangelogRange.TimestampRange]
@@ -185,7 +185,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with 
SQLHelper {
 
     withSQLConf(
         SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
-      val laInfo = ChangelogInfoUtils.fromOptions(
+      val laInfo = ChangelogContextUtils.fromOptions(
         makeOptions("startingTimestamp" -> tsStr),
         SQLConf.get.sessionLocalTimeZone)
       val laRange =
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 1ecb7fa539c2..b17634e0b56b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{EvaluateUnresolvedInlineTable, 
IntervalUtils}
-import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange}
+import org.apache.spark.sql.connector.catalog.{ChangelogContext, 
ChangelogRange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, 
LongType, StringType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -2194,14 +2194,14 @@ class PlanParserSuite extends AnalysisTest {
         endInclusive: Boolean = true): RelationChanges = {
       RelationChanges(
         UnresolvedRelation(Seq("a", "b", "c")),
-        new ChangelogInfo(
+        new ChangelogContext(
           new ChangelogRange.VersionRange(
             startVersion,
             endVersion.map(java.util.Optional.of[String])
               .getOrElse(java.util.Optional.empty[String]),
             startInclusive,
             endInclusive),
-          ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+          ChangelogContext.DeduplicationMode.DROP_CARRYOVERS,
           false))
     }
 
@@ -2262,7 +2262,7 @@ class PlanParserSuite extends AnalysisTest {
         case rc: RelationChanges => rc
         case sa: SubqueryAlias => sa.child.asInstanceOf[RelationChanges]
       }
-      changes.changelogInfo.range().asInstanceOf[ChangelogRange.TimestampRange]
+      
changes.changelogContext.range().asInstanceOf[ChangelogRange.TimestampRange]
     }
 
     // Basic timestamp range
@@ -2300,54 +2300,54 @@ class PlanParserSuite extends AnalysisTest {
   }
 
   test("CHANGES clause - with options") {
-    def assertChangelogInfo(sql: String): ChangelogInfo = {
+    def assertChangelogContext(sql: String): ChangelogContext = {
       val plan = parsePlan(sql)
       val project = plan.asInstanceOf[Project]
       val changes = project.child match {
         case rc: RelationChanges => rc
         case sa: SubqueryAlias => sa.child.asInstanceOf[RelationChanges]
       }
-      changes.changelogInfo
+      changes.changelogContext
     }
 
     // Default: DROP_CARRYOVERS and computeUpdates = false
-    val info1 = assertChangelogInfo(
+    val info1 = assertChangelogContext(
       "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20")
-    assert(info1.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    assert(info1.deduplicationMode() == 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
     assert(!info1.computeUpdates())
 
     // deduplicationMode = none
-    val info2 = assertChangelogInfo(
+    val info2 = assertChangelogContext(
       "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
         "WITH (deduplicationMode = 'none')")
-    assert(info2.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE)
+    assert(info2.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NONE)
     assert(!info2.computeUpdates())
 
     // deduplicationMode = netChanges
-    val info3 = assertChangelogInfo(
+    val info3 = assertChangelogContext(
       "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
         "WITH (deduplicationMode = 'netChanges')")
-    assert(info3.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.NET_CHANGES)
+    assert(info3.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NET_CHANGES)
 
     // computeUpdates = true
-    val info4 = assertChangelogInfo(
+    val info4 = assertChangelogContext(
       "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
         "WITH (computeUpdates = 'true')")
-    assert(info4.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    assert(info4.deduplicationMode() == 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
     assert(info4.computeUpdates())
 
     // Both options together
-    val info5 = assertChangelogInfo(
+    val info5 = assertChangelogContext(
       "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
         "WITH (deduplicationMode = 'none', computeUpdates = 'true')")
-    assert(info5.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE)
+    assert(info5.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NONE)
     assert(info5.computeUpdates())
 
     // Case-insensitive deduplicationMode value
-    val info6 = assertChangelogInfo(
+    val info6 = assertChangelogContext(
       "SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
         "WITH (deduplicationMode = 'DROPCARRYOVERS')")
-    assert(info6.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    assert(info6.deduplicationMode() == 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
   }
 
   test("CHANGES clause - invalid deduplicationMode") {
@@ -2378,10 +2378,10 @@ class PlanParserSuite extends AnalysisTest {
       Project(Seq(UnresolvedStar(None)),
         RelationChanges(
           UnresolvedRelation(Seq("my_table")),
-          new ChangelogInfo(
+          new ChangelogContext(
             new ChangelogRange.VersionRange(
               "1", java.util.Optional.empty[String], true, true),
-            ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+            ChangelogContext.DeduplicationMode.DROP_CARRYOVERS,
             false))))
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
index 880431b189a7..61e193bf54c7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.AliasIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, 
NamedStreamingRelation, RelationChanges, UnresolvedRelation, UnresolvedStar, 
UnresolvedTableValuedFunction}
 import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias}
 import org.apache.spark.sql.catalyst.streaming.{Unassigned, UserProvided}
-import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange}
+import org.apache.spark.sql.connector.catalog.{ChangelogContext, 
ChangelogRange}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class StreamRelationParserSuite extends AnalysisTest {
@@ -594,17 +594,18 @@ class StreamRelationParserSuite extends AnalysisTest {
     val plan = parsePlan("SELECT * FROM STREAM t CHANGES")
     val relationChanges = plan.collect { case rc: RelationChanges => rc }
     assert(relationChanges.size == 1)
-    
assert(relationChanges.head.changelogInfo.range().isInstanceOf[ChangelogRange.UnboundedRange])
-    assert(relationChanges.head.changelogInfo.deduplicationMode() ==
-      ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
-    assert(!relationChanges.head.changelogInfo.computeUpdates())
+    assert(relationChanges.head.changelogContext.range()
+      .isInstanceOf[ChangelogRange.UnboundedRange])
+    assert(relationChanges.head.changelogContext.deduplicationMode() ==
+      ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
+    assert(!relationChanges.head.changelogContext.computeUpdates())
   }
 
   test("STREAM t CHANGES FROM VERSION") {
     val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM VERSION 1")
     val relationChanges = plan.collect { case rc: RelationChanges => rc }
     assert(relationChanges.size == 1)
-    val range = relationChanges.head.changelogInfo.range()
+    val range = relationChanges.head.changelogContext.range()
       .asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "1")
     assert(!range.endingVersion().isPresent)
@@ -615,7 +616,7 @@ class StreamRelationParserSuite extends AnalysisTest {
     val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM VERSION 5 
EXCLUSIVE")
     val relationChanges = plan.collect { case rc: RelationChanges => rc }
     assert(relationChanges.size == 1)
-    val range = relationChanges.head.changelogInfo.range()
+    val range = relationChanges.head.changelogContext.range()
       .asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "5")
     assert(!range.startingBoundInclusive())
@@ -625,7 +626,7 @@ class StreamRelationParserSuite extends AnalysisTest {
     val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM TIMESTAMP 
'2026-01-01'")
     val relationChanges = plan.collect { case rc: RelationChanges => rc }
     assert(relationChanges.size == 1)
-    assert(relationChanges.head.changelogInfo.range()
+    assert(relationChanges.head.changelogContext.range()
       .isInstanceOf[ChangelogRange.TimestampRange])
   }
 
@@ -647,9 +648,9 @@ class StreamRelationParserSuite extends AnalysisTest {
         "WITH (deduplicationMode = 'none', computeUpdates = 'true')")
     val relationChanges = plan.collect { case rc: RelationChanges => rc }
     assert(relationChanges.size == 1)
-    assert(relationChanges.head.changelogInfo.deduplicationMode() ==
-      ChangelogInfo.DeduplicationMode.NONE)
-    assert(relationChanges.head.changelogInfo.computeUpdates())
+    assert(relationChanges.head.changelogContext.deduplicationMode() ==
+      ChangelogContext.DeduplicationMode.NONE)
+    assert(relationChanges.head.changelogContext.computeUpdates())
   }
 
   test("STREAM t CHANGES - error: subquery in timestamp") {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
index 2b19bac8d62e..0c1def1ac55c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
@@ -40,10 +40,13 @@ class InMemoryChangelogCatalog extends InMemoryCatalog {
   private val changeData: mutable.Map[String, 
mutable.ArrayBuffer[InternalRow]] =
     mutable.Map.empty
 
-  // Stores the most recent ChangelogInfo passed to loadChangelog(), so tests 
can verify
-  // that the parser/DataFrame API correctly constructed and forwarded it.
-  private var _lastChangelogInfo: Option[ChangelogInfo] = None
-  def lastChangelogInfo: Option[ChangelogInfo] = _lastChangelogInfo
+  // Stores the most recent ChangelogContext and options passed to 
loadChangelog(), so tests
+  // can verify that the parser/DataFrame API correctly constructed and 
forwarded them.
+  private var _lastChangelogContext: Option[ChangelogContext] = None
+  def lastChangelogContext: Option[ChangelogContext] = _lastChangelogContext
+
+  private var _lastOptions: Option[CaseInsensitiveStringMap] = None
+  def lastOptions: Option[CaseInsensitiveStringMap] = _lastOptions
 
   // Per-table overrides for Changelog properties (carry-over rows, 
intermediate changes,
   // update representation, row identity). Tests can set these to exercise 
post-processing.
@@ -63,8 +66,10 @@ class InMemoryChangelogCatalog extends InMemoryCatalog {
 
   override def loadChangelog(
       ident: Identifier,
-      changelogInfo: ChangelogInfo): Changelog = {
-    _lastChangelogInfo = Some(changelogInfo)
+      changelogContext: ChangelogContext,
+      options: CaseInsensitiveStringMap): Changelog = {
+    _lastChangelogContext = Some(changelogContext)
+    _lastOptions = Some(options)
     if (!tableExists(ident)) {
       throw new NoSuchTableException(ident.asMultipartIdentifier)
     }
@@ -74,7 +79,7 @@ class InMemoryChangelogCatalog extends InMemoryCatalog {
     val numDataCols = table.columns.length
     // _commit_version is at index numDataCols + 1 (after _change_type)
     val commitVersionIdx = numDataCols + 1
-    val filtered = filterByRange(allRows.toSeq, commitVersionIdx, 
changelogInfo.range())
+    val filtered = filterByRange(allRows.toSeq, commitVersionIdx, 
changelogContext.range())
     val props = changelogProperties.getOrElse(ident.toString, 
ChangelogProperties())
     new InMemoryChangelog(
       table.name + "_changelog", table.columns, filtered, props)
diff --git 
a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
 
b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
index 9666c4ac76ae..6a1afa73f7cd 100644
--- 
a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
+++ 
b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
@@ -1,2 +1,2 @@
 ~SubqueryAlias spark_catalog.tempdb.myStreamingTable
-+- ~StreamingRelationV2 spark_catalog.tempdb.myStreamingTable_changelog, 
ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogInfo{range=VersionRange[startingVersion=1,
 endingVersion=Optional.empty, startingBoundInclusive=true, 
endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS, 
computeUpdates=false},true), [startingVersion=1, 
deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L, 
_commit_timestamp#0], org.apache.spark.sql.co [...]
++- ~StreamingRelationV2 spark_catalog.tempdb.myStreamingTable_changelog, 
ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogContext{range=VersionRange[startingVersion=1,
 endingVersion=Optional.empty, startingBoundInclusive=true, 
endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS, 
computeUpdates=false},true), [startingVersion=1, 
deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L, 
_commit_timestamp#0], org.apache.spark.sql [...]
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dff80cb24268..db78dc1744ec 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -43,7 +43,7 @@ import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, 
SESSION_ID}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, 
TaskResourceProfile, TaskResourceRequest}
 import org.apache.spark.sql.{AnalysisException, Column, Encoders, 
ForeachWriter, Row}
 import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, 
FunctionIdentifier, InternalRow, QueryPlanningTracker}
-import org.apache.spark.sql.catalyst.analysis.{ChangelogInfoUtils, 
FunctionRegistry, GlobalTempView, LocalTempView, MultiAlias, RelationChanges, 
UnresolvedAlias, UnresolvedAttribute, UnresolvedDataFrameStar, 
UnresolvedDeserializer, UnresolvedExtractValue, UnresolvedFunction, 
UnresolvedOrdinal, UnresolvedPlanId, UnresolvedRegex, UnresolvedRelation, 
UnresolvedStar, UnresolvedStarWithColumns, UnresolvedStarWithColumnsRenames, 
UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, U [...]
+import org.apache.spark.sql.catalyst.analysis.{ChangelogContextUtils, 
FunctionRegistry, GlobalTempView, LocalTempView, MultiAlias, RelationChanges, 
UnresolvedAlias, UnresolvedAttribute, UnresolvedDataFrameStar, 
UnresolvedDeserializer, UnresolvedExtractValue, UnresolvedFunction, 
UnresolvedOrdinal, UnresolvedPlanId, UnresolvedRegex, UnresolvedRelation, 
UnresolvedStar, UnresolvedStarWithColumns, UnresolvedStarWithColumnsRenames, 
UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction [...]
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, AgnosticEncoder, 
ExpressionEncoder, RowEncoder}
 import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, 
RowEncoder => AgnosticRowEncoder, StringEncoder, UnboundRowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
@@ -1741,10 +1741,10 @@ class SparkConnectPlanner(
     val tableName = rel.getUnparsedIdentifier
     val options = new CaseInsensitiveStringMap(rel.getOptionsMap)
     val timeZone = session.sessionState.conf.sessionLocalTimeZone
-    val changelogInfo = ChangelogInfoUtils.fromOptions(options, timeZone)
+    val ctx = ChangelogContextUtils.fromOptions(options, timeZone)
     val ident = parser.parseMultipartIdentifier(tableName)
     val relation = UnresolvedRelation(ident, options, isStreaming = 
rel.getIsStreaming)
-    RelationChanges(relation, changelogInfo)
+    RelationChanges(relation, ctx)
   }
 
   private def transformParse(rel: proto.Parse): LogicalPlan = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
index d0d6bf1e8ec0..3dbdf0530516 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.catalyst.analysis.{RelationChanges, 
UnresolvedRelation}
-import org.apache.spark.sql.catalyst.analysis.ChangelogInfoUtils
+import org.apache.spark.sql.catalyst.analysis.ChangelogContextUtils
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, 
UnivocityParser}
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, 
JSONOptions}
@@ -328,10 +328,10 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession)
     val multipartIdentifier =
       sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava)
-    val changelogInfo = ChangelogInfoUtils.fromOptions(
+    val changelogContext = ChangelogContextUtils.fromOptions(
       options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val relation = UnresolvedRelation(multipartIdentifier, options)
-    Dataset.ofRows(sparkSession, RelationChanges(relation, changelogInfo))
+    Dataset.ofRows(sparkSession, RelationChanges(relation, changelogContext))
   }
 
   /** @inheritdoc */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
index a3ab235372d8..eb3120cac05a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.classic
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.{Evolving, Experimental}
-import org.apache.spark.sql.catalyst.analysis.{ChangelogInfoUtils, 
NamedStreamingRelation, RelationChanges, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{ChangelogContextUtils, 
NamedStreamingRelation, RelationChanges, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
 import org.apache.spark.sql.classic.ClassicConversions._
@@ -117,10 +117,10 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession)
     assertNoSpecifiedSchema("changes")
     val identifier = 
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava)
-    val changelogInfo = ChangelogInfoUtils.fromOptions(
+    val changelogContext = ChangelogContextUtils.fromOptions(
       options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val unresolved = UnresolvedRelation(identifier, options, isStreaming = 
true)
-    val changes = RelationChanges(unresolved, changelogInfo)
+    val changes = RelationChanges(unresolved, changelogContext)
     val plan = NamedStreamingRelation.withUserProvidedName(changes, 
userProvidedSourceName)
     Dataset.ofRows(sparkSession, plan)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
index 3a7281f94d88..bb40cd9874d2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
@@ -389,14 +389,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
 
     // DataFrame API
     spark.read.option("startingVersion", "1").changes(fullTableName).collect()
-    val info1 = catalog.lastChangelogInfo.get
-    assert(info1.deduplicationMode() === 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    val info1 = catalog.lastChangelogContext.get
+    assert(info1.deduplicationMode() === 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
     assert(info1.computeUpdates() === false)
 
     // SQL (no WITH clause = defaults)
     sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1").collect()
-    val info2 = catalog.lastChangelogInfo.get
-    assert(info2.deduplicationMode() === 
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+    val info2 = catalog.lastChangelogContext.get
+    assert(info2.deduplicationMode() === 
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
     assert(info2.computeUpdates() === false)
   }
 
@@ -410,14 +410,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
       .option("deduplicationMode", "none")
       .changes(fullTableName)
       .collect()
-    assert(catalog.lastChangelogInfo.get.deduplicationMode() ===
-      ChangelogInfo.DeduplicationMode.NONE)
+    assert(catalog.lastChangelogContext.get.deduplicationMode() ===
+      ChangelogContext.DeduplicationMode.NONE)
 
     // SQL
     sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " +
       "WITH (deduplicationMode = 'none')").collect()
-    assert(catalog.lastChangelogInfo.get.deduplicationMode() ===
-      ChangelogInfo.DeduplicationMode.NONE)
+    assert(catalog.lastChangelogContext.get.deduplicationMode() ===
+      ChangelogContext.DeduplicationMode.NONE)
   }
 
   test("changes() passes computeUpdates to catalog") {
@@ -430,12 +430,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
       .option("computeUpdates", "true")
       .changes(fullTableName)
       .collect()
-    assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+    assert(catalog.lastChangelogContext.get.computeUpdates() === true)
 
     // SQL
     sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " +
       "WITH (computeUpdates = 'true')").collect()
-    assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+    assert(catalog.lastChangelogContext.get.computeUpdates() === true)
   }
 
   // ---------- Batch: timestamp range ----------
@@ -450,14 +450,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
       .option("endingTimestamp", "2024-12-31 23:59:59")
       .changes(fullTableName)
       .collect()
-    assert(catalog.lastChangelogInfo.get.range()
+    assert(catalog.lastChangelogContext.get.range()
       .isInstanceOf[ChangelogRange.TimestampRange])
 
     // SQL
     sql(s"SELECT * FROM $fullTableName " +
       "CHANGES FROM TIMESTAMP '2024-01-01 00:00:00' " +
       "TO TIMESTAMP '2024-12-31 23:59:59'").collect()
-    assert(catalog.lastChangelogInfo.get.range()
+    assert(catalog.lastChangelogContext.get.range()
       .isInstanceOf[ChangelogRange.TimestampRange])
   }
 
@@ -599,7 +599,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
       .format("memory").queryName("cdc_stream_opts_df").start()
     try {
       q1.processAllAvailable()
-      assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+      assert(catalog.lastChangelogContext.get.computeUpdates() === true)
     } finally {
       q1.stop()
     }
@@ -612,7 +612,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
       .format("memory").queryName("cdc_stream_opts_sql").start()
     try {
       q2.processAllAvailable()
-      assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+      assert(catalog.lastChangelogContext.get.computeUpdates() === true)
     } finally {
       q2.stop()
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
index 0d8f573cc448..082be2ac22c8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
@@ -196,18 +196,77 @@ class ChangelogResolutionSuite extends SharedSparkSession 
{
       parameters = Map("relationId" -> "`x`"))
   }
 
-  test("CHANGES clause passes changelogInfo to catalog") {
+  test("CHANGES clause passes changelogContext to catalog") {
     sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO 
VERSION 5")
     val cat = spark.sessionState.catalogManager
       .catalog(cdcCatalogName)
       .asInstanceOf[InMemoryChangelogCatalog]
-    val info = cat.lastChangelogInfo
+    val info = cat.lastChangelogContext
     assert(info.isDefined)
     val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "1")
     assert(range.endingVersion().get() == "5")
   }
 
+  test("user-defined options are forwarded to loadChangelog") {
+    val cat = spark.sessionState.catalogManager
+      .catalog(cdcCatalogName)
+      .asInstanceOf[InMemoryChangelogCatalog]
+
+    spark.read
+      .option("startingVersion", "1")
+      .option("customOption", "customValue")
+      .changes(s"$cdcCatalogName.test_table")
+
+    val opts = cat.lastOptions
+    assert(opts.isDefined)
+    assert(opts.get.get("customOption") == "customValue")
+    assert(opts.get.get("startingVersion") == "1")
+  }
+
+  test("user-defined options are forwarded to loadChangelog - SQL WITH 
clause") {
+    val cat = spark.sessionState.catalogManager
+      .catalog(cdcCatalogName)
+      .asInstanceOf[InMemoryChangelogCatalog]
+
+    sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 " +
+      "WITH ('customOption' = 'customValue')").queryExecution.analyzed
+
+    val opts = cat.lastOptions
+    assert(opts.isDefined)
+    assert(opts.get.get("customOption") == "customValue")
+  }
+
+  test("user-defined options are forwarded to loadChangelog - 
DataStreamReader") {
+    val cat = spark.sessionState.catalogManager
+      .catalog(cdcCatalogName)
+      .asInstanceOf[InMemoryChangelogCatalog]
+
+    spark.readStream
+      .option("startingVersion", "1")
+      .option("customOption", "customValue")
+      .changes(s"$cdcCatalogName.test_table")
+      .queryExecution.analyzed
+
+    val opts = cat.lastOptions
+    assert(opts.isDefined)
+    assert(opts.get.get("customOption") == "customValue")
+    assert(opts.get.get("startingVersion") == "1")
+  }
+
+  test("user-defined options are forwarded to loadChangelog - streaming SQL") {
+    val cat = spark.sessionState.catalogManager
+      .catalog(cdcCatalogName)
+      .asInstanceOf[InMemoryChangelogCatalog]
+
+    sql(s"SELECT * FROM STREAM $cdcCatalogName.test_table CHANGES FROM VERSION 
1 " +
+      "WITH ('customOption' = 'customValue')").queryExecution.analyzed
+
+    val opts = cat.lastOptions
+    assert(opts.isDefined)
+    assert(opts.get.get("customOption") == "customValue")
+  }
+
   // 
===========================================================================
   // Streaming post-processing
   // 
===========================================================================
@@ -306,9 +365,9 @@ class ChangelogResolutionSuite extends SharedSparkSession {
   // Generic changelog schema validation
   // 
===========================================================================
 
-  private def stubInfo(): ChangelogInfo = new ChangelogInfo(
+  private def stubInfo(): ChangelogContext = new ChangelogContext(
     new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true, 
true),
-    ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+    ChangelogContext.DeduplicationMode.DROP_CARRYOVERS,
     false)
 
   private def cl(name: String, cols: (String, 
org.apache.spark.sql.types.DataType)*)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to