This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new a50400ba70ff [SPARK-56961][SQL] Pass all options while loading changelog
a50400ba70ff is described below
commit a50400ba70ff040aa964136bf6c9afb1e1eb0a2f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat May 23 10:35:20 2026 -0700
[SPARK-56961][SQL] Pass all options while loading changelog
### What changes were proposed in this pull request?
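This PR passes all options specified on a changelog (CDC) query to `TableCatalog.loadChangelog`, alongside the parsed CDC parameters. It also renames `ChangelogInfo` to `ChangelogContext` (and `ChangelogInfoUtils` to `ChangelogContextUtils`).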
### Why are the changes needed?
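Previously, `loadChangelog` received only the parsed CDC parameters (range, deduplication mode, computeUpdates). Any other options specified on the query were therefore not visible to the catalog. These changes are needed to make the API usable in connectors like Iceberg and Delta.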
### Does this PR introduce _any_ user-facing change?
No. The changelog (CDC) API has not been released yet, so changing its signature does not affect users.
### How was this patch tested?
This PR updates the existing changelog tests for the rename and extends `ChangelogResolutionSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Claude Code v2.1.145.
Closes #56044 from aokolnychyi/spark-56961.
Lead-authored-by: Anton Okolnychyi <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit da6e110231beea1fa1bd0d259c2b49c7ea4d5085)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../{ChangelogInfo.java => ChangelogContext.java} | 12 ++--
.../spark/sql/connector/catalog/TableCatalog.java | 16 ++++--
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
...InfoUtils.scala => ChangelogContextUtils.scala} | 18 +++---
.../sql/catalyst/analysis/RelationChanges.scala | 6 +-
.../sql/catalyst/analysis/RelationResolution.scala | 10 ++--
.../catalyst/analysis/ResolveChangelogTable.scala | 22 +++----
.../spark/sql/catalyst/parser/AstBuilder.scala | 35 ++++++-----
.../execution/datasources/v2/ChangelogTable.scala | 4 +-
...uite.scala => ChangelogContextUtilsSuite.scala} | 50 ++++++++--------
.../sql/catalyst/parser/PlanParserSuite.scala | 40 ++++++-------
.../parser/StreamRelationParserSuite.scala | 23 ++++----
.../catalog/InMemoryChangelogCatalog.scala | 19 +++---
.../streaming_changes_API_with_options.explain | 2 +-
.../sql/connect/planner/SparkConnectPlanner.scala | 6 +-
.../apache/spark/sql/classic/DataFrameReader.scala | 6 +-
.../spark/sql/classic/DataStreamReader.scala | 6 +-
.../sql/connector/ChangelogEndToEndSuite.scala | 28 ++++-----
.../sql/connector/ChangelogResolutionSuite.scala | 67 ++++++++++++++++++++--
19 files changed, 221 insertions(+), 153 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
similarity index 89%
rename from
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java
rename to
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
index 04a6d055f56b..d6ea072b9634 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
@@ -20,16 +20,18 @@ package org.apache.spark.sql.connector.catalog;
import java.util.Objects;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* Encapsulates the parameters of a Change Data Capture (CDC) query, passed
from the
* parser / DataFrame API to the catalog's
- * {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)} method.
+ * {@link TableCatalog#loadChangelog(Identifier, ChangelogContext,
CaseInsensitiveStringMap)}
+ * method.
*
* @since 4.2.0
*/
@Evolving
-public class ChangelogInfo {
+public class ChangelogContext {
/**
* Deduplication modes controlling how Spark post-processes raw change data.
@@ -47,7 +49,7 @@ public class ChangelogInfo {
private final DeduplicationMode deduplicationMode;
private final boolean computeUpdates;
- public ChangelogInfo(
+ public ChangelogContext(
ChangelogRange range,
DeduplicationMode deduplicationMode,
boolean computeUpdates) {
@@ -68,7 +70,7 @@ public class ChangelogInfo {
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (!(o instanceof ChangelogInfo that)) return false;
+ if (!(o instanceof ChangelogContext that)) return false;
return computeUpdates == that.computeUpdates
&& Objects.equals(range, that.range)
&& deduplicationMode == that.deduplicationMode;
@@ -81,7 +83,7 @@ public class ChangelogInfo {
@Override
public String toString() {
- return "ChangelogInfo{range=" + range +
+ return "ChangelogContext{range=" + range +
", deduplicationMode=" + deduplicationMode +
", computeUpdates=" + computeUpdates + "}";
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index a6f51342aef5..f64c34ee0e07 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -25,6 +25,7 @@ import
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.errors.QueryCompilationErrors;
import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.ArrayList;
import java.util.Map;
@@ -195,22 +196,25 @@ public interface TableCatalog extends CatalogPlugin {
/**
* Load a {@link Changelog} for the given table, representing the row-level
changes within the
- * range specified by {@code changelogInfo}.
+ * range specified by {@code context}.
* <p>
* The default implementation throws an analysis exception indicating that
the catalog does
* not support CDC. Catalogs that support CDC must override this method.
*
* @param ident a table identifier
- * @param changelogInfo the CDC query parameters (range, deduplication mode,
etc.)
+ * @param context the CDC query context (range, deduplication mode, etc.)
+ * @param options all options passed to the changelog query, including the
CDC-recognized
+ * keys (range, deduplication mode, etc.) that are also
parsed into {@code context}
* @return a Changelog instance for the requested table and range
* @throws NoSuchTableException If the table doesn't exist
*
* @since 4.2.0
*/
- default Changelog loadChangelog(Identifier ident, ChangelogInfo
changelogInfo)
- throws NoSuchTableException {
- throw new UnsupportedOperationException(
- name() + " does not support Change Data Capture (CDC)");
+ default Changelog loadChangelog(
+ Identifier ident,
+ ChangelogContext context,
+ CaseInsensitiveStringMap options) throws NoSuchTableException {
+ throw new UnsupportedOperationException(name() + " does not support Change
Data Capture (CDC)");
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e94ee9ca0c82..a6137e9a6c8f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1142,8 +1142,8 @@ class Analyzer(
val timeTravelSpec = TimeTravelSpec.create(timestamp, version,
conf.sessionLocalTimeZone)
resolveRelation(u, timeTravelSpec).getOrElse(r)
- case r @ RelationChanges(u: UnresolvedRelation, changelogInfo) =>
- relationResolution.resolveChangelog(u, changelogInfo).getOrElse(r)
+ case r @ RelationChanges(u: UnresolvedRelation, ctx) =>
+ relationResolution.resolveChangelog(u, ctx).getOrElse(r)
case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
lookupTableOrView(identifier).map {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
similarity index 87%
rename from
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala
rename to
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
index fb7ae01843d6..2b679d955f52 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
@@ -21,16 +21,16 @@ import java.lang.{Long => JLong}
import java.util.{Locale, Optional => JOptional}
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.connector.catalog.ChangelogInfo
+import org.apache.spark.sql.connector.catalog.ChangelogContext
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange,
UnboundedRange, VersionRange}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
- * Utility methods for constructing [[ChangelogInfo]] from DataFrame API
options.
+ * Utility methods for constructing [[ChangelogContext]] from DataFrame API
options.
*/
-object ChangelogInfoUtils {
+object ChangelogContextUtils {
private val STARTING_VERSION = "startingVersion"
private val ENDING_VERSION = "endingVersion"
@@ -42,12 +42,12 @@ object ChangelogInfoUtils {
private val COMPUTE_UPDATES = "computeUpdates"
/**
- * Build a [[ChangelogInfo]] from the options specified via `.option()`
calls on
+ * Build a [[ChangelogContext]] from the options specified via `.option()`
calls on
* `DataFrameReader` or `DataStreamReader`.
*/
def fromOptions(
options: CaseInsensitiveStringMap,
- sessionLocalTimeZone: String): ChangelogInfo = {
+ sessionLocalTimeZone: String): ChangelogContext = {
val startVersion = Option(options.get(STARTING_VERSION))
val endVersion = Option(options.get(ENDING_VERSION))
val startTimestamp = Option(options.get(STARTING_TIMESTAMP))
@@ -59,9 +59,9 @@ object ChangelogInfoUtils {
val deduplicationModeStr = Option(options.get(DEDUPLICATION_MODE))
.getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
val deduplicationMode = deduplicationModeStr match {
- case "none" => ChangelogInfo.DeduplicationMode.NONE
- case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS
- case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES
+ case "none" => ChangelogContext.DeduplicationMode.NONE
+ case "dropcarryovers" =>
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
+ case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
case other =>
throw
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
}
@@ -98,7 +98,7 @@ object ChangelogInfoUtils {
new UnboundedRange()
}
- new ChangelogInfo(range, deduplicationMode, computeUpdates)
+ new ChangelogContext(range, deduplicationMode, computeUpdates)
}
private def parseTimestamp(timestampStr: String, sessionLocalTimeZone:
String): Long = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
index 2b4ba58d1745..84f82ffc1f2a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreePattern.{RELATION_CHANGES,
TreePattern}
-import org.apache.spark.sql.connector.catalog.ChangelogInfo
+import org.apache.spark.sql.connector.catalog.ChangelogContext
/**
* A logical node used to query Change Data Capture (CDC) changes for a table
relation.
@@ -33,10 +33,10 @@ import org.apache.spark.sql.connector.catalog.ChangelogInfo
* [[UnresolvedLeafNode]]). Tree traversals like `transformUp` will not visit
`relation`.
*
* @param relation the table relation (typically an [[UnresolvedRelation]])
- * @param changelogInfo the CDC query parameters (range, deduplication mode,
etc.)
+ * @param changelogContext the CDC query context (range, deduplication mode,
etc.)
*/
case class RelationChanges(
relation: LogicalPlan,
- changelogInfo: ChangelogInfo) extends UnresolvedLeafNode {
+ changelogContext: ChangelogContext) extends UnresolvedLeafNode {
override val nodePatterns: Seq[TreePattern] = Seq(RELATION_CHANGES)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 2a3ed248aa6d..462971eac066 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.{
CatalogManager,
CatalogPlugin,
CatalogV2Util,
- ChangelogInfo,
+ ChangelogContext,
Identifier,
LookupCatalog,
MetadataTable,
@@ -330,19 +330,17 @@ class RelationResolution(
* Resolve a CDC (CHANGES) query: look up the catalog, call loadChangelog(),
wrap in
* ChangelogTable, and return a DataSourceV2Relation.
*/
- def resolveChangelog(
- u: UnresolvedRelation,
- changelogInfo: ChangelogInfo): Option[LogicalPlan] = {
+ def resolveChangelog(u: UnresolvedRelation, ctx: ChangelogContext):
Option[LogicalPlan] = {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val tableCatalog = catalog.asTableCatalog
val changelog = try {
- tableCatalog.loadChangelog(ident, changelogInfo)
+ tableCatalog.loadChangelog(ident, ctx, u.options)
} catch {
case _: UnsupportedOperationException =>
throw
QueryCompilationErrors.cdcNotSupportedError(tableCatalog.name())
}
- val changelogTable = ChangelogTable(changelog, changelogInfo)
+ val changelogTable = ChangelogTable(changelog, ctx)
val relation = if (u.isStreaming) {
StreamingRelationV2(
None, changelogTable.name, changelogTable, u.options,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
index 7db9f6fa405b..ed9333d3a27a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala
@@ -35,7 +35,7 @@ import
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo}
+import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable,
DataSourceV2Relation}
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor}
@@ -123,7 +123,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp
{
case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if
!table.resolved =>
val changelog = table.changelog
- val req = evaluateRequirements(changelog, table.changelogInfo)
+ val req = evaluateRequirements(changelog, table.changelogContext)
val resolvedRel = rel.copy(table = table.copy(resolved = true))
var updatedRel: LogicalPlan = resolvedRel
@@ -140,14 +140,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
val rowIdExprs =
V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq,
resolvedRel)
updatedRel = injectNetChangeComputation(
- updatedRel, rowIdExprs, table.changelogInfo.computeUpdates())
+ updatedRel, rowIdExprs, table.changelogContext.computeUpdates())
}
updatedRel
case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _,
_)
if !table.resolved =>
val changelog = table.changelog
- val req = evaluateRequirements(changelog, table.changelogInfo)
+ val req = evaluateRequirements(changelog, table.changelogContext)
val resolvedRel = rel.copy(table = table.copy(resolved = true))
var updatedRel: LogicalPlan = resolvedRel
if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) {
@@ -164,7 +164,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
// output, so name-based resolution against `updatedRel` recovers the
right
// attributes regardless of any preceding wrapping.
updatedRel = addStreamingNetChangeComputation(
- updatedRel, changelog, table.changelogInfo.computeUpdates())
+ updatedRel, changelog, table.changelogContext.computeUpdates())
}
updatedRel
}
@@ -175,7 +175,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
/**
* Captures which post-processing passes a CDC query requires, derived from
the
- * user-provided [[ChangelogInfo]] options and the connector-declared
[[Changelog]]
+ * user-provided [[ChangelogContext]] and the connector-declared
[[Changelog]]
* capability flags.
*/
private case class PostProcessingRequirements(
@@ -194,14 +194,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
*/
private def evaluateRequirements(
changelog: Changelog,
- options: ChangelogInfo): PostProcessingRequirements = {
+ context: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
- options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
+ context.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
- options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
+ context.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
- options.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.NET_CHANGES &&
+ context.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()
// If carry-overs are surfaced and update detection is enabled without
carry-over
@@ -209,7 +209,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
- options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
+ context.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NONE) {
throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval(
changelog.name())
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index cdb01c36d744..c5eb458e2580 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,7 +47,7 @@ import
org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory,
DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate,
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate,
stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo,
PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
ChangelogContext, PathElement, SupportsNamespaces, TableCatalog,
TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange,
UnboundedRange, VersionRange}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform,
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference,
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform,
YearsTransform}
@@ -2635,17 +2635,17 @@ class AstBuilder extends DataTypeAstBuilder
withOrigin(ctx) {
val relation = createUnresolvedRelation(ctx.identifierReference,
Option(ctx.optionsClause))
val options = resolveOptions(Option(ctx.optionsClause))
- val changelogInfo = buildChangelogInfo(ctx.changesClause, options)
- val result = RelationChanges(relation, changelogInfo)
+ val changelogContext = buildChangelogContext(ctx.changesClause, options)
+ val result = RelationChanges(relation, changelogContext)
mayApplyAliasPlan(ctx.tableAlias, result)
}
/**
- * Build a [[ChangelogInfo]] from a batch changesClause context and optional
WITH options.
+ * Build a [[ChangelogContext]] from a batch changesClause context and
optional WITH options.
*/
- private def buildChangelogInfo(
+ private def buildChangelogContext(
ctx: ChangesClauseContext,
- options: CaseInsensitiveStringMap): ChangelogInfo = {
+ options: CaseInsensitiveStringMap): ChangelogContext = {
val startExclusive = ctx.startExclusive != null
val endExclusive = ctx.endExclusive != null
val startInclusive = !startExclusive
@@ -2690,16 +2690,16 @@ class AstBuilder extends DataTypeAstBuilder
}
val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options)
- new ChangelogInfo(range, deduplicationMode, computeUpdates)
+ new ChangelogContext(range, deduplicationMode, computeUpdates)
}
/**
- * Build a [[ChangelogInfo]] from a streaming streamChangesClause context
and optional
+ * Build a [[ChangelogContext]] from a streaming streamChangesClause context
and optional
* WITH options.
*/
- private def buildStreamChangelogInfo(
+ private def buildStreamChangelogContext(
ctx: StreamChangesClauseContext,
- options: CaseInsensitiveStringMap): ChangelogInfo = {
+ options: CaseInsensitiveStringMap): ChangelogContext = {
val startExclusive = ctx.startExclusive != null
val startInclusive = !startExclusive
@@ -2730,7 +2730,7 @@ class AstBuilder extends DataTypeAstBuilder
}
val (deduplicationMode, computeUpdates) = resolveChangelogOptions(options)
- new ChangelogInfo(range, deduplicationMode, computeUpdates)
+ new ChangelogContext(range, deduplicationMode, computeUpdates)
}
/**
@@ -2738,14 +2738,13 @@ class AstBuilder extends DataTypeAstBuilder
* Defaults: DROP_CARRYOVERS for deduplicationMode, false for computeUpdates.
*/
private def resolveChangelogOptions(
- options: CaseInsensitiveStringMap)
- : (ChangelogInfo.DeduplicationMode, Boolean) = {
+ options: CaseInsensitiveStringMap): (ChangelogContext.DeduplicationMode,
Boolean) = {
val deduplicationModeStr = Option(options.get("deduplicationMode"))
.getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
val deduplicationMode = deduplicationModeStr match {
- case "none" => ChangelogInfo.DeduplicationMode.NONE
- case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS
- case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES
+ case "none" => ChangelogContext.DeduplicationMode.NONE
+ case "dropcarryovers" =>
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
+ case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
case other =>
throw
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
}
@@ -2911,8 +2910,8 @@ class AstBuilder extends DataTypeAstBuilder
case Some(changesCtx) =>
// Streaming CDC: wrap in RelationChanges and NamedStreamingRelation
val options = resolveOptions(Option(ctx.optionsClause))
- val changelogInfo = buildStreamChangelogInfo(changesCtx, options)
- val result = RelationChanges(relation, changelogInfo)
+ val changelogContext = buildStreamChangelogContext(changesCtx, options)
+ val result = RelationChanges(relation, changelogContext)
val table = mayApplyAliasPlan(ctx.tableAlias, result)
val tableWithWatermark =
table.optionalMap(ctx.watermarkClause)(withWatermark)
val sourceNameOpt = extractSourceName(ctx.identifiedByClause)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
index 0a341fe90687..ec45f1f37317 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.{EnumSet => JEnumSet, Set => JSet}
-import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo,
Column, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext,
Column, SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ,
MICRO_BATCH_READ}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -35,7 +35,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
case class ChangelogTable(
changelog: Changelog,
- changelogInfo: ChangelogInfo,
+ changelogContext: ChangelogContext,
resolved: Boolean = false) extends Table with SupportsRead {
// Validate that the connector returned a schema with the required CDC
metadata columns
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala
similarity index 80%
rename from
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala
rename to
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala
index 312754fa24dd..93bab0009d67 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtilsSuite.scala
@@ -22,11 +22,11 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange}
+import org.apache.spark.sql.connector.catalog.{ChangelogContext,
ChangelogRange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper {
+class ChangelogContextUtilsSuite extends SparkFunSuite with SQLHelper {
private val testTimeZone = "UTC"
@@ -35,7 +35,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
}
test("version range with both start and end") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("startingVersion" -> "1", "endingVersion" -> "5"),
testTimeZone)
val range = info.range().asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "1")
@@ -45,7 +45,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
}
test("version range with only start") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("startingVersion" -> "10"), testTimeZone)
val range = info.range().asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "10")
@@ -55,14 +55,14 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
test("version range - endingVersion without startingVersion throws") {
checkError(
intercept[AnalysisException] {
- ChangelogInfoUtils.fromOptions(
+ ChangelogContextUtils.fromOptions(
makeOptions("endingVersion" -> "5"), testTimeZone)
},
condition = "INVALID_CDC_OPTION.MISSING_STARTING_VERSION")
}
test("timestamp range with both start and end") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("startingTimestamp" -> "2026-01-01", "endingTimestamp" ->
"2026-02-01"),
testTimeZone)
val range = info.range().asInstanceOf[ChangelogRange.TimestampRange]
@@ -72,7 +72,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
}
test("timestamp range with only start") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("startingTimestamp" -> "2026-01-01"), testTimeZone)
val range = info.range().asInstanceOf[ChangelogRange.TimestampRange]
assert(!range.endingTimestamp().isPresent)
@@ -81,7 +81,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
test("timestamp range - endingTimestamp without startingTimestamp throws") {
checkError(
intercept[AnalysisException] {
- ChangelogInfoUtils.fromOptions(
+ ChangelogContextUtils.fromOptions(
makeOptions("endingTimestamp" -> "2026-02-01"), testTimeZone)
},
condition = "INVALID_CDC_OPTION.MISSING_STARTING_TIMESTAMP")
@@ -90,7 +90,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
test("cannot mix version and timestamp range") {
checkError(
intercept[AnalysisException] {
- ChangelogInfoUtils.fromOptions(
+ ChangelogContextUtils.fromOptions(
makeOptions("startingVersion" -> "1", "startingTimestamp" ->
"2026-01-01"),
testTimeZone)
},
@@ -98,37 +98,37 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with
SQLHelper {
}
test("unbounded range when no version or timestamp specified") {
- val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone)
+ val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone)
assert(info.range().isInstanceOf[ChangelogRange.UnboundedRange])
}
test("deduplication mode - none") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("deduplicationMode" -> "none"), testTimeZone)
- assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE)
+ assert(info.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE)
}
test("deduplication mode - dropCarryovers (default)") {
- val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone)
- assert(info.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone)
+ assert(info.deduplicationMode() ==
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
}
test("deduplication mode - netChanges") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("deduplicationMode" -> "netChanges"), testTimeZone)
- assert(info.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.NET_CHANGES)
+ assert(info.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NET_CHANGES)
}
test("deduplication mode - case insensitive") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("deduplicationMode" -> "DROPCARRYOVERS"), testTimeZone)
- assert(info.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ assert(info.deduplicationMode() ==
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
}
test("deduplication mode - invalid value throws") {
checkError(
intercept[AnalysisException] {
- ChangelogInfoUtils.fromOptions(
+ ChangelogContextUtils.fromOptions(
makeOptions("deduplicationMode" -> "invalid"), testTimeZone)
},
condition = "INVALID_CDC_OPTION.INVALID_DEDUPLICATION_MODE",
@@ -136,18 +136,18 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper {
}
test("computeUpdates option") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions("computeUpdates" -> "true"), testTimeZone)
assert(info.computeUpdates())
}
test("computeUpdates defaults to false") {
- val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone)
+ val info = ChangelogContextUtils.fromOptions(makeOptions(), testTimeZone)
assert(!info.computeUpdates())
}
test("bound inclusivity options") {
- val info = ChangelogInfoUtils.fromOptions(
+ val info = ChangelogContextUtils.fromOptions(
makeOptions(
"startingVersion" -> "1",
"endingVersion" -> "5",
@@ -162,7 +162,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper {
test("invalid timestamp throws") {
checkError(
intercept[AnalysisException] {
- ChangelogInfoUtils.fromOptions(
+ ChangelogContextUtils.fromOptions(
makeOptions("startingTimestamp" -> "not-a-timestamp"), testTimeZone)
},
condition = "INVALID_CDC_OPTION.INVALID_TIMESTAMP",
@@ -177,7 +177,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper {
// = 2026-01-01 08:00:00 UTC = expectedUtcMicros + 8h
val expectedPstMicros = 1767254400000000L
- val utcInfo = ChangelogInfoUtils.fromOptions(
+ val utcInfo = ChangelogContextUtils.fromOptions(
makeOptions("startingTimestamp" -> tsStr), "UTC")
val utcRange =
utcInfo.range().asInstanceOf[ChangelogRange.TimestampRange]
@@ -185,7 +185,7 @@ class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper {
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
- val laInfo = ChangelogInfoUtils.fromOptions(
+ val laInfo = ChangelogContextUtils.fromOptions(
makeOptions("startingTimestamp" -> tsStr),
SQLConf.get.sessionLocalTimeZone)
val laRange =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 1ecb7fa539c2..b17634e0b56b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{EvaluateUnresolvedInlineTable,
IntervalUtils}
-import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange}
+import org.apache.spark.sql.connector.catalog.{ChangelogContext,
ChangelogRange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType,
LongType, StringType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -2194,14 +2194,14 @@ class PlanParserSuite extends AnalysisTest {
endInclusive: Boolean = true): RelationChanges = {
RelationChanges(
UnresolvedRelation(Seq("a", "b", "c")),
- new ChangelogInfo(
+ new ChangelogContext(
new ChangelogRange.VersionRange(
startVersion,
endVersion.map(java.util.Optional.of[String])
.getOrElse(java.util.Optional.empty[String]),
startInclusive,
endInclusive),
- ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+ ChangelogContext.DeduplicationMode.DROP_CARRYOVERS,
false))
}
@@ -2262,7 +2262,7 @@ class PlanParserSuite extends AnalysisTest {
case rc: RelationChanges => rc
case sa: SubqueryAlias => sa.child.asInstanceOf[RelationChanges]
}
- changes.changelogInfo.range().asInstanceOf[ChangelogRange.TimestampRange]
+
changes.changelogContext.range().asInstanceOf[ChangelogRange.TimestampRange]
}
// Basic timestamp range
@@ -2300,54 +2300,54 @@ class PlanParserSuite extends AnalysisTest {
}
test("CHANGES clause - with options") {
- def assertChangelogInfo(sql: String): ChangelogInfo = {
+ def assertChangelogContext(sql: String): ChangelogContext = {
val plan = parsePlan(sql)
val project = plan.asInstanceOf[Project]
val changes = project.child match {
case rc: RelationChanges => rc
case sa: SubqueryAlias => sa.child.asInstanceOf[RelationChanges]
}
- changes.changelogInfo
+ changes.changelogContext
}
// Default: DROP_CARRYOVERS and computeUpdates = false
- val info1 = assertChangelogInfo(
+ val info1 = assertChangelogContext(
"SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20")
- assert(info1.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ assert(info1.deduplicationMode() ==
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
assert(!info1.computeUpdates())
// deduplicationMode = none
- val info2 = assertChangelogInfo(
+ val info2 = assertChangelogContext(
"SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
"WITH (deduplicationMode = 'none')")
- assert(info2.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE)
+ assert(info2.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NONE)
assert(!info2.computeUpdates())
// deduplicationMode = netChanges
- val info3 = assertChangelogInfo(
+ val info3 = assertChangelogContext(
"SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
"WITH (deduplicationMode = 'netChanges')")
- assert(info3.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.NET_CHANGES)
+ assert(info3.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NET_CHANGES)
// computeUpdates = true
- val info4 = assertChangelogInfo(
+ val info4 = assertChangelogContext(
"SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
"WITH (computeUpdates = 'true')")
- assert(info4.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ assert(info4.deduplicationMode() ==
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
assert(info4.computeUpdates())
// Both options together
- val info5 = assertChangelogInfo(
+ val info5 = assertChangelogContext(
"SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
"WITH (deduplicationMode = 'none', computeUpdates = 'true')")
- assert(info5.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE)
+ assert(info5.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NONE)
assert(info5.computeUpdates())
// Case-insensitive deduplicationMode value
- val info6 = assertChangelogInfo(
+ val info6 = assertChangelogContext(
"SELECT * FROM a.b.c CHANGES FROM VERSION 10 TO VERSION 20 " +
"WITH (deduplicationMode = 'DROPCARRYOVERS')")
- assert(info6.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ assert(info6.deduplicationMode() ==
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
}
test("CHANGES clause - invalid deduplicationMode") {
@@ -2378,10 +2378,10 @@ class PlanParserSuite extends AnalysisTest {
Project(Seq(UnresolvedStar(None)),
RelationChanges(
UnresolvedRelation(Seq("my_table")),
- new ChangelogInfo(
+ new ChangelogContext(
new ChangelogRange.VersionRange(
"1", java.util.Optional.empty[String], true, true),
- ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+ ChangelogContext.DeduplicationMode.DROP_CARRYOVERS,
false))))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
index 880431b189a7..61e193bf54c7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/StreamRelationParserSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest,
NamedStreamingRelation, RelationChanges, UnresolvedRelation, UnresolvedStar,
UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias}
import org.apache.spark.sql.catalyst.streaming.{Unassigned, UserProvided}
-import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange}
+import org.apache.spark.sql.connector.catalog.{ChangelogContext,
ChangelogRange}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class StreamRelationParserSuite extends AnalysisTest {
@@ -594,17 +594,18 @@ class StreamRelationParserSuite extends AnalysisTest {
val plan = parsePlan("SELECT * FROM STREAM t CHANGES")
val relationChanges = plan.collect { case rc: RelationChanges => rc }
assert(relationChanges.size == 1)
-
assert(relationChanges.head.changelogInfo.range().isInstanceOf[ChangelogRange.UnboundedRange])
- assert(relationChanges.head.changelogInfo.deduplicationMode() ==
- ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
- assert(!relationChanges.head.changelogInfo.computeUpdates())
+ assert(relationChanges.head.changelogContext.range()
+ .isInstanceOf[ChangelogRange.UnboundedRange])
+ assert(relationChanges.head.changelogContext.deduplicationMode() ==
+ ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
+ assert(!relationChanges.head.changelogContext.computeUpdates())
}
test("STREAM t CHANGES FROM VERSION") {
val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM VERSION 1")
val relationChanges = plan.collect { case rc: RelationChanges => rc }
assert(relationChanges.size == 1)
- val range = relationChanges.head.changelogInfo.range()
+ val range = relationChanges.head.changelogContext.range()
.asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "1")
assert(!range.endingVersion().isPresent)
@@ -615,7 +616,7 @@ class StreamRelationParserSuite extends AnalysisTest {
val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM VERSION 5
EXCLUSIVE")
val relationChanges = plan.collect { case rc: RelationChanges => rc }
assert(relationChanges.size == 1)
- val range = relationChanges.head.changelogInfo.range()
+ val range = relationChanges.head.changelogContext.range()
.asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "5")
assert(!range.startingBoundInclusive())
@@ -625,7 +626,7 @@ class StreamRelationParserSuite extends AnalysisTest {
val plan = parsePlan("SELECT * FROM STREAM t CHANGES FROM TIMESTAMP
'2026-01-01'")
val relationChanges = plan.collect { case rc: RelationChanges => rc }
assert(relationChanges.size == 1)
- assert(relationChanges.head.changelogInfo.range()
+ assert(relationChanges.head.changelogContext.range()
.isInstanceOf[ChangelogRange.TimestampRange])
}
@@ -647,9 +648,9 @@ class StreamRelationParserSuite extends AnalysisTest {
"WITH (deduplicationMode = 'none', computeUpdates = 'true')")
val relationChanges = plan.collect { case rc: RelationChanges => rc }
assert(relationChanges.size == 1)
- assert(relationChanges.head.changelogInfo.deduplicationMode() ==
- ChangelogInfo.DeduplicationMode.NONE)
- assert(relationChanges.head.changelogInfo.computeUpdates())
+ assert(relationChanges.head.changelogContext.deduplicationMode() ==
+ ChangelogContext.DeduplicationMode.NONE)
+ assert(relationChanges.head.changelogContext.computeUpdates())
}
test("STREAM t CHANGES - error: subquery in timestamp") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
index 2b19bac8d62e..0c1def1ac55c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala
@@ -40,10 +40,13 @@ class InMemoryChangelogCatalog extends InMemoryCatalog {
private val changeData: mutable.Map[String,
mutable.ArrayBuffer[InternalRow]] =
mutable.Map.empty
- // Stores the most recent ChangelogInfo passed to loadChangelog(), so tests
can verify
- // that the parser/DataFrame API correctly constructed and forwarded it.
- private var _lastChangelogInfo: Option[ChangelogInfo] = None
- def lastChangelogInfo: Option[ChangelogInfo] = _lastChangelogInfo
+ // Stores the most recent ChangelogContext and options passed to
loadChangelog(), so tests
+ // can verify that the parser/DataFrame API correctly constructed and
forwarded them.
+ private var _lastChangelogContext: Option[ChangelogContext] = None
+ def lastChangelogContext: Option[ChangelogContext] = _lastChangelogContext
+
+ private var _lastOptions: Option[CaseInsensitiveStringMap] = None
+ def lastOptions: Option[CaseInsensitiveStringMap] = _lastOptions
// Per-table overrides for Changelog properties (carry-over rows,
intermediate changes,
// update representation, row identity). Tests can set these to exercise
post-processing.
@@ -63,8 +66,10 @@ class InMemoryChangelogCatalog extends InMemoryCatalog {
override def loadChangelog(
ident: Identifier,
- changelogInfo: ChangelogInfo): Changelog = {
- _lastChangelogInfo = Some(changelogInfo)
+ changelogContext: ChangelogContext,
+ options: CaseInsensitiveStringMap): Changelog = {
+ _lastChangelogContext = Some(changelogContext)
+ _lastOptions = Some(options)
if (!tableExists(ident)) {
throw new NoSuchTableException(ident.asMultipartIdentifier)
}
@@ -74,7 +79,7 @@ class InMemoryChangelogCatalog extends InMemoryCatalog {
val numDataCols = table.columns.length
// _commit_version is at index numDataCols + 1 (after _change_type)
val commitVersionIdx = numDataCols + 1
- val filtered = filterByRange(allRows.toSeq, commitVersionIdx,
changelogInfo.range())
+ val filtered = filterByRange(allRows.toSeq, commitVersionIdx,
changelogContext.range())
val props = changelogProperties.getOrElse(ident.toString,
ChangelogProperties())
new InMemoryChangelog(
table.name + "_changelog", table.columns, filtered, props)
diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
index 9666c4ac76ae..6a1afa73f7cd 100644
--- a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain
@@ -1,2 +1,2 @@
~SubqueryAlias spark_catalog.tempdb.myStreamingTable
-+- ~StreamingRelationV2 spark_catalog.tempdb.myStreamingTable_changelog,
ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogInfo{range=VersionRange[startingVersion=1,
endingVersion=Optional.empty, startingBoundInclusive=true,
endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS,
computeUpdates=false},true), [startingVersion=1,
deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L,
_commit_timestamp#0], org.apache.spark.sql.co [...]
++- ~StreamingRelationV2 spark_catalog.tempdb.myStreamingTable_changelog,
ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogContext{range=VersionRange[startingVersion=1,
endingVersion=Optional.empty, startingBoundInclusive=true,
endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS,
computeUpdates=false},true), [startingVersion=1,
deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L,
_commit_timestamp#0], org.apache.spark.sql [...]
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dff80cb24268..db78dc1744ec 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -43,7 +43,7 @@ import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile,
TaskResourceProfile, TaskResourceRequest}
import org.apache.spark.sql.{AnalysisException, Column, Encoders,
ForeachWriter, Row}
import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier,
FunctionIdentifier, InternalRow, QueryPlanningTracker}
-import org.apache.spark.sql.catalyst.analysis.{ChangelogInfoUtils,
FunctionRegistry, GlobalTempView, LocalTempView, MultiAlias, RelationChanges,
UnresolvedAlias, UnresolvedAttribute, UnresolvedDataFrameStar,
UnresolvedDeserializer, UnresolvedExtractValue, UnresolvedFunction,
UnresolvedOrdinal, UnresolvedPlanId, UnresolvedRegex, UnresolvedRelation,
UnresolvedStar, UnresolvedStarWithColumns, UnresolvedStarWithColumnsRenames,
UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, U [...]
+import org.apache.spark.sql.catalyst.analysis.{ChangelogContextUtils,
FunctionRegistry, GlobalTempView, LocalTempView, MultiAlias, RelationChanges,
UnresolvedAlias, UnresolvedAttribute, UnresolvedDataFrameStar,
UnresolvedDeserializer, UnresolvedExtractValue, UnresolvedFunction,
UnresolvedOrdinal, UnresolvedPlanId, UnresolvedRegex, UnresolvedRelation,
UnresolvedStar, UnresolvedStarWithColumns, UnresolvedStarWithColumnsRenames,
UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction [...]
import org.apache.spark.sql.catalyst.encoders.{encoderFor, AgnosticEncoder,
ExpressionEncoder, RowEncoder}
import
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder,
RowEncoder => AgnosticRowEncoder, StringEncoder, UnboundRowEncoder}
import org.apache.spark.sql.catalyst.expressions._
@@ -1741,10 +1741,10 @@ class SparkConnectPlanner(
val tableName = rel.getUnparsedIdentifier
val options = new CaseInsensitiveStringMap(rel.getOptionsMap)
val timeZone = session.sessionState.conf.sessionLocalTimeZone
- val changelogInfo = ChangelogInfoUtils.fromOptions(options, timeZone)
+ val ctx = ChangelogContextUtils.fromOptions(options, timeZone)
val ident = parser.parseMultipartIdentifier(tableName)
val relation = UnresolvedRelation(ident, options, isStreaming =
rel.getIsStreaming)
- RelationChanges(relation, changelogInfo)
+ RelationChanges(relation, ctx)
}
private def transformParse(rel: proto.Parse): LogicalPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
index d0d6bf1e8ec0..3dbdf0530516 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.DataSourceOptions
import org.apache.spark.sql.catalyst.analysis.{RelationChanges,
UnresolvedRelation}
-import org.apache.spark.sql.catalyst.analysis.ChangelogInfoUtils
+import org.apache.spark.sql.catalyst.analysis.ChangelogContextUtils
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions,
UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser,
JSONOptions}
@@ -328,10 +328,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession)
val multipartIdentifier =
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
val options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava)
- val changelogInfo = ChangelogInfoUtils.fromOptions(
+ val changelogContext = ChangelogContextUtils.fromOptions(
options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val relation = UnresolvedRelation(multipartIdentifier, options)
- Dataset.ofRows(sparkSession, RelationChanges(relation, changelogInfo))
+ Dataset.ofRows(sparkSession, RelationChanges(relation, changelogContext))
}
/** @inheritdoc */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
index ff79ad293200..632f46acbe5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.classic
import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.{Evolving, Experimental}
-import org.apache.spark.sql.catalyst.analysis.{ChangelogInfoUtils,
NamedStreamingRelation, RelationChanges, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{ChangelogContextUtils,
NamedStreamingRelation, RelationChanges, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils}
import org.apache.spark.sql.classic.ClassicConversions._
@@ -124,10 +124,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)
assertNoSpecifiedSchema("changes")
val identifier =
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
val options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava)
- val changelogInfo = ChangelogInfoUtils.fromOptions(
+ val changelogContext = ChangelogContextUtils.fromOptions(
options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val unresolved = UnresolvedRelation(identifier, options, isStreaming =
true)
- val changes = RelationChanges(unresolved, changelogInfo)
+ val changes = RelationChanges(unresolved, changelogContext)
val plan = NamedStreamingRelation.withUserProvidedName(changes,
userProvidedSourceName)
Dataset.ofRows(sparkSession, plan)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
index 3a7281f94d88..bb40cd9874d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala
@@ -389,14 +389,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
// DataFrame API
spark.read.option("startingVersion", "1").changes(fullTableName).collect()
- val info1 = catalog.lastChangelogInfo.get
- assert(info1.deduplicationMode() ===
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ val info1 = catalog.lastChangelogContext.get
+ assert(info1.deduplicationMode() ===
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
assert(info1.computeUpdates() === false)
// SQL (no WITH clause = defaults)
sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1").collect()
- val info2 = catalog.lastChangelogInfo.get
- assert(info2.deduplicationMode() ===
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS)
+ val info2 = catalog.lastChangelogContext.get
+ assert(info2.deduplicationMode() ===
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS)
assert(info2.computeUpdates() === false)
}
@@ -410,14 +410,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
.option("deduplicationMode", "none")
.changes(fullTableName)
.collect()
- assert(catalog.lastChangelogInfo.get.deduplicationMode() ===
- ChangelogInfo.DeduplicationMode.NONE)
+ assert(catalog.lastChangelogContext.get.deduplicationMode() ===
+ ChangelogContext.DeduplicationMode.NONE)
// SQL
sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " +
"WITH (deduplicationMode = 'none')").collect()
- assert(catalog.lastChangelogInfo.get.deduplicationMode() ===
- ChangelogInfo.DeduplicationMode.NONE)
+ assert(catalog.lastChangelogContext.get.deduplicationMode() ===
+ ChangelogContext.DeduplicationMode.NONE)
}
test("changes() passes computeUpdates to catalog") {
@@ -430,12 +430,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
.option("computeUpdates", "true")
.changes(fullTableName)
.collect()
- assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+ assert(catalog.lastChangelogContext.get.computeUpdates() === true)
// SQL
sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " +
"WITH (computeUpdates = 'true')").collect()
- assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+ assert(catalog.lastChangelogContext.get.computeUpdates() === true)
}
// ---------- Batch: timestamp range ----------
@@ -450,14 +450,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
.option("endingTimestamp", "2024-12-31 23:59:59")
.changes(fullTableName)
.collect()
- assert(catalog.lastChangelogInfo.get.range()
+ assert(catalog.lastChangelogContext.get.range()
.isInstanceOf[ChangelogRange.TimestampRange])
// SQL
sql(s"SELECT * FROM $fullTableName " +
"CHANGES FROM TIMESTAMP '2024-01-01 00:00:00' " +
"TO TIMESTAMP '2024-12-31 23:59:59'").collect()
- assert(catalog.lastChangelogInfo.get.range()
+ assert(catalog.lastChangelogContext.get.range()
.isInstanceOf[ChangelogRange.TimestampRange])
}
@@ -599,7 +599,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
.format("memory").queryName("cdc_stream_opts_df").start()
try {
q1.processAllAvailable()
- assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+ assert(catalog.lastChangelogContext.get.computeUpdates() === true)
} finally {
q1.stop()
}
@@ -612,7 +612,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
.format("memory").queryName("cdc_stream_opts_sql").start()
try {
q2.processAllAvailable()
- assert(catalog.lastChangelogInfo.get.computeUpdates() === true)
+ assert(catalog.lastChangelogContext.get.computeUpdates() === true)
} finally {
q2.stop()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
index 0d8f573cc448..082be2ac22c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
@@ -196,18 +196,77 @@ class ChangelogResolutionSuite extends SharedSparkSession {
parameters = Map("relationId" -> "`x`"))
}
- test("CHANGES clause passes changelogInfo to catalog") {
+ test("CHANGES clause passes changelogContext to catalog") {
sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO
VERSION 5")
val cat = spark.sessionState.catalogManager
.catalog(cdcCatalogName)
.asInstanceOf[InMemoryChangelogCatalog]
- val info = cat.lastChangelogInfo
+ val info = cat.lastChangelogContext
assert(info.isDefined)
val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "1")
assert(range.endingVersion().get() == "5")
}
+ test("user-defined options are forwarded to loadChangelog") {
+ val cat = spark.sessionState.catalogManager
+ .catalog(cdcCatalogName)
+ .asInstanceOf[InMemoryChangelogCatalog]
+
+ spark.read
+ .option("startingVersion", "1")
+ .option("customOption", "customValue")
+ .changes(s"$cdcCatalogName.test_table")
+
+ val opts = cat.lastOptions
+ assert(opts.isDefined)
+ assert(opts.get.get("customOption") == "customValue")
+ assert(opts.get.get("startingVersion") == "1")
+ }
+
+ test("user-defined options are forwarded to loadChangelog - SQL WITH
clause") {
+ val cat = spark.sessionState.catalogManager
+ .catalog(cdcCatalogName)
+ .asInstanceOf[InMemoryChangelogCatalog]
+
+ sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 " +
+ "WITH ('customOption' = 'customValue')").queryExecution.analyzed
+
+ val opts = cat.lastOptions
+ assert(opts.isDefined)
+ assert(opts.get.get("customOption") == "customValue")
+ }
+
+ test("user-defined options are forwarded to loadChangelog -
DataStreamReader") {
+ val cat = spark.sessionState.catalogManager
+ .catalog(cdcCatalogName)
+ .asInstanceOf[InMemoryChangelogCatalog]
+
+ spark.readStream
+ .option("startingVersion", "1")
+ .option("customOption", "customValue")
+ .changes(s"$cdcCatalogName.test_table")
+ .queryExecution.analyzed
+
+ val opts = cat.lastOptions
+ assert(opts.isDefined)
+ assert(opts.get.get("customOption") == "customValue")
+ assert(opts.get.get("startingVersion") == "1")
+ }
+
+ test("user-defined options are forwarded to loadChangelog - streaming SQL") {
+ val cat = spark.sessionState.catalogManager
+ .catalog(cdcCatalogName)
+ .asInstanceOf[InMemoryChangelogCatalog]
+
+ sql(s"SELECT * FROM STREAM $cdcCatalogName.test_table CHANGES FROM VERSION
1 " +
+ "WITH ('customOption' = 'customValue')").queryExecution.analyzed
+
+ val opts = cat.lastOptions
+ assert(opts.isDefined)
+ assert(opts.get.get("customOption") == "customValue")
+ }
+
//
===========================================================================
// Streaming post-processing
//
===========================================================================
@@ -306,9 +365,9 @@ class ChangelogResolutionSuite extends SharedSparkSession {
// Generic changelog schema validation
//
===========================================================================
- private def stubInfo(): ChangelogInfo = new ChangelogInfo(
+ private def stubInfo(): ChangelogContext = new ChangelogContext(
new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true,
true),
- ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+ ChangelogContext.DeduplicationMode.DROP_CARRYOVERS,
false)
private def cl(name: String, cols: (String,
org.apache.spark.sql.types.DataType)*)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]