[jira] [Created] (FLINK-35572) flink db2 cdc default value error
junxin lai created FLINK-35572: --- Summary: flink db2 cdc default value error Key: FLINK-35572 URL: https://issues.apache.org/jira/browse/FLINK-35572 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: junxin lai I am using Flink DB2 CDC to sync a database in real time, but it fails to handle default values in the schema while taking the snapshot. After digging deeper into the problem, I found that this seems to be a bug in Debezium that was fixed in 2.0.0.CR1 ([https://issues.redhat.com/browse/DBZ-4990]). The latest Flink CDC 3.1 uses Debezium 1.9.8.Final. Default values are a common configuration in DB2. Is there a way we can backport this patch to 1.9.8.Final? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation
[ https://issues.apache.org/jira/browse/FLINK-35571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grace Grimwood updated FLINK-35571: --- Attachment: 20240612_181148_mvn-clean-package_flink-runtime.log > ProfilingServiceTest.testRollingDeletion intermittently fails due to improper > test isolation > > > Key: FLINK-35571 > URL: https://issues.apache.org/jira/browse/FLINK-35571 > Project: Flink > Issue Type: Bug > Components: Tests > Environment: *Git revision:* > {code:bash} > $ git show > commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master) > {code} > *Java info:* > {code:bash} > $ java -version > openjdk version "17.0.11" 2024-04-16 > OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9) > OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode) > {code} > {code:bash} > $ sdk current > Using: > java: 17.0.11-tem > maven: 3.8.6 > scala: 2.12.19 > {code} > *OS info:* > {code:bash} > $ uname -av > Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May 1 20:14:38 > PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64 > {code} > *Hardware info:* > {code:bash} > $ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e > 'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:' > hw.memsize: 34359738368 > machdep.cpu.core_count: 12 > machdep.cpu.brand_string: Apple M2 Pro > {code} >Reporter: Grace Grimwood >Priority: Major > Attachments: 20240612_181148_mvn-clean-package_flink-runtime.log > > > *Symptom:* > The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the > following error: > {code:java} > [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 > s <<< FAILURE! -- in > org.apache.flink.runtime.util.profiler.ProfilingServiceTest > [ERROR] > org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion > -- Time elapsed: 9.264 s <<< FAILURE! 
> org.opentest4j.AssertionFailedError: expected: <3> but was: <6> > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) > at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145) > at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531) > at > org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175) > at > org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) > at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > {code} > The number of extra files found varies from failure to failure. > *Cause:* > Many of the tests in *{{ProfilingServiceTest}}* rely on a specific > configuration of the *{{ProfilingService}}* instance, but > *{{ProfilingService.getInstance}}* does not check whether an existing > instance's config matches the provided config before returning it. Because of > this, and because JUnit does not guarantee a specific ordering of tests > (unless they are specifically annotated), it is possible for these tests to > receive an instance that does not behave in the expected way and therefore > fail. 
> *Analysis:* > In troubleshooting the test failure, we tried adding an extra assertion to > *{{ProfilingServiceTest.setUp}}* to validate the directories being written to > were correct: > {code:java} > Assertions.assertEquals(tempDir.toString(), > profilingService.getProfilingResultDir()); > {code} > That assert produced the following failure: > {code:java} > org.opentest4j.AssertionFailedError: expected: > > but was: > {code} > This failure shows that the *{{ProfilingService}}* returned by > *{{ProfilingService.getInstance}}* in the setup is not using the correct > directory, and therefore cannot be the correct instance for this test class > because it has the wrong config. > This is because the static method *{{ProfilingServi
[jira] [Updated] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation
[ https://issues.apache.org/jira/browse/FLINK-35571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grace Grimwood updated FLINK-35571: --- Attachment: (was: 20240612_181148_mvn-clean-package_flink-runtime_also-make.log) > ProfilingServiceTest.testRollingDeletion intermittently fails due to improper > test isolation > > > Key: FLINK-35571 > URL: https://issues.apache.org/jira/browse/FLINK-35571 > Project: Flink > Issue Type: Bug > Components: Tests > Environment: *Git revision:* > {code:bash} > $ git show > commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master) > {code} > *Java info:* > {code:bash} > $ java -version > openjdk version "17.0.11" 2024-04-16 > OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9) > OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode) > {code} > {code:bash} > $ sdk current > Using: > java: 17.0.11-tem > maven: 3.8.6 > scala: 2.12.19 > {code} > *OS info:* > {code:bash} > $ uname -av > Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May 1 20:14:38 > PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64 > {code} > *Hardware info:* > {code:bash} > $ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e > 'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:' > hw.memsize: 34359738368 > machdep.cpu.core_count: 12 > machdep.cpu.brand_string: Apple M2 Pro > {code} >Reporter: Grace Grimwood >Priority: Major > > *Symptom:* > The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the > following error: > {code:java} > [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 > s <<< FAILURE! -- in > org.apache.flink.runtime.util.profiler.ProfilingServiceTest > [ERROR] > org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion > -- Time elapsed: 9.264 s <<< FAILURE! 
> org.opentest4j.AssertionFailedError: expected: <3> but was: <6> > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) > at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145) > at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531) > at > org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175) > at > org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) > at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > {code} > The number of extra files found varies from failure to failure. > *Cause:* > Many of the tests in *{{ProfilingServiceTest}}* rely on a specific > configuration of the *{{ProfilingService}}* instance, but > *{{ProfilingService.getInstance}}* does not check whether an existing > instance's config matches the provided config before returning it. Because of > this, and because JUnit does not guarantee a specific ordering of tests > (unless they are specifically annotated), it is possible for these tests to > receive an instance that does not behave in the expected way and therefore > fail. 
> *Analysis:* > In troubleshooting the test failure, we tried adding an extra assertion to > *{{ProfilingServiceTest.setUp}}* to validate the directories being written to > were correct: > {code:java} > Assertions.assertEquals(tempDir.toString(), > profilingService.getProfilingResultDir()); > {code} > That assert produced the following failure: > {code:java} > org.opentest4j.AssertionFailedError: expected: > > but was: > {code} > This failure shows that the *{{ProfilingService}}* returned by > *{{ProfilingService.getInstance}}* in the setup is not using the correct > directory, and therefore cannot be the correct instance for this test class > because it has the wrong config. > This is because the static method *{{ProfilingService.getInstance}}* attempts > to reuse any existing ins
Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]
yuxiqian commented on code in PR #3398: URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1635883374 ## flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh: ## @@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then exit 1 fi -# Setup Flink related configurations -# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it -_FLINK_HOME_DETERMINED=1 Review Comment: IIUC it was used to tell Flink's `config.sh` not to override given `FLINK_HOME` variable, and should be set before calling `. $FLINK_HOME/bin/config.sh`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]
yuxiqian commented on code in PR #3398: URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1635883374 ## flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh: ## @@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then exit 1 fi -# Setup Flink related configurations -# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it -_FLINK_HOME_DETERMINED=1 Review Comment: I think it was used to tell Flink's `config.sh` not to override given `FLINK_HOME` variable, and should be set before calling `. $FLINK_HOME/bin/config.sh`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation
Grace Grimwood created FLINK-35571: -- Summary: ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation Key: FLINK-35571 URL: https://issues.apache.org/jira/browse/FLINK-35571 Project: Flink Issue Type: Bug Components: Tests Environment: *Git revision:* {code:bash} $ git show commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master) {code} *Java info:* {code:bash} $ java -version openjdk version "17.0.11" 2024-04-16 OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9) OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode) {code} {code:bash} $ sdk current Using: java: 17.0.11-tem maven: 3.8.6 scala: 2.12.19 {code} *OS info:* {code:bash} $ uname -av Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May 1 20:14:38 PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64 {code} *Hardware info:* {code:bash} $ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e 'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:' hw.memsize: 34359738368 machdep.cpu.core_count: 12 machdep.cpu.brand_string: Apple M2 Pro {code} Reporter: Grace Grimwood Attachments: 20240612_181148_mvn-clean-package_flink-runtime_also-make.log *Symptom:* The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the following error: {code:java} [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 s <<< FAILURE! -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest [ERROR] org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion -- Time elapsed: 9.264 s <<< FAILURE! 
org.opentest4j.AssertionFailedError: expected: <3> but was: <6> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531) at org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175) at org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) {code} The number of extra files found varies from failure to failure. *Cause:* Many of the tests in *{{ProfilingServiceTest}}* rely on a specific configuration of the *{{ProfilingService}}* instance, but *{{ProfilingService.getInstance}}* does not check whether an existing instance's config matches the provided config before returning it. Because of this, and because JUnit does not guarantee a specific ordering of tests (unless they are specifically annotated), it is possible for these tests to receive an instance that does not behave in the expected way and therefore fail. 
*Analysis:* In troubleshooting the test failure, we tried adding an extra assertion to *{{ProfilingServiceTest.setUp}}* to validate the directories being written to were correct: {code:java} Assertions.assertEquals(tempDir.toString(), profilingService.getProfilingResultDir()); {code} That assert produced the following failure: {code:java} org.opentest4j.AssertionFailedError: expected: but was: {code} This failure shows that the *{{ProfilingService}}* returned by *{{ProfilingService.getInstance}}* in the setup is not using the correct directory, and therefore cannot be the correct instance for this test class because it has the wrong config. This is because the static method *{{ProfilingService.getInstance}}* attempts to reuse any existing instance of *{{ProfilingService}}* before it creates a new one and disregards any differences in config in doing so, which means that if another test instantiates a *{{ProfilingService}}* with different config first and does not close it, that previous instance will be provided to *{{ProfilingServiceTest}}* rather than the new instance those tests seem to expect. This only happens with the first test run in this
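The config-ignoring reuse described above can be illustrated with a minimal, hypothetical sketch. The class and field names below are invented for illustration and are not Flink's actual `ProfilingService` code; the sketch only reproduces the pattern the analysis describes:

```java
// Minimal sketch of the failure mode: getInstance() returns whatever
// instance already exists, silently ignoring the config it was handed.
class ConfigIgnoringSingleton {
    private static ConfigIgnoringSingleton instance;
    private final String resultDir;

    private ConfigIgnoringSingleton(String resultDir) {
        this.resultDir = resultDir;
    }

    // Bug analogue: no check that the existing instance's config
    // matches the requested config before returning it.
    static synchronized ConfigIgnoringSingleton getInstance(String resultDir) {
        if (instance == null) {
            instance = new ConfigIgnoringSingleton(resultDir);
        }
        return instance;
    }

    String getResultDir() {
        return resultDir;
    }
}
```

A second test that calls `getInstance` with its own temp directory after another test has already created the singleton receives the first test's directory, which is exactly the mismatch the extra assertion in `setUp` exposed. One fix direction is for `getInstance` to compare the requested config against the existing instance's config and recreate (or fail fast) on mismatch.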
Re: [PR] [FLINK-35121][pipeline-connector][cdc-base] CDC pipeline connector provide ability to verify requiredOptions and optionalOptions [flink-cdc]
yuxiqian commented on PR #3412: URL: https://github.com/apache/flink-cdc/pull/3412#issuecomment-2162196171 Seems there's some overlap between this PR and #3382, @loserwang1024 mind if I cherry pick your commit? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]
joyCurry30 commented on code in PR #3398: URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1635870700 ## flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh: ## @@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then exit 1 fi -# Setup Flink related configurations -# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it -_FLINK_HOME_DETERMINED=1 Review Comment: The variable _FLINK_HOME_DETERMINED is not used. Is this variable used somewhere in the code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35533][runtime] Support Flink hybrid shuffle integration with Apache Celeborn [flink]
TanYuxin-tyx commented on code in PR #24900: URL: https://github.com/apache/flink/pull/24900#discussion_r1635866275 ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -490,6 +490,24 @@ public enum CompressionCodec { + " is configured. The new mode is currently in an experimental phase. It can be set to false to fallback to the legacy mode " + " if something unexpected. Once the new mode reaches a stable state, the legacy mode as well as the option will be removed."); +/** The option to configure the tiered factory creator remote class name for hybrid shuffle. */ +@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) +@Experimental +public static final ConfigOption +NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME = + key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class") +.stringType() +.noDefaultValue() +.withDescription( +"The option configures the class that is responsible for creating an " ++ "external remote tier factory for hybrid shuffle. Note that " ++ "only Celeborn can be accepted as the remote shuffle tier " Review Comment: Ok, after some discussions offline, we decided to update the description as above, to avoid stating that `only Apache Celeborn` is accepted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35121][pipeline-connector][cdc-base] CDC pipeline connector provide ability to verify requiredOptions and optionalOptions [flink-cdc]
loserwang1024 opened a new pull request, #3412: URL: https://github.com/apache/flink-cdc/pull/3412 Give the CDC pipeline connector the ability to verify requiredOptions and optionalOptions; otherwise, compatibility will be hard to handle later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
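For context, the validation proposed here resembles what Flink's `FactoryUtil` helper does for `DynamicTableFactory`. The following is a simplified, hypothetical sketch of that kind of check (class and method names invented; this is not the actual flink-cdc FactoryHelper API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified sketch of required/optional option validation.
class OptionValidatorSketch {
    static void validate(
            Set<String> requiredOptions,
            Set<String> optionalOptions,
            Map<String, String> actualOptions) {
        // 1. Every required option must be present.
        List<String> missing = new ArrayList<>();
        for (String key : requiredOptions) {
            if (!actualOptions.containsKey(key)) {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
        // 2. Every supplied option must be either required or optional.
        for (String key : actualOptions.keySet()) {
            if (!requiredOptions.contains(key) && !optionalOptions.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }
}
```

Failing fast on an unknown key is what makes later compatibility manageable: a misspelled or removed option surfaces at job submission instead of being silently ignored.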
Re: [PR] [FLINK-35533][runtime] Support Flink hybrid shuffle integration with Apache Celeborn [flink]
TanYuxin-tyx commented on code in PR #24900: URL: https://github.com/apache/flink/pull/24900#discussion_r1635860182 ## flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java: ## @@ -80,8 +83,14 @@ public NettyShuffleDescriptor buildRemote() { } public NettyShuffleDescriptor buildLocal() { +List tierShuffleDescriptors = new ArrayList<>(); +tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE); +tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE); Review Comment: Since the memory tier and the disk tier are used by default in the test, each tier needs its own tier shuffle descriptor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][cdc-connector][mysql] Fix NoClassDefFoundError when create new table in mysql cdc source [flink-cdc]
yuxiqian commented on PR #3036: URL: https://github.com/apache/flink-cdc/pull/3036#issuecomment-2162171729 Hi @pengmide, could you please rebase this PR onto the latest `master` branch so it can be merged? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]
fredia commented on code in PR #24924: URL: https://github.com/apache/flink/pull/24924#discussion_r1635853720 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ## @@ -167,22 +166,25 @@ private void verifyStateHandleType(String checkpointPath, boolean fileMergingEna // Check keyed state handle List keyedStateHandles = new ArrayList<>(subtaskState.getManagedKeyedState()); -keyedStateHandles.addAll(subtaskState.getRawKeyedState()); for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { -Assertions.assertInstanceOf( -IncrementalRemoteKeyedStateHandle.class, keyedStateHandle); +assertThat(keyedStateHandle) Review Comment: IIUC, this bug can be reproduced if we perform several checkpoints before restoring to generate placeholder handle, would you like to change this ITCase to make it easier to reproduce? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on PR #24919: URL: https://github.com/apache/flink/pull/24919#issuecomment-2162158973 @gaborgsomogyi I have addressed the comment, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on PR #24919: URL: https://github.com/apache/flink/pull/24919#issuecomment-2162158609 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35121) CDC pipeline connector should verify requiredOptions and optionalOptions
[ https://issues.apache.org/jira/browse/FLINK-35121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854253#comment-17854253 ] Hongshun Wang commented on FLINK-35121: --- [~kwafor], Sorry, I have done it. > CDC pipeline connector should verify requiredOptions and optionalOptions > > > Key: FLINK-35121 > URL: https://issues.apache.org/jira/browse/FLINK-35121 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Hongshun Wang >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > At present, though we provide > org.apache.flink.cdc.common.factories.Factory#requiredOptions and > org.apache.flink.cdc.common.factories.Factory#optionalOptions, neither is > used anywhere. This means requiredOptions and optionalOptions are never > verified. > Thus, as DynamicTableFactory does, provide a > FactoryHelper to verify requiredOptions and optionalOptions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Revert [FLINK-34123][FLINK-35068]: Add built-in dedicated serialization support for Maps, Lists, Sets, and Collections [flink]
flinkbot commented on PR #24925: URL: https://github.com/apache/flink/pull/24925#issuecomment-2162103966 ## CI report: * e5d84501e6a2781205494bf429c29e1afe0d4031 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]
flinkbot commented on PR #24924: URL: https://github.com/apache/flink/pull/24924#issuecomment-2162103785 ## CI report: * 9ce0a6f2bb299224ed170ce0944de94703253b81 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]
flinkbot commented on PR #24923: URL: https://github.com/apache/flink/pull/24923#issuecomment-2162103603 ## CI report: * 88f9a4cf7a8e79d861755262f84f91a4b1b0d026 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35068) Introduce built-in serialization support for Set
[ https://issues.apache.org/jira/browse/FLINK-35068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854247#comment-17854247 ] Zhanghao Chen commented on FLINK-35068: --- Will revert it as it breaks state-compatibility. If a user defines a map/list-typed value in state, the default serializer is changed from Kryo to dedicated serializer, breaking compatibility. Furthermore, since RocksDBStateBackend does not support serialization format change for the keys in MapState, we cannot implement a fully-compatible solution for this case. > Introduce built-in serialization support for Set > > > Key: FLINK-35068 > URL: https://issues.apache.org/jira/browse/FLINK-35068 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System >Affects Versions: 1.20.0 >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Introduce built-in serialization support for {{{}Set{}}}, another common Java > collection type. We'll need to add a new built-in serializer for it > ({{{}MultiSetTypeInformation{}}} utilizes {{MapSerializer}} underneath, but > it could be more efficient for common {{{}Set{}}}). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Revert FLINK-34123&35068: Add built-in dedicated serialization support for Maps, Lists, Sets, and Collections [flink]
X-czh opened a new pull request, #24925: URL: https://github.com/apache/flink/pull/24925 ## What is the purpose of the change Reverting previous changes that break state compatibility. [FLINK-34123](https://issues.apache.org/jira/browse/FLINK-34123) & [FLINK-35068](https://issues.apache.org/jira/browse/FLINK-35068) introduced built-in dedicated serialization support for Maps, Sets, Lists, and Collections. However, if a user defines a collection-typed value in state, the default serializer is changed from Kryo to dedicated serializer, breaking compatibility. Furthermore, since RocksDBStateBackend does not support serialization format change for the keys in MapState, we cannot implement a fully-compatible solution for this case. ## Brief change log Revert the following three commits: 1. [FLINK-34123][core][type] Introduce built-in serialization support for Map, List, and Collection 2. [FLINK-34123][docs][type] Add doc for built-in serialization support for Map, List, and Collection 3. [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
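The incompatibility motivating this revert comes down to wire format: bytes written by one serializer cannot be read back by a differently-framed one. The following is a toy illustration only; these are invented framings of the same `List<Integer>`, not Flink's or Kryo's actual formats:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Two invented wire formats for the same value. State written in one format
// cannot be decoded by a serializer expecting the other, which is the essence
// of the compatibility break described above.
class WireFormatSketch {
    // "Dedicated" style: just [size][elements].
    static byte[] writeDedicated(List<Integer> values) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(values.size());
        for (int v : values) {
            out.writeInt(v);
        }
        return buffer.toByteArray();
    }

    // "Generic" style: a class-name tag before the payload, as a
    // reflection-based framework might emit.
    static byte[] writeTagged(List<Integer> values) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeUTF(values.getClass().getName());
        out.writeInt(values.size());
        for (int v : values) {
            out.writeInt(v);
        }
        return buffer.toByteArray();
    }
}
```

Because the two byte streams diverge from the first bytes onward, silently switching the default from one serializer to the other cannot work without an explicit migration step; and since RocksDBStateBackend compares MapState keys as raw bytes, even a migration path is unavailable for that case, as the PR description notes.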
[jira] [Commented] (FLINK-34123) Introduce built-in serialization support for Map and List
[ https://issues.apache.org/jira/browse/FLINK-34123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854242#comment-17854242 ] Zhanghao Chen commented on FLINK-34123: --- Will revert it as it breaks state-compatibility. If a user defines a map/list-typed value in state, the default serializer is changed from Kryo to a dedicated serializer, breaking compatibility. Furthermore, since RocksDBStateBackend does not support serialization format change for the keys in MapState, we cannot implement a fully-compatible solution for this case. > Introduce built-in serialization support for Map and List > - > > Key: FLINK-34123 > URL: https://issues.apache.org/jira/browse/FLINK-34123 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System >Affects Versions: 1.20.0 >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Introduce built-in serialization support for Map and List, two common > collection types for which Flink already has custom serializers implemented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging
[ https://issues.apache.org/jira/browse/FLINK-35570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35570: --- Labels: pull-request-available (was: ) > Consider PlaceholderStreamStateHandle in checkpoint file merging > > > Key: FLINK-35570 > URL: https://issues.apache.org/jira/browse/FLINK-35570 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > > In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} > into account during lifecycle, since it can be a file merged one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]
Zakelly opened a new pull request, #24924: URL: https://github.com/apache/flink/pull/24924 ## What is the purpose of the change In checkpoint file merging, we should take `PlaceholderStreamStateHandle` into account during lifecycle, since it can be a file merged one. ## Brief change log - take `PlaceholderStreamStateHandle` into account in `reusePreviousStateHandle` and `couldReuseStateHandle` ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging
Zakelly Lan created FLINK-35570: --- Summary: Consider PlaceholderStreamStateHandle in checkpoint file merging Key: FLINK-35570 URL: https://issues.apache.org/jira/browse/FLINK-35570 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Zakelly Lan Assignee: Zakelly Lan In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} into account during lifecycle, since it can be a file merged one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]
schlosna opened a new pull request, #24923: URL: https://github.com/apache/flink/pull/24923 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]
flinkbot commented on PR #24922: URL: https://github.com/apache/flink/pull/24922#issuecomment-2162053937 ## CI report: * 3405d0cc2c400c9c2d3a596b600b2ebda07af2f9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
Zakelly merged PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32091) Add necessary metrics for file-merging
[ https://issues.apache.org/jira/browse/FLINK-32091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32091: --- Labels: pull-request-available (was: ) > Add necessary metrics for file-merging > -- > > Key: FLINK-32091 > URL: https://issues.apache.org/jira/browse/FLINK-32091 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]
fredia opened a new pull request, #24922: URL: https://github.com/apache/flink/pull/24922 ## What is the purpose of the change Add file size and file count metrics for file-merging. ## Brief change log - Add `logicalFileCount`, `logicalFileSize`, `physicalFileCount`, `physicalFileSize` metrics. ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). This change added tests and can be verified as follows: - `FileMergingMetricsTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed
[ https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854225#comment-17854225 ] Zakelly Lan commented on FLINK-35569: - Thanks [~qingyue] , I'll take a look. > SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging > failed > -- > > Key: FLINK-35569 > URL: https://issues.apache.org/jira/browse/FLINK-35569 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Build System / CI >Affects Versions: 1.20.0 >Reporter: Jane Chan >Assignee: Zakelly Lan >Priority: Major > > [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181] > The parameterized test is failed when RestoreMode is "CLAIM" and > fileMergingAcrossBoundary is false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed
[ https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan reassigned FLINK-35569: --- Assignee: Zakelly Lan > SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging > failed > -- > > Key: FLINK-35569 > URL: https://issues.apache.org/jira/browse/FLINK-35569 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Build System / CI >Affects Versions: 1.20.0 >Reporter: Jane Chan >Assignee: Zakelly Lan >Priority: Major > > [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181] > The parameterized test is failed when RestoreMode is "CLAIM" and > fileMergingAcrossBoundary is false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed
[ https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854224#comment-17854224 ] Jane Chan commented on FLINK-35569: --- Hi [~Zakelly], would you mind sparing some time to take a look? > SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging > failed > -- > > Key: FLINK-35569 > URL: https://issues.apache.org/jira/browse/FLINK-35569 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Build System / CI >Affects Versions: 1.20.0 >Reporter: Jane Chan >Priority: Major > > [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181] > The parameterized test is failed when RestoreMode is "CLAIM" and > fileMergingAcrossBoundary is false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed
[ https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-35569: -- Description: [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181] The parameterized test is failed when RestoreMode is "CLAIM" and fileMergingAcrossBoundary is false. > SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging > failed > -- > > Key: FLINK-35569 > URL: https://issues.apache.org/jira/browse/FLINK-35569 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Build System / CI >Affects Versions: 1.20.0 >Reporter: Jane Chan >Priority: Major > > [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181] > The parameterized test is failed when RestoreMode is "CLAIM" and > fileMergingAcrossBoundary is false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed
Jane Chan created FLINK-35569: - Summary: SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed Key: FLINK-35569 URL: https://issues.apache.org/jira/browse/FLINK-35569 Project: Flink Issue Type: Bug Components: Build System / Azure Pipelines, Build System / CI Affects Versions: 1.20.0 Reporter: Jane Chan -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]
yuxiqian commented on PR #3360: URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2161996630 Hi @yuanoOo, thanks for your great contribution! Since @whhe is familiar with OceanBase, do you have time to take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35234][hotfix][cdc-common] Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString [flink-cdc]
Jiabao-Sun commented on code in PR #3255: URL: https://github.com/apache/flink-cdc/pull/3255#discussion_r1635733171 ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java: ## @@ -130,7 +130,9 @@ static Duration convertToDuration(Object o) { } static String convertToString(Object o) { -if (o.getClass() == String.class) { +if (o == null) { Review Comment: The inner null check seems redundant now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
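The pattern under review can be reduced to a hedged standalone sketch (a simplified stand-in for `ConfigurationUtils#convertToString`; the real method also handles other types): once the null guard is hoisted to the top of the method, nothing below it can observe a null `o`, which is why the inner null check reads as redundant.

```java
public class ConvertToStringSketch {
    // Simplified, hypothetical version of the method under discussion.
    static String convertToString(Object o) {
        if (o == null) {
            return null; // hoisted guard: no code below can see a null 'o'
        }
        if (o.getClass() == String.class) {
            return (String) o; // avoid an unnecessary toString() call
        }
        return o.toString();
    }

    public static void main(String[] args) {
        System.out.println(convertToString(null));    // prints "null"
        System.out.println(convertToString("flink")); // prints "flink"
        System.out.println(convertToString(42));      // prints "42"
    }
}
```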
Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]
1996fanrui commented on PR #24895: URL: https://github.com/apache/flink/pull/24895#issuecomment-2161973417 It seems one test related to timers fails; it may be caused by this PR. `StreamTaskCancellationTest.testCancelTaskShouldPreventAdditionalEventTimeTimersFromBeingFired:272->testCancelTaskShouldPreventAdditionalTimersFromBeingFired:300 » IllegalState` https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60198&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10477 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35568) Add imagePullSecrets for FlinkDeployment spec
Gang Huang created FLINK-35568: -- Summary: Add imagePullSecrets for FlinkDeployment spec Key: FLINK-35568 URL: https://issues.apache.org/jira/browse/FLINK-35568 Project: Flink Issue Type: Improvement Reporter: Gang Huang I am confused about how to configure imagePullSecrets for a private Docker registry, since no related parameters appear in the official docs (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35568) Add the imagePullSecrets property for FlinkDeployment spec
[ https://issues.apache.org/jira/browse/FLINK-35568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Huang updated FLINK-35568: --- Summary: Add the imagePullSecrets property for FlinkDeployment spec (was: Add imagePullSecrets for FlinkDeployment spec) > Add the imagePullSecrets property for FlinkDeployment spec > -- > > Key: FLINK-35568 > URL: https://issues.apache.org/jira/browse/FLINK-35568 > Project: Flink > Issue Type: Improvement >Reporter: Gang Huang >Priority: Blocker > > I am confused about how to configure imagePullSecrets for a private Docker > registry, since no related parameters appear in the official > docs > (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/) -- This message was sent by Atlassian Jira (v8.20.10#820010)
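For reference, the FlinkDeployment CRD does not expose a dedicated imagePullSecrets field, but its `spec.podTemplate` section accepts a standard Kubernetes Pod template that the operator merges into the JobManager and TaskManager pods, so the standard Pod-level `imagePullSecrets` field can be set there. A minimal sketch, assuming a pre-created registry secret named `my-registry-secret` and a hypothetical private image:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment
spec:
  image: registry.example.com/flink-app:1.0   # private image (hypothetical)
  flinkVersion: v1_18
  podTemplate:
    spec:
      # Standard Pod-level field; merged into both JobManager and TaskManager pods.
      imagePullSecrets:
        - name: my-registry-secret
```

This is a fragment, not a complete deployment spec; the remaining required fields (jobManager/taskManager resources, etc.) are unchanged from a normal FlinkDeployment.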
[jira] [Resolved] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-35540. Resolution: Fixed > flink-cdc-pipeline-connector-mysql lost table which database and table with > the same name > - > > Key: FLINK-35540 > URL: https://issues.apache.org/jira/browse/FLINK-35540 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: linqigeng >Assignee: linqigeng >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > h1. Description > When the parameter of 'tables' in a mysql pipeline job contains a table whose > database and table have the same name, like 'app.app', the job will fail > and the error message is like: > {code:java} > java.lang.IllegalArgumentException: Cannot find any table by the option > 'tables' = app.app {code} > h1. How to reproduce > Create database and table all named `{{{}app`{}}}, then submit a pipeline job > like this YAML defined: > {code:java} > source: > type: mysql > hostname: localhost > port: 3306 > username: root > password: 123456 > tables: app.app > server-id: 5400-5404 > server-time-zone: UTC > sink: > type: doris > fenodes: 127.0.0.1:8030 > username: root > password: "" > table.create.properties.light_schema_change: true > table.create.properties.replication_num: 1pipeline: > name: Sync MySQL Database to Doris > parallelism: 2 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854012#comment-17854012 ] Leonard Xu edited comment on FLINK-35540 at 6/12/24 1:51 AM: - master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7 release-3.1: 7287eaceca29d105bf5a7c74d75945a42a051016 was (Author: leonard xu): master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7 release-3.1: TODO > flink-cdc-pipeline-connector-mysql lost table which database and table with > the same name > - > > Key: FLINK-35540 > URL: https://issues.apache.org/jira/browse/FLINK-35540 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: linqigeng >Assignee: linqigeng >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > h1. Description > When the parameter of 'tables' in a mysql pipeline job contains a table whose > database and table have the same name, like 'app.app', the job will fail > and the error message is like: > {code:java} > java.lang.IllegalArgumentException: Cannot find any table by the option > 'tables' = app.app {code} > h1. How to reproduce > Create database and table all named `{{{}app`{}}}, then submit a pipeline job > like this YAML defined: > {code:java} > source: > type: mysql > hostname: localhost > port: 3306 > username: root > password: 123456 > tables: app.app > server-id: 5400-5404 > server-time-zone: UTC > sink: > type: doris > fenodes: 127.0.0.1:8030 > username: root > password: "" > table.create.properties.light_schema_change: true > table.create.properties.replication_num: 1pipeline: > name: Sync MySQL Database to Doris > parallelism: 2 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]
leonardBang merged PR #3409: URL: https://github.com/apache/flink-cdc/pull/3409 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]
1996fanrui commented on PR #24881: URL: https://github.com/apache/flink/pull/24881#issuecomment-2161888308 > Thanks @GOODBOY008 for the quick update, LGTM now. Hi @1996fanrui, could you help double check this PR? Thank @Jiabao-Sun for the ping, I will check it these 2 days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35242) Add per-type schema evolution behavior configuration
[ https://issues.apache.org/jira/browse/FLINK-35242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-35242: Description: > Update: Changed `fine grained` terminology to avoid confusion with the > fine-grained job resource management feature. Currently, Flink CDC allows three SE strategies: evolving it, ignoring it, or throwing an exception. However, such a configuration strategy doesn't cover all use cases, and users want more fine-grained strategy configuration. This ticket suggests adding one more strategy, "try_evolve" or "evolve_when_available". It's basically like the "evolving" option, but doesn't throw an exception if such an operation fails, which provides more flexibility. Also, this ticket suggests allowing users to configure a per-schema-event strategy, so users could evolve some types of event (like rename column) and reject some unwelcome events (like truncate table, remove column). was: Currently, Flink CDC allows three SE strategies: evolving it, ignoring it, or throwing an exception. However, such a configuration strategy doesn't cover all use cases, and users want more fine-grained strategy configuration. This ticket suggests adding one more strategy, "try_evolve" or "evolve_when_available". It's basically like the "evolving" option, but doesn't throw an exception if such an operation fails, which provides more flexibility. Also, this ticket suggests allowing users to configure a per-schema-event strategy, so users could evolve some types of event (like rename column) and reject some dangerous events (like truncate table, remove column). 
Summary: Add per-type schema evolution behavior configuration (was: Add fine-grained schema evolution strategy) > Add per-type schema evolution behavior configuration > > > Key: FLINK-35242 > URL: https://issues.apache.org/jira/browse/FLINK-35242 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > Original Estimate: 336h > Remaining Estimate: 336h > > > > Update: Changed `fine grained` terminology to avoid confusion with the > > fine-grained job resource management feature. > Currently, Flink CDC allows three SE strategies: evolving it, ignoring it, or > throwing an exception. However, such a configuration strategy doesn't cover all > use cases, and users want more fine-grained strategy configuration. > This ticket suggests adding one more strategy, "try_evolve" or > "evolve_when_available". It's basically like the "evolving" option, but doesn't > throw an exception if such an operation fails, which provides more flexibility. > Also, this ticket suggests allowing users to configure a per-schema-event > strategy, so users could evolve some types of event (like rename column) and > reject some unwelcome events (like truncate table, remove column). -- This message was sent by Atlassian Jira (v8.20.10#820010)
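The configuration the ticket proposes could look roughly like the following pipeline fragment. Note that all key names and values here (`schema.change.behavior`, the include/exclude lists, the event-type identifiers) are illustrative assumptions sketched from the ticket text, not a confirmed API:

```yaml
pipeline:
  name: Sync with per-type schema evolution
  # Overall behavior per the proposal: evolve / try_evolve / ignore / exception.
  # try_evolve applies the change when possible but does not fail the job otherwise.
  schema.change.behavior: try_evolve

sink:
  type: doris
  # Hypothetical per-event-type filters: evolve benign events,
  # reject destructive ones.
  include.schema.changes: [add.column, rename.column]
  exclude.schema.changes: [drop.column, truncate.table]
```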
Re: [PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]
flinkbot commented on PR #24921: URL: https://github.com/apache/flink/pull/24921#issuecomment-2161796100 ## CI report: * 84a66fe40a34e3e74b68beee473005b903a69fe3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35140) Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35140: --- Labels: pull-request-available (was: ) > Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19 > > > Key: FLINK-35140 > URL: https://issues.apache.org/jira/browse/FLINK-35140 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Opensearch >Reporter: Danny Cranmer >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > [https://github.com/apache/flink-connector-opensearch] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]
snuyanzin opened a new pull request, #24921: URL: https://github.com/apache/flink/pull/24921 ## What is the purpose of the change Add docs for new Opensearch connector release In fact docs are same for both v1 and v2, so just referencing to one branch (also references to multiple branches are not supported here IIRC) ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: ( no ) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no ) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? ( no) - If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35140) Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-35140: Summary: Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19 (was: Release flink-connector-opensearch vX.X.X for Flink 1.19) > Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19 > > > Key: FLINK-35140 > URL: https://issues.apache.org/jira/browse/FLINK-35140 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Opensearch >Reporter: Danny Cranmer >Assignee: Sergey Nuyanzin >Priority: Major > > [https://github.com/apache/flink-connector-opensearch] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Enable ci for v1.2 [flink-connector-opensearch]
snuyanzin merged PR #47: URL: https://github.com/apache/flink-connector-opensearch/pull/47 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding Opensearch Connector v1.2.0 [flink-web]
snuyanzin merged PR #740: URL: https://github.com/apache/flink-web/pull/740 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding Opensearch Connector v2.0.0 [flink-web]
snuyanzin merged PR #741: URL: https://github.com/apache/flink-web/pull/741 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35116) Upgrade JOSDK dependency to 4.8.3
[ https://issues.apache.org/jira/browse/FLINK-35116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi closed FLINK-35116. -- Resolution: Fixed Thanks [~mateczagany] ! Indeed. > Upgrade JOSDK dependency to 4.8.3 > - > > Key: FLINK-35116 > URL: https://issues.apache.org/jira/browse/FLINK-35116 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Márton Balassi >Assignee: Márton Balassi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.9.0 > > > This bring a much needed fix for the operator HA behaviour: > https://github.com/operator-framework/java-operator-sdk/releases/tag/v4.8.3 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141 ] Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:09 PM: -- [~xuyangzhong] [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue. I need to use CAST so I can create a Row with named fields. The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. {code:java} Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code} Here is the error: {code:java} Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[anonymous_datastream_source$1, metadata=[rowtime]]]) {code} was (Author: pouria): [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue. I need to use CAST so I can create a Row with named fields. 
The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. {code:java} Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code} Here is the error: {code:java} Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[anonymous_datastream_source$1, metadata=[rowtime]]]) {code} > Type mismatch when using ROW in computed column > --- > > Key: FLINK-20539 > URL: https://issues.apache.org/jira/browse/FLINK-20539 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: xuyang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.19.0, 1.18.2 > > > The following SQL: > {code} > env.executeSql( > "CREATE TABLE Orders (\n" > + "order_number BIGINT,\n" > + "priceINT,\n" > + "first_name STRING,\n" > + "last_nameSTRING,\n" > + "buyer_name AS ROW(first_name, last_name)\n" > + ") WITH (\n" > + " 'connector' = 'datagen'\n" > + ")"); > env.executeSql("SELECT * FROM Orders").print(); > {code} > Fails with: > {code} > Exception in thread "main" 
java.lang.AssertionError: Conversion to relational > algebra failed to preserve datatypes: > validated type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT > NULL buyer_name) NOT NULL > converted type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-
[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141 ] Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:08 PM: -- [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue. I need to use CAST so I can create a Row with named fields. The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. {code:java} Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code} Here is the error: {code:java} Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[anonymous_datastream_source$1, metadata=[rowtime]]]) {code} was (Author: pouria): [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue (See below). I need to use CAST so I can create a Row with named fields. 
The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. {code:java} Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code} Here is the error: {code:java} Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[anonymous_datastream_source$1, metadata=[rowtime]]]) {code} > Type mismatch when using ROW in computed column > --- > > Key: FLINK-20539 > URL: https://issues.apache.org/jira/browse/FLINK-20539 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: xuyang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.19.0, 1.18.2 > > > The following SQL: > {code} > env.executeSql( > "CREATE TABLE Orders (\n" > + "order_number BIGINT,\n" > + "priceINT,\n" > + "first_name STRING,\n" > + "last_nameSTRING,\n" > + "buyer_name AS ROW(first_name, last_name)\n" > + ") WITH (\n" > + " 'connector' = 'datagen'\n" > + ")"); > env.executeSql("SELECT * FROM Orders").print(); > {code} > Fails with: > {code} > Exception in thread "main" 
java.lang.AssertionError: Conversion to relational > algebra failed to preserve datatypes: > validated type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT > NULL buyer_name) NOT NULL > converted type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE"
[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141 ] Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:08 PM: -- [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue (See below). I need to use CAST so I can create a Row with named fields. The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. {code:java} Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code} Here is the error: {code:java} Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[anonymous_datastream_source$1, metadata=[rowtime]]]) {code} was (Author: pouria): [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue (See below). I need to use CAST so I can create a Row with named fields. 
The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. ``` Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); ``` Here is the error: ``` Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[*anonymous_datastream_source$1*, metadata=[rowtime]]]) ``` > Type mismatch when using ROW in computed column > --- > > Key: FLINK-20539 > URL: https://issues.apache.org/jira/browse/FLINK-20539 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: xuyang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.19.0, 1.18.2 > > > The following SQL: > {code} > env.executeSql( > "CREATE TABLE Orders (\n" > + "order_number BIGINT,\n" > + "priceINT,\n" > + "first_name STRING,\n" > + "last_nameSTRING,\n" > + "buyer_name AS ROW(first_name, last_name)\n" > + ") WITH (\n" > + " 'connector' = 'datagen'\n" > + ")"); > env.executeSql("SELECT * FROM Orders").print(); > {code} > Fails with: > {code} > Exception in thread "main" 
java.lang.AssertionError: Conversion to relational > algebra failed to preserve datatypes: > validated type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT > NULL buyer_name) NOT NULL > converted type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE
[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141 ] Pouria Pirzadeh commented on FLINK-20539: - [~martijnvisser] Thanks for looking into the issue and merging the fix. I tried below query on 1.19 and I am still hitting (a similar) issue (See below). I need to use CAST so I can create a Row with named fields. The query works fine without CAST, so I believe the same issue (i.e. Data type mismatch between Calcite and Flink's type factory) is happening here too. ``` Table t1 = tableEnv.sqlQuery( "SELECT " + "CAST(ROW(name, price) AS ROW) AS col " + "FROM orders"); tableEnv.createTemporaryView("t1", t1); tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); ``` Here is the error: ``` Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL converted type: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL rel: LogicalProject(col=[$0]) LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL]) LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3]) LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3]) LogicalProject(name=[$0], price=[$1], ts=[$2], rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*]) LogicalTableScan(table=[[*anonymous_datastream_source$1*, metadata=[rowtime]]]) ``` > Type mismatch when using ROW in computed column > --- > > Key: FLINK-20539 > URL: https://issues.apache.org/jira/browse/FLINK-20539 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: xuyang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.19.0, 
1.18.2 > > > The following SQL: > {code} > env.executeSql( > "CREATE TABLE Orders (\n" > + "order_number BIGINT,\n" > + "priceINT,\n" > + "first_name STRING,\n" > + "last_nameSTRING,\n" > + "buyer_name AS ROW(first_name, last_name)\n" > + ") WITH (\n" > + " 'connector' = 'datagen'\n" > + ")"); > env.executeSql("SELECT * FROM Orders").print(); > {code} > Fails with: > {code} > Exception in thread "main" java.lang.AssertionError: Conversion to relational > algebra failed to preserve datatypes: > validated type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT > NULL buyer_name) NOT NULL > converted type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT > NULL > rel: > LogicalProject(order_number=[$0], price=[$1], first_name=[$2], > last_name=[$3], buyer_name=[ROW($2, $3)]) > LogicalTableScan(table=[[default_catalog, default_database, Orders]]) > at > org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35116) Upgrade JOSDK dependency to 4.8.3
[ https://issues.apache.org/jira/browse/FLINK-35116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854084#comment-17854084 ] Mate Czagany commented on FLINK-35116: -- I think this can be closed as the PR has already been merged to main 4ec4b319c2ba9927c32372c595017840768a67ee > Upgrade JOSDK dependency to 4.8.3 > - > > Key: FLINK-35116 > URL: https://issues.apache.org/jira/browse/FLINK-35116 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Márton Balassi >Assignee: Márton Balassi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.9.0 > > > This brings a much-needed fix for the operator HA behaviour: > https://github.com/operator-framework/java-operator-sdk/releases/tag/v4.8.3 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan closed FLINK-35473. - > FLIP-457: Improve Table/SQL Configuration for Flink 2.0 > --- > > Key: FLINK-35473 > URL: https://issues.apache.org/jira/browse/FLINK-35473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > This is the parent task for > [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854068#comment-17854068 ] Jane Chan edited comment on FLINK-35473 at 6/11/24 2:10 PM: Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, b7b1fc2c29995135b9005f07e385986a40c65621, fbf0f28fef737d47b45815d3f77c6a842167c3e8, fbacf22a057e52c06a10988c308dfb31afbbcb12, 6dbe7bf5c306551836ec89c70f9aaab317f55e10, 526f9b034763fd022a52fe84b2c3227c59a78df1 was (Author: qingyue): Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, b7b1fc2c29995135b9005f07e385986a40c65621, fbf0f28fef737d47b45815d3f77c6a842167c3e8, fbacf22a057e52c06a10988c308dfb31afbbcb12, 6dbe7bf5c306551836ec89c70f9aaab317f55e10, 526f9b034763fd022a52fe84b2c3227c59a78df1 > FLIP-457: Improve Table/SQL Configuration for Flink 2.0 > --- > > Key: FLINK-35473 > URL: https://issues.apache.org/jira/browse/FLINK-35473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > This is the parent task for > [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35266) Add e2e tests for FlinkStateSnapshot CRs
[ https://issues.apache.org/jira/browse/FLINK-35266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mate Czagany updated FLINK-35266: - Fix Version/s: (was: kubernetes-operator-1.9.0) > Add e2e tests for FlinkStateSnapshot CRs > > > Key: FLINK-35266 > URL: https://issues.apache.org/jira/browse/FLINK-35266 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Mate Czagany >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan resolved FLINK-35473. --- Resolution: Fixed Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, b7b1fc2c29995135b9005f07e385986a40c65621, fbf0f28fef737d47b45815d3f77c6a842167c3e8, fbacf22a057e52c06a10988c308dfb31afbbcb12, 6dbe7bf5c306551836ec89c70f9aaab317f55e10, 526f9b034763fd022a52fe84b2c3227c59a78df1 > FLIP-457: Improve Table/SQL Configuration for Flink 2.0 > --- > > Key: FLINK-35473 > URL: https://issues.apache.org/jira/browse/FLINK-35473 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > This is the parent task for > [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35267) Create documentation for FlinkStateSnapshot CR
[ https://issues.apache.org/jira/browse/FLINK-35267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mate Czagany updated FLINK-35267: - Fix Version/s: (was: kubernetes-operator-1.9.0) > Create documentation for FlinkStateSnapshot CR > -- > > Key: FLINK-35267 > URL: https://issues.apache.org/jira/browse/FLINK-35267 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Mate Czagany >Priority: Major > > This should cover the new features and migration from the now deprecated > methods of taking snapshots. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35265) Implement FlinkStateSnapshot custom resource
[ https://issues.apache.org/jira/browse/FLINK-35265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mate Czagany updated FLINK-35265: - Fix Version/s: (was: kubernetes-operator-1.9.0) > Implement FlinkStateSnapshot custom resource > > > Key: FLINK-35265 > URL: https://issues.apache.org/jira/browse/FLINK-35265 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Mate Czagany >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35263) FLIP-446: Kubernetes Operator State Snapshot CRD
[ https://issues.apache.org/jira/browse/FLINK-35263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mate Czagany updated FLINK-35263: - Fix Version/s: (was: kubernetes-operator-1.9.0) > FLIP-446: Kubernetes Operator State Snapshot CRD > > > Key: FLINK-35263 > URL: https://issues.apache.org/jira/browse/FLINK-35263 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Mate Czagany >Priority: Major > > Described in > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]
LadyForest closed pull request #24889: [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 URL: https://github.com/apache/flink/pull/24889
[jira] [Commented] (FLINK-26951) Add HASH supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-26951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854060#comment-17854060 ] lincoln lee commented on FLINK-26951: - Thanks [~kartikeypant] for volunteering this! Before implementation, we should make it clear which hash algorithm will be used. This hash function is mainly for Hive compatibility purposes, so we need to clarify the details to ensure it is compatible with Hive. > Add HASH supported in SQL & Table API > - > > Key: FLINK-26951 > URL: https://issues.apache.org/jira/browse/FLINK-26951 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: dalongliu >Priority: Major > Fix For: 1.20.0 > > > Returns a hash value of the arguments. > Syntax: > {code:java} > hash(expr1, ...) {code} > Arguments: > * {{{}exprN{}}}: An expression of any type. > Returns: > An INTEGER. > Examples: > {code:java} > > SELECT hash('Flink', array(123), 2); > -1321691492 {code} > See more: > * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#hash] > * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf] -- This message was sent by Atlassian Jira (v8.20.10#820010)
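The comment above is the crux: a variadic HASH(expr1, ...) is only useful for Hive compatibility if it reproduces Hive's exact algorithm bit for bit. As a purely illustrative sketch of the *shape* of such a function — combining per-argument hash codes into a single INTEGER — one could write the following. This deliberately does not implement Hive's or Spark's actual algorithm (which the discussion says still needs to be pinned down); the null handling and combiner are assumptions for illustration only.

```java
import java.util.Arrays;

// Illustrative sketch only: a variadic HASH(expr1, ...) returning an INTEGER.
// Not Hive's algorithm; it combines per-argument hash codes with the classic
// 31-multiplier polynomial.
public class HashSketch {
    static int hash(Object... exprs) {
        int result = 0;
        for (Object expr : exprs) {
            int h;
            if (expr == null) {
                h = 0; // treat SQL NULL as hash 0 (an assumption, not Hive semantics)
            } else if (expr instanceof Object[]) {
                h = Arrays.deepHashCode((Object[]) expr); // arrays hash by content
            } else {
                h = expr.hashCode();
            }
            result = 31 * result + h;
        }
        return result;
    }

    public static void main(String[] args) {
        // mirrors the shape of the SELECT hash('Flink', array(123), 2) example above;
        // the value will differ from Hive/Spark output
        System.out.println(hash("Flink", new Object[] {123}, 2));
    }
}
```

Whatever algorithm is chosen, the key design constraint named in the thread is determinism across engines: the same inputs must hash identically in Flink and Hive.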
[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854054#comment-17854054 ] lincoln lee commented on FLINK-12450: - Thank you everyone! Assigned to you [~kartikeypant]. > [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table > API and SQL > --- > > Key: FLINK-12450 > URL: https://issues.apache.org/jira/browse/FLINK-12450 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanchun Zhang >Assignee: Kartikey Pant >Priority: Major > Labels: auto-unassigned, stale-assigned > > BIT_LSHIFT, Shifts a long number to the left > BIT_RSHIFT, Shifts a long number to the right -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-12450: --- Assignee: Kartikey Pant (was: Ran Tao) > [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table > API and SQL > --- > > Key: FLINK-12450 > URL: https://issues.apache.org/jira/browse/FLINK-12450 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanchun Zhang >Assignee: Kartikey Pant >Priority: Major > Labels: auto-unassigned, stale-assigned > > BIT_LSHIFT, Shifts a long number to the left > BIT_RSHIFT, Shifts a long number to the right -- This message was sent by Atlassian Jira (v8.20.10#820010)
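For the shift functions above, one reference point is MySQL, whose BIT_LSHIFT/BIT_RSHIFT treat the operand as an unsigned 64-bit value and return 0 once the shift distance reaches 64 — unlike Java's native shift operators, which reduce the distance modulo 64. A small standalone sketch under those assumed MySQL-style semantics (names are illustrative, not Flink's eventual API):

```java
// Sketch of BIT_LSHIFT / BIT_RSHIFT semantics, assuming MySQL-style behavior:
// unsigned 64-bit interpretation, and 0 for out-of-range shift distances.
public class BitShiftSketch {
    static long bitLshift(long value, int shift) {
        // Java's << reduces shift mod 64, so guard out-of-range distances explicitly
        return (shift >= 64 || shift < 0) ? 0L : value << shift;
    }

    static long bitRshift(long value, int shift) {
        // >>> is the logical (unsigned) right shift
        return (shift >= 64 || shift < 0) ? 0L : value >>> shift;
    }

    public static void main(String[] args) {
        System.out.println(bitLshift(1L, 4));   // 16
        System.out.println(bitRshift(16L, 4));  // 1
        System.out.println(bitLshift(1L, 64));  // 0 under the assumed semantics
        System.out.println(bitRshift(-1L, 60)); // 15: -1 read as unsigned all-ones
    }
}
```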
[jira] [Updated] (FLINK-35378) [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction
[ https://issues.apache.org/jira/browse/FLINK-35378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-35378: --- Summary: [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction (was: [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunc) > [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction > --- > > Key: FLINK-35378 > URL: https://issues.apache.org/jira/browse/FLINK-35378 > Project: Flink > Issue Type: Technical Debt > Components: API / Core >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > > https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=303794871&draftShareId=af4ace88-98b7-4a53-aece-cd67d2f91a15&; -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634861389 ## flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java: ## @@ -285,7 +291,14 @@ private static KeyManagerFactory getKeyManagerFactory( : SecurityOptions.SSL_REST_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD); -KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); +// do not use getAndCheckOption here as there is no fallback option and a default is +// specified +String keystoreType = +internal +? config.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE) +: config.get(SecurityOptions.SSL_REST_KEYSTORE_TYPE); Review Comment: Just a clarification for other reviewers, since there is default value it just doesn't make sense to provide fallback. ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +68,35 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException | GeneralSecurityException e) { +throw new RemoteTransportException( +"Server SSL connection could not be established because SSL context could not be constructed", Review Comment: Here we can be more specific: "Server SSL connection could not be established because keystore could not be loaded" -- This 
is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634845033 ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +71,38 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException +| CertificateException +| NoSuchAlgorithmException +| KeyStoreException e) { +throw new RemoteTransportException( Review Comment: I've checked out the code and played with it. I agree that's what we can do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
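The two review threads above concern reading the keystore type from configuration instead of always calling `KeyStore.getInstance(KeyStore.getDefaultType())`. The selection logic can be sketched in isolation with only the JDK; the method names below are illustrative, not Flink's actual API:

```java
import java.security.KeyStore;

public class KeystoreTypeExample {
    // Mirrors the idea in the PR: use the configured keystore type when one
    // is set, otherwise fall back to the JVM default type.
    public static String resolveKeystoreType(String configuredType) {
        return configuredType != null ? configuredType : KeyStore.getDefaultType();
    }

    public static KeyStore newEmptyKeystore(String configuredType) throws Exception {
        KeyStore ks = KeyStore.getInstance(resolveKeystoreType(configuredType));
        ks.load(null, null); // initialize an empty in-memory keystore
        return ks;
    }

    public static void main(String[] args) throws Exception {
        // With an explicit type, e.g. for legacy JKS keystores:
        System.out.println(newEmptyKeystore("JKS").getType());
        // With no configured type, the JVM default (PKCS12 on modern JDKs):
        System.out.println(newEmptyKeystore(null).getType());
    }
}
```

Because the option ships with a default value, there is never a "type is unset" state in Flink's config, which is why the reviewer notes a fallback option would be pointless.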
[jira] [Updated] (FLINK-35541) Introduce retry limiting for AWS connector sinks
[ https://issues.apache.org/jira/browse/FLINK-35541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksandr Pilipenko updated FLINK-35541: Affects Version/s: aws-connector-4.3.0 > Introduce retry limiting for AWS connector sinks > > > Key: FLINK-35541 > URL: https://issues.apache.org/jira/browse/FLINK-35541 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / AWS, Connectors / DynamoDB, Connectors / > Firehose, Connectors / Kinesis >Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0 >Reporter: Aleksandr Pilipenko >Priority: Major > > Currently, if the record write operation in the sink consistently fails with > a retriable error, the sink will retry indefinitely. If the cause of the > error is never resolved, this may become a poison pill. > > The proposal here is to add a configurable retry limit for each record. Users can > specify a maximum number of retries per record, and the sink will fail once the retry > limit is reached. > > We will implement this for all AWS connectors: > * DDBSink > * Firehose Sink > * Kinesis Sink > -- This message was sent by Atlassian Jira (v8.20.10#820010)
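The per-record retry cap proposed in FLINK-35541 can be sketched as a small standalone helper. This is an assumption-laden illustration — the class and method names are invented, not the AWS connector's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class RetryLimiter {
    private final int maxRetriesPerRecord;
    private final Map<String, Integer> attempts = new HashMap<>();

    public RetryLimiter(int maxRetriesPerRecord) {
        this.maxRetriesPerRecord = maxRetriesPerRecord;
    }

    /**
     * Records one failed attempt for the given record id. Returns true if the
     * record may still be retried, false once the configured limit is
     * exhausted — at which point a real sink would fail the job instead of
     * retrying the same record forever.
     */
    public boolean registerFailureAndCheck(String recordId) {
        int failures = attempts.merge(recordId, 1, Integer::sum);
        return failures <= maxRetriesPerRecord;
    }
}
```

In a sink writer, the `false` branch would surface as a fatal exception rather than a re-enqueue of the failed record.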
Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]
hlteoh37 commented on code in PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider; +import org.apache.flink.types.Row; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; +import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter; +import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.beans.BeanInfo; +import java.beans.IntrospectionException; +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import 
java.util.List; +import java.util.Set; +import java.util.function.Function; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. + */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter +implements ElementConverter { + +private final CompositeType typeInfo; +private final boolean ignoreNulls; +private final TableSchema tableSchema; + +/** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. + */ +public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) { +this(typeInfo, true); +} + +public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, boolean ignoreNulls) { + +try { +this.typeInfo = typeInfo; +this.ignoreNulls = ignoreNulls; +this.tableSchema = createTableSchema(typeInfo); +} catch (IntrospectionException | IllegalStateException | IllegalArgumentException e) { +throw new FlinkRuntimeException("Failed to extract DynamoDb table schema", e); +} +} + +@Override +public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) { +Preconditions.checkNotNull(tableSchema, "TableSchema is not initialized"); +try { +return DynamoDbWriteRequest.builder() +.setType(DynamoDbWriteRequestType.PUT) +.setItem(tableSchema.itemToMap(input, ignoreNulls)) +.build(); +} catch (ClassCastExcept
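The converter in the diff above derives a DynamoDB table schema from Flink type information, using bean introspection under the hood. The core idea — map each readable bean property to an item attribute, skipping nulls when `ignoreNulls` is set — can be sketched with JDK `java.beans` alone. This is a simplified stand-in, not the connector's actual code:

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanToItemExample {
    /**
     * Maps each readable bean property to its value, skipping nulls
     * (mirroring the converter's ignoreNulls=true default).
     */
    public static Map<String, Object> itemFromBean(Object bean) throws Exception {
        Map<String, Object> item = new LinkedHashMap<>();
        // Stop at Object.class so the synthetic "class" property is excluded.
        for (PropertyDescriptor pd :
                Introspector.getBeanInfo(bean.getClass(), Object.class).getPropertyDescriptors()) {
            Object value = pd.getReadMethod().invoke(bean);
            if (value != null) {
                item.put(pd.getName(), value);
            }
        }
        return item;
    }

    /** Hypothetical POJO standing in for a user's element type. */
    public static class Order {
        public String getId() { return "o-1"; }
        public Integer getQuantity() { return null; } // null -> skipped
    }
}
```

The real converter goes further: it uses the `CompositeType`'s field types to pick AWS SDK `AttributeConverter`s, so values end up as typed `AttributeValue`s rather than plain objects.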
[PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]
qg-lin opened a new pull request, #3409: URL: https://github.com/apache/flink-cdc/pull/3409 https://issues.apache.org/jira/browse/FLINK-35540 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 [flink]
pan3793 commented on PR #24905: URL: https://github.com/apache/flink/pull/24905#issuecomment-2160590805 ping @JingGe, is there anything I can do before merging? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { Review Comment: I did not touch this method, I just moved the `getWriterType` above this to keep the method visibility order in the file intact, because that method became package-private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
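The `getWriterType` change quoted above routes an output-stream-based compactor to a dedicated `COMPRESSED_STREAM` writer type when the `ConcatFileCompactor` reports it is concatenating already-compressed files. The branching can be reproduced with minimal stand-in types (simplified names, not Flink's real class hierarchy):

```java
public class WriterTypeExample {
    public enum Type { OUTPUT_STREAM, COMPRESSED_STREAM, RECORD_WISE }

    public interface FileCompactor {}
    public static class OutputStreamBasedFileCompactor implements FileCompactor {}
    public static class RecordWiseFileCompactor implements FileCompactor {}
    public static class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
        private final boolean compressed;
        public ConcatFileCompactor(boolean compressed) { this.compressed = compressed; }
        public boolean isCompressed() { return compressed; }
    }

    public static Type getWriterType(FileCompactor compactor) {
        if (compactor instanceof OutputStreamBasedFileCompactor) {
            // Already-compressed parts must be concatenated raw, so they get
            // a distinct writer type instead of the plain output stream.
            return compactor instanceof ConcatFileCompactor
                            && ((ConcatFileCompactor) compactor).isCompressed()
                    ? Type.COMPRESSED_STREAM
                    : Type.OUTPUT_STREAM;
        } else if (compactor instanceof RecordWiseFileCompactor) {
            return Type.RECORD_WISE;
        }
        throw new UnsupportedOperationException(
                "Unable to create compacting file writer for compactor: " + compactor.getClass());
    }
}
```

Making the method package-private with `@VisibleForTesting` (as the PR does) lets this selection be unit-tested directly.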
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1634722169 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { +uncompactedName = uncompactedName.substring(1); +} +return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: I did not touch this method, I just moved the getWriterType above this to keep the method visibility order in the file intact, because that method became package-private. 
## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { +uncompactedName = uncompactedName.substring(1); +} +return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: I did not touch this method, I just moved the getWriterType above this to keep the method visibility order in the file intact, because that method became package-private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on PR #24426: URL: https://github.com/apache/flink/pull/24426#issuecomment-2160556166 Thanks @HuangXingBo for the update! I have changed the GitHub actions to run the `auditwheel` repair after installing `patchelf` in Linux. Could you please have another look? You can find the build wheel artifacts for this change here: https://github.com/morazow/flink/actions/runs/9464510932 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp
[ https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-34908. Resolution: Fixed > [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision > for timestamp > --- > > Key: FLINK-34908 > URL: https://issues.apache.org/jira/browse/FLINK-34908 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Xin Gong >Assignee: Xin Gong >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > The flink cdc pipeline decides the timestamp zone from the pipeline config. I found > mysql2doris and mysql2starrocks specify the datetime format > -MM-dd HH:mm:ss, which causes loss of datetime precision. I think we > shouldn't set a fixed datetime format, just return the LocalDateTime object. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp
[ https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852752#comment-17852752 ] Leonard Xu edited comment on FLINK-34908 at 6/11/24 11:23 AM: -- master: e2ccc836a056c16974e4956190bdce249705b7ee 3.1: 1112987572e487a79a1bbecf460705aa6153e0bb was (Author: leonard xu): master: e2ccc836a056c16974e4956190bdce249705b7ee 3.1: todo > [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision > for timestamp > --- > > Key: FLINK-34908 > URL: https://issues.apache.org/jira/browse/FLINK-34908 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Xin Gong >Assignee: Xin Gong >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > The flink cdc pipeline decides the timestamp zone from the pipeline config. I found > mysql2doris and mysql2starrocks specify the datetime format > -MM-dd HH:mm:ss, which causes loss of datetime precision. I think we > shouldn't set a fixed datetime format, just return the LocalDateTime object. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
leonardBang merged PR #3407: URL: https://github.com/apache/flink-cdc/pull/3407 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35545) Miss 3.1.0 version in snapshot flink-cdc doc version list
[ https://issues.apache.org/jira/browse/FLINK-35545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853114#comment-17853114 ] Leonard Xu edited comment on FLINK-35545 at 6/11/24 11:22 AM: -- master:4efb1d78ca778abeae142facfa99440f22a88b25 release-3.1:050c28649c0bd0068b5e7fe62331b257574572f2 release-3.0:a02b9dc019195ea317739a63738b0d5eaf1a6671 was (Author: leonard xu): master:4efb1d78ca778abeae142facfa99440f22a88b25 release-3.1:93d5ee98da19bb878754bbc3780a3e23033ed331 release-3.0:a02b9dc019195ea317739a63738b0d5eaf1a6671 > Miss 3.1.0 version in snapshot flink-cdc doc version list > - > > Key: FLINK-35545 > URL: https://issues.apache.org/jira/browse/FLINK-35545 > Project: Flink > Issue Type: Bug > Components: Documentation, Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Zhongqiang Gong >Assignee: Zhongqiang Gong >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > Attachments: image-2024-06-08-10-07-06-403.png, screenshot-1.png > > > Link : [https://nightlies.apache.org/flink/flink-cdc-docs-master/] > Miss 3.0.1 version in version list: > > !screenshot-1.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]
leonardBang merged PR #3408: URL: https://github.com/apache/flink-cdc/pull/3408
Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]
GOODBOY008 commented on PR #3408: URL: https://github.com/apache/flink-cdc/pull/3408#issuecomment-2160480202 @leonardBang PTAL
[jira] [Commented] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854012#comment-17854012 ] Leonard Xu commented on FLINK-35540: master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7 release-3.1: TODO > flink-cdc-pipeline-connector-mysql lost table which database and table with > the same name > - > > Key: FLINK-35540 > URL: https://issues.apache.org/jira/browse/FLINK-35540 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: linqigeng >Assignee: linqigeng >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > h1. Description > When the 'tables' parameter of a MySQL pipeline job contains a table whose > database and table names are the same, like 'app.app', the job will fail > and the error message is like: > {code:java} > java.lang.IllegalArgumentException: Cannot find any table by the option > 'tables' = app.app {code} > h1. How to reproduce > Create a database and a table both named {{app}}, then submit a pipeline job > defined by YAML like this: > {code:java} > source: > type: mysql > hostname: localhost > port: 3306 > username: root > password: 123456 > tables: app.app > server-id: 5400-5404 > server-time-zone: UTC > sink: > type: doris > fenodes: 127.0.0.1:8030 > username: root > password: "" > table.create.properties.light_schema_change: true > table.create.properties.replication_num: 1 > pipeline: > name: Sync MySQL Database to Doris > parallelism: 2 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-35540: -- Assignee: linqigeng -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-35540: --- Fix Version/s: cdc-3.2.0 cdc-3.1.1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
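For context on why 'app.app' is a legal value, the 'tables' option is a fully qualified database.table identifier, so the database and table parts must be split positionally rather than matched by name. A minimal, hypothetical parsing sketch (not the actual CDC selector code; it assumes identifiers contain no escaped dots) would be:

```java
public class TableIdDemo {
    // Split "database.table" at the first dot; "app.app" yields database "app", table "app".
    static String[] parseTableId(String qualified) {
        int dot = qualified.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("expected database.table, got: " + qualified);
        }
        return new String[] { qualified.substring(0, dot), qualified.substring(dot + 1) };
    }

    public static void main(String[] args) {
        String[] id = parseTableId("app.app");
        System.out.println(id[0] + " / " + id[1]);   // app / app
    }
}
```

Under this reading, a matcher that treats the two parts positionally handles 'app.app' the same as any other table; the reported failure suggests the released code did not.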
[PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]
GOODBOY008 opened a new pull request, #3408: URL: https://github.com/apache/flink-cdc/pull/3408 (no comment)
Re: [PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table are with the same name [flink-cdc]
leonardBang merged PR #3396: URL: https://github.com/apache/flink-cdc/pull/3396
[jira] [Resolved] (FLINK-35559) Shading issue cause class conflict
[ https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-35559. - Resolution: Fixed > Shading issue cause class conflict > -- > > Key: FLINK-35559 > URL: https://issues.apache.org/jira/browse/FLINK-35559 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Connectors / Firehose, Connectors / > Kinesis >Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: pull-request-available > Fix For: aws-connector-4.4.0 > > Attachments: Screenshot 2024-06-08 at 18.19.30.png > > > Incorrect shading configuration causes a ClassCastException during exception > handling when a job packages flink-connector-kinesis together with > flink-connector-aws-kinesis-firehose. > {code:java} > java.lang.ClassCastException: class > software.amazon.awssdk.services.firehose.model.FirehoseException cannot be > cast to class > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException > (software.amazon.awssdk.services.firehose.model.FirehoseException and > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException > are in unnamed module of loader 'app') > at > org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62) > ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] 
> at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53) > ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218) > ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189) > ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79) > ~[utils-2.20.144.jar:?] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56) > ~[sdk-core-2.20.144.jar:?] 
> at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java
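For background, the mechanism is visible in the exception text above: the Kinesis connector relocates the AWS SDK under org.apache.flink.kinesis.shaded, while the Firehose exception class keeps the unrelocated software.amazon.awssdk package, so the cast compares two distinct classes. A maven-shade-plugin relocation of the kind involved (illustrative only, not copied from the actual connector poms) looks like:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <!-- Rewrites AWS SDK package names inside the shaded jar. Any class
           reached through a dependency that escapes this relocation keeps
           its original package name, and casts between the relocated and
           unrelocated namespaces fail with ClassCastException. -->
      <relocation>
        <pattern>software.amazon.awssdk</pattern>
        <shadedPattern>org.apache.flink.kinesis.shaded.software.amazon.awssdk</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```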
[jira] [Updated] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error
[ https://issues.apache.org/jira/browse/FLINK-35567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hongshun Wang updated FLINK-35567: -- Fix Version/s: cdc-3.2.0 (was: cdc-3.1.1) > CDC BinaryWriter cast NullableSerializerWrapper error > -- > > Key: FLINK-35567 > URL: https://issues.apache.org/jira/browse/FLINK-35567 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 >Reporter: Hongshun Wang >Priority: Major > Fix For: cdc-3.2.0 > > > Currently, we generate data type serializers via > org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]), > which wraps each serializer in a > NullableSerializerWrapper. > {code:java} > // code placeholder > public BinaryRecordDataGenerator(DataType[] dataTypes) { > this( > dataTypes, > Arrays.stream(dataTypes) > .map(InternalSerializers::create) > .map(NullableSerializerWrapper::new) > .toArray(TypeSerializer[]::new)); > } {code} > However, when used in BinaryWriter#write with an ARRAY/MAP/ROW type, the code casts the > NullableSerializerWrapper to > ArrayDataSerializer/TypeSerializer/TypeSerializer. > An exception is thrown: > {code:java} > java.lang.ClassCastException: > org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be > cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer > at > org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134) > at > org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error
Hongshun Wang created FLINK-35567: - Summary: CDC BinaryWriter cast NullableSerializerWrapper error Key: FLINK-35567 URL: https://issues.apache.org/jira/browse/FLINK-35567 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: Hongshun Wang Fix For: cdc-3.1.1 Currently, we generate data type serializers via org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]), which wraps each serializer in a NullableSerializerWrapper. {code:java} // code placeholder public BinaryRecordDataGenerator(DataType[] dataTypes) { this( dataTypes, Arrays.stream(dataTypes) .map(InternalSerializers::create) .map(NullableSerializerWrapper::new) .toArray(TypeSerializer[]::new)); } {code} However, when used in BinaryWriter#write with an ARRAY/MAP/ROW type, the code casts the NullableSerializerWrapper to ArrayDataSerializer/TypeSerializer/TypeSerializer. An exception is thrown: {code:java} java.lang.ClassCastException: org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer at org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134) at org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
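The failure mode can be reproduced in isolation with a simplified sketch (the types below are stand-ins, not the real Flink CDC classes): a wrapper that delegates to an inner serializer is not an instance of the inner serializer's concrete class, so a blind cast fails.

```java
// Simplified stand-ins for the Flink CDC serializer types (hypothetical, not the real API).
interface TypeSerializer<T> {
    byte[] serialize(T value);
}

class ArrayDataSerializer implements TypeSerializer<int[]> {
    public byte[] serialize(int[] value) {
        return new byte[value.length];
    }
}

// Delegating wrapper that adds null handling, mirroring NullableSerializerWrapper.
class NullableSerializerWrapper<T> implements TypeSerializer<T> {
    private final TypeSerializer<T> inner;
    NullableSerializerWrapper(TypeSerializer<T> inner) { this.inner = inner; }
    public byte[] serialize(T value) {
        return value == null ? new byte[0] : inner.serialize(value);
    }
}

public class WrapperCastDemo {
    public static void main(String[] args) {
        TypeSerializer<int[]> s = new NullableSerializerWrapper<>(new ArrayDataSerializer());
        try {
            // Mirrors the cast in BinaryWriter#write: the wrapper is not an ArrayDataSerializer.
            ArrayDataSerializer concrete = (ArrayDataSerializer) s;
            System.out.println("cast succeeded: " + concrete);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in FLINK-35567");
        }
    }
}
```

Under these assumptions, the cast can only be made safe by unwrapping the inner serializer first, or by programming against the TypeSerializer interface instead of the concrete classes.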
[jira] [Updated] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error
[ https://issues.apache.org/jira/browse/FLINK-35567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hongshun Wang updated FLINK-35567: -- Affects Version/s: cdc-3.1.1 (was: cdc-3.2.0) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35566) Consider promoting TypeSerializer from PublicEvolving to Public
Martijn Visser created FLINK-35566: -- Summary: Consider promoting TypeSerializer from PublicEvolving to Public Key: FLINK-35566 URL: https://issues.apache.org/jira/browse/FLINK-35566 Project: Flink Issue Type: Technical Debt Components: API / Core Reporter: Martijn Visser While working on implementing FLINK-35378, I ran into the problem that TypeSerializer has still been PublicEvolving since Flink 1.0. We should consider annotating it as Public. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
[ https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Naci Simsek updated FLINK-35565: Description: h2. Summary A Flink batch job gets into an infinite fetch loop and cannot finish gracefully if the connected Kafka topic is empty and the starting offset value in the Flink job is lower than the current start/end offset of the related topic. See below for details: h2. How to reproduce A Flink +*batch*+ job which works as a {*}KafkaSource{*} consumes events from a Kafka topic. The related Kafka topic is empty, there are no events, and the offset value is as below: *15* !image-2024-06-11-11-19-09-889.png|width=895,height=256! The Flink job uses a *specific starting offset* value, which is +*less*+ than the current offset of the topic/partition. See below; it is set to “4” {code:java} package naci.grpId; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.kafka.common.TopicPartition; import java.util.HashMap; import java.util.Map; public class KafkaSource_Print { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); // Define the specific offsets for the partitions Map<TopicPartition, Long> specificOffsets = new HashMap<>(); specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0 KafkaSource<String> kafkaSource = KafkaSource .<String>builder() .setBootstrapServers("localhost:9093") // Make sure the port is correct 
.setTopics("topic_test") .setValueOnlyDeserializer(new SimpleStringSchema()) .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets)) .setBounded(OffsetsInitializer.latest()) .build(); DataStream<String> stream = env.fromSource( kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source" ); stream.print(); env.execute("Flink KafkaSource test job"); } }{code} Here are the initial logs printed related to the offset, as soon as the job gets submitted: {code:java} 2024-05-30 12:15:50,010 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]] 2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]] 2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4} 2024-05-30 12:15:50,074 INFO org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15] 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]] 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask{code} Since the starting offset {color:#ff}*4*{color} is *out of range* for the Kafka topic, the KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the task manager logs: {code:java} 2024-05-30 12:15:50,193 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}} is out of range for partition topic_test-0, resetting offset 2024-05-30 12:15:50,195 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for partition topic_test-0 to position FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis