[
https://issues.apache.org/jira/browse/FLINK-37691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz reassigned FLINK-37691:
----------------------------------------
Assignee: Dawid Wysakowicz
> Can't consume changelog stream with upsert-kafka connector
> ----------------------------------------------------------
>
> Key: FLINK-37691
> URL: https://issues.apache.org/jira/browse/FLINK-37691
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.1.0
> Reporter: Gunnar Morling
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> I read data change events from a Kafka topic using the Kafka SQL connector
> with changelog semantics and write those events to another Kafka topic using
> the Upsert Kafka SQL connector. This works as expected with Flink 1.20 and
> 2.0.0 (the Debezium events on the source topic are emitted as flat
> upsert-style records on the sink topic), but fails as of 2.1-SNAPSHOT:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Unsupported to visit node StreamPhysicalDropUpdateBefore. The node either
> should not be pushed through the changelog normalize or is not supported yet.
> at
> org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:119)
> at
> org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:90)
> at
> org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.isRequired(ChangelogNormalizeRequirementResolver.java:74)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1164)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitChildren$4(FlinkChangelogModeInferenceProgram.scala:1208)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:1207)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitSink$2(FlinkChangelogModeInferenceProgram.scala:1253)
> at scala.collection.immutable.List.flatMap(List.scala:366)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitSink(FlinkChangelogModeInferenceProgram.scala:1253)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1031)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.$anonfun$optimize$2(FlinkChangelogModeInferenceProgram.scala:103)
> at scala.collection.immutable.List.flatMap(List.scala:366)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:101)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:47)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.immutable.Range.foreach(Range.scala:158)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1373)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:951)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1181)
> at
> org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:59)
> at
> dev.morling.demos.cdcingest.KafkaChangelogToUpsertJob.main(KafkaChangelogToUpsertJob.java:59)
> {code}
> Here's my job definition:
> {code}
> public class KafkaChangelogToUpsertJob {
> public static void main(String[] args) {
> Configuration configuration =
> Configuration.fromMap(Map.of("table.exec.source.cdc-events-duplicate",
> "true"));
> EnvironmentSettings settings = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .withConfiguration(configuration)
> .build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> tableEnv.executeSql("""
> CREATE TABLE authors_source (
> id BIGINT,
> first_name STRING,
> last_name STRING,
> biography STRING,
> registered BIGINT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dbserver1.inventory.authors',
> 'properties.bootstrap.servers' =
> 'localhost:9092',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'debezium-json'
> );
> """);
> tableEnv.executeSql("""
> CREATE TABLE authors_sink (
> id BIGINT,
> first_name STRING,
> last_name STRING,
> biography STRING,
> registered BIGINT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'authors_processed',
> 'properties.bootstrap.servers' =
> 'localhost:9092',
> 'key.format' = 'json',
> 'value.format' = 'json'
> );
> """);
> Table authors = tableEnv.sqlQuery("SELECT id, first_name,
> last_name, biography, registered FROM authors_source");
> authors.insertInto("authors_sink").execute();
> authors.execute().print();
> }
> {code}
> This might be a regression caused by FLINK-37475.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)