Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
PatrickRen commented on PR #3233: URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2146945078 @loserwang1024 Could you cherry-pick this commit to release-3.1 branch? Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
PatrickRen merged PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2138528049

> Hi @loserwang1024, could you please backport this patch to the `release-3.1` branch so that it can be released with CDC 3.1.1?

I'd like to, but it hasn't been merged to master yet.
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
yuxiqian commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2136650327

Hi @loserwang1024, could you please backport this patch to the `release-3.1` branch so that it can be released with CDC 3.1.1?
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2106754276

@PatrickRen, CC. Could you help review this PR?
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1594884935

## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java: ##
@@ -71,7 +72,9 @@ public void translate(
         }
     }

-    private void sinkTo(
+    /** Only visible for test */
+    @VisibleForTesting
+    protected void sinkTo(

Review Comment:
Thanks for the advice; that does seem better.
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1594878296

## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java: ##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+    @Test
+    public void testPreWriteWithoutCommitSink() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new EmptyEvent());
+        DataStreamSource inputStream = env.fromCollection(mockEvents);
+        DataSinkTranslator translator = new DataSinkTranslator();
+
+        String uid = "";

Review Comment:
I tried that before, but it fails with:
java.lang.IllegalArgumentException: Node hash must be a 32 character String that describes a hex code.
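The exception text quoted above suggests that the value has to be exactly 32 hexadecimal characters, which a descriptive sentence cannot satisfy. The following is a minimal sketch, assuming the check behaves like a simple regex match; it is not Flink's code, and `NodeHashSketch`, `looksLikeNodeHash`, and `hashFromLabel` are hypothetical names. It also shows one possible way to keep a descriptive label in the test while still producing a 32-character hex value, by hashing the label with MD5 (16 bytes, hence 32 hex digits).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper; it only approximates the constraint named in the error message. */
final class NodeHashSketch {

    /** Assumed rule: exactly 32 hexadecimal characters. */
    static boolean looksLikeNodeHash(String value) {
        return value != null && value.matches("[0-9a-fA-F]{32}");
    }

    /** Derives a 32-character hex string from a human-readable label via MD5. */
    static String hashFromLabel(String label) {
        try {
            byte[] digest =
                    MessageDigest.getInstance("MD5")
                            .digest(label.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(32);
            for (byte b : digest) {
                // Mask to an unsigned value so every byte becomes two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide MD5.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        String label = "Uid set by the addPreWriteTopology topology";
        System.out.println(looksLikeNodeHash(label)); // false
        System.out.println(looksLikeNodeHash(hashFromLabel(label))); // true
    }
}
```

With a helper like this, the descriptive text can live in a constant or a comment while the value handed to the transformation stays in the accepted shape.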
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593954989

## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java: ##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+    @Test
+    public void testPreWriteWithoutCommitSink() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new EmptyEvent());
+        DataStreamSource inputStream = env.fromCollection(mockEvents);
+        DataSinkTranslator translator = new DataSinkTranslator();
+
+        String uid = "";
+        MockPreWriteWithoutCommitSink mockPreWriteWithoutCommitSink =
+                new MockPreWriteWithoutCommitSink(uid);
+        translator.sinkTo(
+                inputStream,
+                mockPreWriteWithoutCommitSink,
+                "testPreWriteWithoutCommitSink",
+                new OperatorID());
+        OneInputTransformation oneInputTransformation =
+                (OneInputTransformation) env.getTransformations().get(0);
+        Transformation reblanceTransformation = oneInputTransformation.getInputs().get(0);
+        Assert.assertEquals(uid, reblanceTransformation.getUserProvidedNodeHash());

Review Comment:
Maybe add a comment, like:
```
// Check that `addPreWriteTopology` is called, and that the uid is set when the transformation is added
```
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593952991

## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java: ##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+    @Test
+    public void testPreWriteWithoutCommitSink() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new EmptyEvent());
+        DataStreamSource inputStream = env.fromCollection(mockEvents);
+        DataSinkTranslator translator = new DataSinkTranslator();
+
+        String uid = "";

Review Comment:
nit: Maybe some more descriptive content, like
```
String uid = "Uid set by the addPreWriteTopology topology";
```
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593951427

## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java: ##
@@ -71,7 +72,9 @@ public void translate(
         }
     }

-    private void sinkTo(
+    /** Only visible for test */
+    @VisibleForTesting
+    protected void sinkTo(

Review Comment:
nit: The comment says the same thing as the annotation, so it is not needed. We can leave this package-private (slightly lower privileges than `protected`).
```
@VisibleForTesting
void sinkTo(
```
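To illustrate the difference in access levels mentioned in the review comment, here is a small self-contained sketch. All names in it (`VisibleForTestingSketch`, `TranslatorSketch`, `VisibilityDemo`) are hypothetical stand-ins and not the classes from the PR. It declares one method without a modifier and one as `protected`, then reads the modifiers back through reflection.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical marker annotation standing in for a real @VisibleForTesting. */
@interface VisibleForTestingSketch {}

/** Hypothetical stand-in for the translator class. */
class TranslatorSketch {

    // No modifier: package-private, reachable only from the same package
    // (for example from a test class that lives in that package).
    @VisibleForTestingSketch
    void sinkTo(String sinkName) {}

    // protected: reachable from the same package and also from subclasses elsewhere.
    protected void sinkToProtected(String sinkName) {}
}

final class VisibilityDemo {

    /** Returns the access level of a single-String-argument method declared on the type. */
    static String visibilityOf(Class<?> type, String methodName) {
        Method method;
        try {
            method = type.getDeclaredMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
        int modifiers = method.getModifiers();
        if (Modifier.isPublic(modifiers)) {
            return "public";
        }
        if (Modifier.isProtected(modifiers)) {
            return "protected";
        }
        if (Modifier.isPrivate(modifiers)) {
            return "private";
        }
        return "package-private";
    }

    public static void main(String[] args) {
        System.out.println(visibilityOf(TranslatorSketch.class, "sinkTo")); // package-private
        System.out.println(visibilityOf(TranslatorSketch.class, "sinkToProtected")); // protected
    }
}
```

Because a unit test normally sits in the same package as the class under test, package-private access is enough for the test and exposes nothing extra to subclasses in other packages.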
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2099986993

> Could you define your own one in the test itself? Then you have a free hand in what it does and does not do...

Done.
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
pvary commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2090855121

> > Could you please add a test case to prevent later code changes from reverting this fix?
>
> I'd like to, but there currently seems to be no pipeline sink that is WithPreWriteTopology but not TwoPhaseCommittingSink, unless I mock one.

Could you define your own one in the test itself? Then you have a free hand in what it does and does not do...
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2089709636

> Could you please add a test case to prevent later code changes from reverting this fix?

I'd like to, but there currently seems to be no pipeline sink that is WithPreWriteTopology but not TwoPhaseCommittingSink, unless I mock one.
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
pvary commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2065143602

Good catch @loserwang1024! Could you please add a test case to prevent later code changes from reverting this fix?
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2062883633

@PatrickRen, CC
[PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 opened a new pull request, #3233:
URL: https://github.com/apache/flink-cdc/pull/3233

Currently, when the sink is not an instance of TwoPhaseCommittingSink, `sinkTo` calls `input.transform` rather than `stream.transform`. This means that the pre-write topology is ignored.
```
private void sinkTo(
        DataStream input,
        Sink sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
}
```
(PS: the change to StarRocksUtils only applies spotless formatting.)
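The effect described above can be reproduced without Flink. The following is a minimal sketch under stated assumptions: `SketchStream` is a hypothetical stand-in for a data stream that only records the names of the operators applied to it, and `sinkToBuggy` / `sinkToFixed` are illustrative names, not methods from the PR. The fixed variant assumes the correction is to attach the writer to `stream` instead of `input`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a data stream: it only records the operators applied so far. */
final class SketchStream {
    final List<String> operators;

    SketchStream(List<String> operators) {
        this.operators = Collections.unmodifiableList(new ArrayList<>(operators));
    }

    /** Returns a new stream with one more operator appended; the receiver is unchanged. */
    SketchStream transform(String operatorName) {
        List<String> next = new ArrayList<>(operators);
        next.add(operatorName);
        return new SketchStream(next);
    }
}

final class SinkToSketch {

    /** Same shape as the snippet above: the writer is attached to `input`. */
    static SketchStream sinkToBuggy(SketchStream input, UnaryOperator<SketchStream> preWrite) {
        SketchStream stream = preWrite.apply(input); // result is computed but never used
        return input.transform("writer");
    }

    /** Assumed fix: the writer is attached to the stream returned by the pre-write step. */
    static SketchStream sinkToFixed(SketchStream input, UnaryOperator<SketchStream> preWrite) {
        SketchStream stream = preWrite.apply(input);
        return stream.transform("writer");
    }

    public static void main(String[] args) {
        SketchStream source = new SketchStream(Arrays.asList("source"));
        UnaryOperator<SketchStream> preWrite = s -> s.transform("pre-write");

        System.out.println(sinkToBuggy(source, preWrite).operators); // [source, writer]
        System.out.println(sinkToFixed(source, preWrite).operators); // [source, pre-write, writer]
    }
}
```

In the buggy variant the pre-write step never appears upstream of the writer, which is the condition a regression test would need to detect.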