[jira] [Created] (FLINK-35572) flink db2 cdc default value error

2024-06-11 Thread junxin lai (Jira)
junxin  lai created FLINK-35572:
---

 Summary: flink db2 cdc default value error 
 Key: FLINK-35572
 URL: https://issues.apache.org/jira/browse/FLINK-35572
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: junxin  lai


I am using the Flink DB2 CDC connector to sync a database in real time, but it 
fails to handle default values in the schema while taking the snapshot.

After digging deeper into the problem, I found that this appears to be a bug in 
Debezium that was fixed in 
2.0.0.CR1 ([https://issues.redhat.com/browse/DBZ-4990]). The latest Flink CDC 3.1 
uses Debezium version 1.9.8.Final. 

Default values are a common configuration in DB2. Is there a way we can 
backport this patch to 1.9.8.Final? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation

2024-06-11 Thread Grace Grimwood (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grace Grimwood updated FLINK-35571:
---
Attachment: 20240612_181148_mvn-clean-package_flink-runtime.log

> ProfilingServiceTest.testRollingDeletion intermittently fails due to improper 
> test isolation
> 
>
> Key: FLINK-35571
> URL: https://issues.apache.org/jira/browse/FLINK-35571
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: *Git revision:*
> {code:bash}
> $ git show
> commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master)
> {code}
> *Java info:*
> {code:bash}
> $ java -version
> openjdk version "17.0.11" 2024-04-16
> OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9)
> OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode)
> {code}
> {code:bash}
> $ sdk current
> Using:
> java: 17.0.11-tem
> maven: 3.8.6
> scala: 2.12.19
> {code}
> *OS info:*
> {code:bash}
> $ uname -av
> Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May  1 20:14:38 
> PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64
> {code}
> *Hardware info:*
> {code:bash}
> $ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e 
> 'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:'
> hw.memsize: 34359738368
> machdep.cpu.core_count: 12
> machdep.cpu.brand_string: Apple M2 Pro
> {code}
>Reporter: Grace Grimwood
>Priority: Major
> Attachments: 20240612_181148_mvn-clean-package_flink-runtime.log
>
>
> *Symptom:*
> The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the 
> following error:
> {code:java}
> [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 
> s <<< FAILURE! -- in 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest
> [ERROR] 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion
>  -- Time elapsed: 9.264 s <<< FAILURE!
> org.opentest4j.AssertionFailedError: expected: <3> but was: <6>
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at 
> org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
>   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
>   at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
>   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175)
>   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
>   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> The number of extra files found varies from failure to failure.
> *Cause:*
> Many of the tests in *{{ProfilingServiceTest}}* rely on a specific 
> configuration of the *{{ProfilingService}}* instance, but 
> *{{ProfilingService.getInstance}}* does not check whether an existing 
> instance's config matches the provided config before returning it. Because of 
> this, and because JUnit does not guarantee a specific ordering of tests 
> (unless they are specifically annotated), it is possible for these tests to 
> receive an instance that does not behave in the expected way and therefore 
> fail.
> *Analysis:*
> In troubleshooting the test failure, we tried adding an extra assertion to 
> *{{ProfilingServiceTest.setUp}}* to validate the directories being written to 
> were correct:
> {code:java}
> Assertions.assertEquals(tempDir.toString(), 
> profilingService.getProfilingResultDir());
> {code}
> That assert produced the following failure:
> {code:java}
> org.opentest4j.AssertionFailedError: expected: 
>  
> but was: 
> {code}
> This failure shows that the *{{ProfilingService}}* returned by 
> *{{ProfilingService.getInstance}}* in the setup is not using the correct 
> directory, and therefore cannot be the correct instance for this test class 
> because it has the wrong config.
> This is because the static method *{{ProfilingServi

[jira] [Updated] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation

2024-06-11 Thread Grace Grimwood (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grace Grimwood updated FLINK-35571:
---
Attachment: (was: 
20240612_181148_mvn-clean-package_flink-runtime_also-make.log)

> ProfilingServiceTest.testRollingDeletion intermittently fails due to improper 
> test isolation
> 
>
> Key: FLINK-35571
> URL: https://issues.apache.org/jira/browse/FLINK-35571
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: *Git revision:*
> {code:bash}
> $ git show
> commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master)
> {code}
> *Java info:*
> {code:bash}
> $ java -version
> openjdk version "17.0.11" 2024-04-16
> OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9)
> OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode)
> {code}
> {code:bash}
> $ sdk current
> Using:
> java: 17.0.11-tem
> maven: 3.8.6
> scala: 2.12.19
> {code}
> *OS info:*
> {code:bash}
> $ uname -av
> Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May  1 20:14:38 
> PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64
> {code}
> *Hardware info:*
> {code:bash}
> $ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e 
> 'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:'
> hw.memsize: 34359738368
> machdep.cpu.core_count: 12
> machdep.cpu.brand_string: Apple M2 Pro
> {code}
>Reporter: Grace Grimwood
>Priority: Major
>
> *Symptom:*
> The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the 
> following error:
> {code:java}
> [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 
> s <<< FAILURE! -- in 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest
> [ERROR] 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion
>  -- Time elapsed: 9.264 s <<< FAILURE!
> org.opentest4j.AssertionFailedError: expected: <3> but was: <6>
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at 
> org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
>   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
>   at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
>   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175)
>   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
>   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> The number of extra files found varies from failure to failure.
> *Cause:*
> Many of the tests in *{{ProfilingServiceTest}}* rely on a specific 
> configuration of the *{{ProfilingService}}* instance, but 
> *{{ProfilingService.getInstance}}* does not check whether an existing 
> instance's config matches the provided config before returning it. Because of 
> this, and because JUnit does not guarantee a specific ordering of tests 
> (unless they are specifically annotated), it is possible for these tests to 
> receive an instance that does not behave in the expected way and therefore 
> fail.
> *Analysis:*
> In troubleshooting the test failure, we tried adding an extra assertion to 
> *{{ProfilingServiceTest.setUp}}* to validate the directories being written to 
> were correct:
> {code:java}
> Assertions.assertEquals(tempDir.toString(), 
> profilingService.getProfilingResultDir());
> {code}
> That assert produced the following failure:
> {code:java}
> org.opentest4j.AssertionFailedError: expected: 
>  
> but was: 
> {code}
> This failure shows that the *{{ProfilingService}}* returned by 
> *{{ProfilingService.getInstance}}* in the setup is not using the correct 
> directory, and therefore cannot be the correct instance for this test class 
> because it has the wrong config.
> This is because the static method *{{ProfilingService.getInstance}}* attempts 
> to reuse any existing ins

Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on code in PR #3398:
URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1635883374


##
flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh:
##
@@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then
   exit 1
 fi
 
-# Setup Flink related configurations
-# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
-_FLINK_HOME_DETERMINED=1

Review Comment:
   IIUC it was used to tell Flink's `config.sh` not to override the given 
`FLINK_HOME` variable, and it should be set before calling `. 
$FLINK_HOME/bin/config.sh`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on code in PR #3398:
URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1635883374


##
flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh:
##
@@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then
   exit 1
 fi
 
-# Setup Flink related configurations
-# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
-_FLINK_HOME_DETERMINED=1

Review Comment:
   I think it was used to tell Flink's `config.sh` not to override the given 
`FLINK_HOME` variable, and it should be set before calling `. 
$FLINK_HOME/bin/config.sh`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation

2024-06-11 Thread Grace Grimwood (Jira)
Grace Grimwood created FLINK-35571:
--

 Summary: ProfilingServiceTest.testRollingDeletion intermittently 
fails due to improper test isolation
 Key: FLINK-35571
 URL: https://issues.apache.org/jira/browse/FLINK-35571
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: *Git revision:*
{code:bash}
$ git show
commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master)
{code}

*Java info:*
{code:bash}
$ java -version
openjdk version "17.0.11" 2024-04-16
OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9)
OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode)
{code}

{code:bash}
$ sdk current
Using:
java: 17.0.11-tem
maven: 3.8.6
scala: 2.12.19
{code}

*OS info:*
{code:bash}
$ uname -av
Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May  1 20:14:38 PDT 
2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64
{code}

*Hardware info:*
{code:bash}
$ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e 
'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:'
hw.memsize: 34359738368
machdep.cpu.core_count: 12
machdep.cpu.brand_string: Apple M2 Pro
{code}
Reporter: Grace Grimwood
 Attachments: 
20240612_181148_mvn-clean-package_flink-runtime_also-make.log

*Symptom:*
The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the 
following error:
{code:java}
[ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 s 
<<< FAILURE! -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest
[ERROR] 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion 
-- Time elapsed: 9.264 s <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <3> but was: <6>
at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
at 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175)
at 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
{code}
The number of extra files found varies from failure to failure.

*Cause:*
Many of the tests in *{{ProfilingServiceTest}}* rely on a specific 
configuration of the *{{ProfilingService}}* instance, but 
*{{ProfilingService.getInstance}}* does not check whether an existing 
instance's config matches the provided config before returning it. Because of 
this, and because JUnit does not guarantee a specific ordering of tests (unless 
they are specifically annotated), it is possible for these tests to receive an 
instance that does not behave in the expected way and therefore fail.
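
A minimal sketch of one way to restore isolation between tests, assuming the 
service is obtained through a {{ProfilingService.getInstance(...)}} factory and 
can be closed; the class and field names below are illustrative, not the actual 
test code:
{code:java}
import java.io.Closeable;
import org.junit.jupiter.api.AfterEach;

// Hypothetical JUnit 5 fixture: closing the shared service after every test keeps
// a later getInstance(...) call from handing back an instance that still carries
// the previous test's configuration.
abstract class IsolatedProfilingTestBase {
    protected Closeable profilingService; // assumed to hold the shared instance under test

    @AfterEach
    void releaseProfilingService() throws Exception {
        if (profilingService != null) {
            profilingService.close();
            profilingService = null;
        }
    }
}
{code}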

*Analysis:*
In troubleshooting the test failure, we tried adding an extra assertion to 
*{{ProfilingServiceTest.setUp}}* to validate the directories being written to 
were correct:
{code:java}
Assertions.assertEquals(tempDir.toString(), 
profilingService.getProfilingResultDir());
{code}
That assert produced the following failure:
{code:java}
org.opentest4j.AssertionFailedError: expected: 
 but 
was: 
{code}
This failure shows that the *{{ProfilingService}}* returned by 
*{{ProfilingService.getInstance}}* in the setup is not using the correct 
directory, and therefore cannot be the correct instance for this test class 
because it has the wrong config.

This is because the static method *{{ProfilingService.getInstance}}* attempts 
to reuse any existing instance of *{{ProfilingService}}* before it creates a 
new one and disregards any differences in config in doing so, which means that 
if another test instantiates a *{{ProfilingService}}* with different config 
first and does not close it, that previous instance will be provided to 
*{{ProfilingServiceTest}}* rather than the new instance those tests seem to 
expect. This only happens with the first test run in this

Re: [PR] [FLINK-35121][pipeline-connector][cdc-base] CDC pipeline connector provide ability to verify requiredOptions and optionalOptions [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on PR #3412:
URL: https://github.com/apache/flink-cdc/pull/3412#issuecomment-2162196171

   Seems there's some overlap between this PR and #3382. @loserwang1024, do you 
mind if I cherry-pick your commit?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]

2024-06-11 Thread via GitHub


joyCurry30 commented on code in PR #3398:
URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1635870700


##
flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh:
##
@@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then
   exit 1
 fi
 
-# Setup Flink related configurations
-# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
-_FLINK_HOME_DETERMINED=1

Review Comment:
   The variable `_FLINK_HOME_DETERMINED` is not used here. Is this variable used 
somewhere else in the code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35533][runtime] Support Flink hybrid shuffle integration with Apache Celeborn [flink]

2024-06-11 Thread via GitHub


TanYuxin-tyx commented on code in PR #24900:
URL: https://github.com/apache/flink/pull/24900#discussion_r1635866275


##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -490,6 +490,24 @@ public enum CompressionCodec {
 + " is configured. The new mode is 
currently in an experimental phase. It can be set to false to fallback to the 
legacy mode "
 + " if something unexpected. Once the new 
mode reaches a stable state, the legacy mode as well as the option will be 
removed.");
 
+/** The option to configure the tiered factory creator remote class name 
for hybrid shuffle. */
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+@Experimental
+public static final ConfigOption
+NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME =
+
key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"The option configures the class that is 
responsible for creating an "
++ "external remote tier factory 
for hybrid shuffle. Note that "
++ "only Celeborn can be accepted 
as the remote shuffle tier "

Review Comment:
   OK, after some discussion offline, we decided to update the description as 
above to avoid stating that only Apache Celeborn is accepted. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35121][pipeline-connector][cdc-base] CDC pipeline connector provide ability to verify requiredOptions and optionalOptions [flink-cdc]

2024-06-11 Thread via GitHub


loserwang1024 opened a new pull request, #3412:
URL: https://github.com/apache/flink-cdc/pull/3412

   The CDC pipeline connector provides the ability to verify requiredOptions and 
optionalOptions; otherwise it will be hard to handle compatibility later.
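
   A minimal sketch (not the actual Flink CDC API) of the kind of check such a 
helper could perform, following the FactoryHelper idea from DynamicTableFactory: 
required options must be present, and options that are neither required nor 
optional are rejected.
```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative validation sketch: fail fast on missing required options and on
// options that the factory does not declare at all.
final class OptionValidationSketch {
    static void validate(
            Set<String> requiredKeys, Set<String> optionalKeys, Map<String, String> config) {
        for (String key : requiredKeys) {
            if (!config.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        Set<String> allowed = new HashSet<>(requiredKeys);
        allowed.addAll(optionalKeys);
        for (String key : config.keySet()) {
            if (!allowed.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }
}
```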


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35533][runtime] Support Flink hybrid shuffle integration with Apache Celeborn [flink]

2024-06-11 Thread via GitHub


TanYuxin-tyx commented on code in PR #24900:
URL: https://github.com/apache/flink/pull/24900#discussion_r1635860182


##
flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java:
##
@@ -80,8 +83,14 @@ public NettyShuffleDescriptor buildRemote() {
 }
 
 public NettyShuffleDescriptor buildLocal() {
+List tierShuffleDescriptors = new ArrayList<>();
+tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE);
+tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE);

Review Comment:
   Since the memory tier and the disk tier are used by default in the test, each 
tier needs its own tier shuffle descriptor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][cdc-connector][mysql] Fix NoClassDefFoundError when create new table in mysql cdc source [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on PR #3036:
URL: https://github.com/apache/flink-cdc/pull/3036#issuecomment-2162171729

   Hi @pengmide, could you please rebase this PR onto the latest `master` branch 
so it can be merged? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]

2024-06-11 Thread via GitHub


fredia commented on code in PR #24924:
URL: https://github.com/apache/flink/pull/24924#discussion_r1635853720


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##
@@ -167,22 +166,25 @@ private void verifyStateHandleType(String checkpointPath, 
boolean fileMergingEna
 // Check keyed state handle
 List keyedStateHandles =
 new ArrayList<>(subtaskState.getManagedKeyedState());
-keyedStateHandles.addAll(subtaskState.getRawKeyedState());
 for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-Assertions.assertInstanceOf(
-IncrementalRemoteKeyedStateHandle.class, 
keyedStateHandle);
+assertThat(keyedStateHandle)

Review Comment:
   IIUC, this bug can be reproduced if we perform several checkpoints before 
restoring, so that a placeholder handle is generated. Would you like to change 
this ITCase to make it easier to reproduce?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


ammar-master commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2162158973

   @gaborgsomogyi I have addressed the comment, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


ammar-master commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2162158609

   @flinkbot run azure
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35121) CDC pipeline connector should verify requiredOptions and optionalOptions

2024-06-11 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854253#comment-17854253
 ] 

Hongshun Wang commented on FLINK-35121:
---

[~kwafor], sorry, I have done it.

> CDC pipeline connector should verify requiredOptions and optionalOptions
> 
>
> Key: FLINK-35121
> URL: https://issues.apache.org/jira/browse/FLINK-35121
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> At present, though we provide 
> org.apache.flink.cdc.common.factories.Factory#requiredOptions and 
> org.apache.flink.cdc.common.factories.Factory#optionalOptions, neither is used 
> anywhere, which means requiredOptions and optionalOptions are never verified.
> Thus, like what DynamicTableFactory does, provide a 
> FactoryHelper to help verify requiredOptions and optionalOptions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Revert [FLINK-34123][FLINK-35068]: Add built-in dedicated serialization support for Maps, Lists, Sets, and Collections [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24925:
URL: https://github.com/apache/flink/pull/24925#issuecomment-2162103966

   
   ## CI report:
   
   * e5d84501e6a2781205494bf429c29e1afe0d4031 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24924:
URL: https://github.com/apache/flink/pull/24924#issuecomment-2162103785

   
   ## CI report:
   
   * 9ce0a6f2bb299224ed170ce0944de94703253b81 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24923:
URL: https://github.com/apache/flink/pull/24923#issuecomment-2162103603

   
   ## CI report:
   
   * 88f9a4cf7a8e79d861755262f84f91a4b1b0d026 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35068) Introduce built-in serialization support for Set

2024-06-11 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854247#comment-17854247
 ] 

Zhanghao Chen commented on FLINK-35068:
---

Will revert it as it breaks state compatibility. If a user defines a 
map- or list-typed value in state, the default serializer is changed from Kryo to 
the dedicated serializer, breaking compatibility. Furthermore, since 
RocksDBStateBackend does not support serialization format changes for the keys 
in MapState, we cannot implement a fully compatible solution for this case.
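
For illustration only (not part of the revert itself), a state declaration of 
this shape is affected: its serializer is derived from the declared type, so a 
change of the framework default for Map changes the bytes written between 
savepoint and restore.
{code:java}
import java.util.Map;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Illustrative only: the serializer for this state is derived from the Map type
// information, so switching the framework default for Map from Kryo to a
// dedicated MapSerializer changes the on-disk format on restore.
class MapValueStateExample {
    static final ValueStateDescriptor<Map<String, Long>> COUNTS =
            new ValueStateDescriptor<>(
                    "counts", TypeInformation.of(new TypeHint<Map<String, Long>>() {}));
}
{code}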

> Introduce built-in serialization support for Set
> 
>
> Key: FLINK-35068
> URL: https://issues.apache.org/jira/browse/FLINK-35068
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce built-in serialization support for {{{}Set{}}}, another common Java 
> collection type. We'll need to add a new built-in serializer for it 
> ({{{}MultiSetTypeInformation{}}} utilizes {{MapSerializer}} underneath, but 
> it could be more efficient for common {{{}Set{}}}).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Revert FLINK-34123&35068: Add built-in dedicated serialization support for Maps, Lists, Sets, and Collections [flink]

2024-06-11 Thread via GitHub


X-czh opened a new pull request, #24925:
URL: https://github.com/apache/flink/pull/24925

   ## What is the purpose of the change
   
   Reverting previous changes that break state compatibility.
   
   [FLINK-34123](https://issues.apache.org/jira/browse/FLINK-34123) & 
[FLINK-35068](https://issues.apache.org/jira/browse/FLINK-35068) introduced 
built-in dedicated serialization support for Maps, Sets, Lists, and 
Collections. However, if a user defines a collection-typed value in state, the 
default serializer is changed from Kryo to dedicated serializer, breaking 
compatibility. Furthermore, since RocksDBStateBackend does not support 
serialization format change for the keys in MapState, we cannot implement a 
fully-compatible solution for this case.
   
   ## Brief change log
   
   Revert the following three commits:
   1. [FLINK-34123][core][type] Introduce built-in serialization support for 
Map, List, and Collection
   2. [FLINK-34123][docs][type] Add doc for built-in serialization support for 
Map, List, and Collection
   3. [FLINK-35068][core][type] Introduce built-in serialization support for 
java.util.Set
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34123) Introduce built-in serialization support for Map and List

2024-06-11 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854242#comment-17854242
 ] 

Zhanghao Chen commented on FLINK-34123:
---

Will revert it as it breaks state compatibility. If a user defines a 
map- or list-typed value in state, the default serializer is changed from Kryo to 
the dedicated serializer, breaking compatibility. Furthermore, since 
RocksDBStateBackend does not support serialization format changes for the keys 
in MapState, we cannot implement a fully compatible solution for this case.

> Introduce built-in serialization support for Map and List
> -
>
> Key: FLINK-34123
> URL: https://issues.apache.org/jira/browse/FLINK-34123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce built-in serialization support for Map and List, two common 
> collection types for which Flink already have custom serializers implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging

2024-06-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35570:
---
Labels: pull-request-available  (was: )

> Consider PlaceholderStreamStateHandle in checkpoint file merging
> 
>
> Key: FLINK-35570
> URL: https://issues.apache.org/jira/browse/FLINK-35570
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} 
> into account during lifecycle, since it can be a file merged one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]

2024-06-11 Thread via GitHub


Zakelly opened a new pull request, #24924:
URL: https://github.com/apache/flink/pull/24924

   ## What is the purpose of the change
   
   In checkpoint file merging, we should take `PlaceholderStreamStateHandle` 
into account during lifecycle, since it can be a file merged one.
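
   A hypothetical sketch of the idea, with illustrative names (the real change 
adjusts `reusePreviousStateHandle` and `couldReuseStateHandle`):
```java
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

// Hypothetical sketch: a placeholder handle stands in for previously reported
// state, so the reuse check must not skip it, because the bytes it references
// may live in a merged checkpoint file tracked by the file-merging manager.
final class ReuseCheckSketch {
    private final Set<StreamStateHandle> trackedMergedHandles = new HashSet<>();

    boolean couldReuseStateHandle(StreamStateHandle handle) {
        return handle instanceof PlaceholderStreamStateHandle
                || trackedMergedHandles.contains(handle);
    }
}
```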
   
   ## Brief change log
   
- take `PlaceholderStreamStateHandle` into account in 
`reusePreviousStateHandle` and `couldReuseStateHandle`
   
   ## Verifying this change
   
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging

2024-06-11 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35570:
---

 Summary: Consider PlaceholderStreamStateHandle in checkpoint file 
merging
 Key: FLINK-35570
 URL: https://issues.apache.org/jira/browse/FLINK-35570
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
Assignee: Zakelly Lan


In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} 
into account during lifecycle, since it can be a file merged one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]

2024-06-11 Thread via GitHub


schlosna opened a new pull request, #24923:
URL: https://github.com/apache/flink/pull/24923

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
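
   A generic illustration of the pattern named in the PR title, i.e. avoiding a 
per-call `Optional` allocation in a hot loop by using a nullable return (this is 
a sketch, not the actual MailboxProcessor code):
```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: polling with a nullable return avoids allocating an Optional
// wrapper on every iteration of a hot processing loop.
final class MailLoopSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    /** Returns the next mail or null; no Optional is allocated per call. */
    private Runnable pollMail() {
        return mails.poll();
    }

    void processAvailableMails() {
        Runnable mail;
        while ((mail = pollMail()) != null) {
            mail.run();
        }
    }
}
```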
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24922:
URL: https://github.com/apache/flink/pull/24922#issuecomment-2162053937

   
   ## CI report:
   
   * 3405d0cc2c400c9c2d3a596b600b2ebda07af2f9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Enable async profiler [flink-benchmarks]

2024-06-11 Thread via GitHub


Zakelly merged PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32091) Add necessary metrics for file-merging

2024-06-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32091:
---
Labels: pull-request-available  (was: )

> Add necessary metrics for file-merging
> --
>
> Key: FLINK-32091
> URL: https://issues.apache.org/jira/browse/FLINK-32091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]

2024-06-11 Thread via GitHub


fredia opened a new pull request, #24922:
URL: https://github.com/apache/flink/pull/24922

   
   
   ## What is the purpose of the change
   
   Add file size and file count metrics for file-merging.
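
   A minimal sketch of how such gauges could be registered on a Flink 
`MetricGroup` (metric names follow the change log below; the actual wiring in 
this PR lives in the file-merging code and may differ):
```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Sketch: expose file-merging counters as gauges; the counters would be updated
// by the component that creates and deletes logical/physical files.
final class FileMergingMetricsSketch {
    private final AtomicLong logicalFileCount = new AtomicLong();
    private final AtomicLong physicalFileSize = new AtomicLong();

    void registerMetrics(MetricGroup group) {
        group.gauge("logicalFileCount", (Gauge<Long>) logicalFileCount::get);
        group.gauge("physicalFileSize", (Gauge<Long>) physicalFileSize::get);
    }
}
```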
   
   
   ## Brief change log
   - Add `logicalFileCount`, `logicalFileSize`, `physicalFileCount`, 
`physicalFileSize` metrics.
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   - `FileMergingMetricsTest`

   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854225#comment-17854225
 ] 

Zakelly Lan commented on FLINK-35569:
-

Thanks [~qingyue] , I'll take a look.

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Zakelly Lan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test is failed when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35569:
---

Assignee: Zakelly Lan

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Zakelly Lan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test is failed when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854224#comment-17854224
 ] 

Jane Chan commented on FLINK-35569:
---

Hi [~Zakelly], would you mind sparing some time to take a look?

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test is failed when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-35569:
--
Description: 
[https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]

The parameterized test is failed when RestoreMode is "CLAIM" and 
fileMergingAcrossBoundary is false.

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test is failed when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)
Jane Chan created FLINK-35569:
-

 Summary: 
SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
 failed
 Key: FLINK-35569
 URL: https://issues.apache.org/jira/browse/FLINK-35569
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines, Build System / CI
Affects Versions: 1.20.0
Reporter: Jane Chan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on PR #3360:
URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2161996630

   Hi @yuanoOo, thanks for your great contribution! Since @whhe is familiar 
with OceanBase, do you have time to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35234][hotfix][cdc-common] Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString [flink-cdc]

2024-06-11 Thread via GitHub


Jiabao-Sun commented on code in PR #3255:
URL: https://github.com/apache/flink-cdc/pull/3255#discussion_r1635733171


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java:
##
@@ -130,7 +130,9 @@ static Duration convertToDuration(Object o) {
 }
 
 static String convertToString(Object o) {
-if (o.getClass() == String.class) {
+if (o == null) {

Review Comment:
   The inner null check seems redundant now.
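
   A simplified sketch of the shape this suggests (the real 
`ConfigurationUtils#convertToString` handles more types than shown here):
```java
final class ConvertToStringSketch {
    // Simplified sketch: with an early null return at the top of the method,
    // any null check further down becomes redundant.
    static String convertToString(Object o) {
        if (o == null) {
            return null;
        }
        if (o.getClass() == String.class) {
            return (String) o;
        }
        return o.toString(); // simplified; the real method also formats Duration, Map, etc.
    }
}
```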



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-11 Thread via GitHub


1996fanrui commented on PR #24895:
URL: https://github.com/apache/flink/pull/24895#issuecomment-2161973417

   It seems one timer-related test fails; it may be caused by this PR.
   
   
`StreamTaskCancellationTest.testCancelTaskShouldPreventAdditionalEventTimeTimersFromBeingFired:272->testCancelTaskShouldPreventAdditionalTimersFromBeingFired:300
 » IllegalState`
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60198&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10477
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35568) Add imagePullSecrets for FlinkDeployment spec

2024-06-11 Thread Gang Huang (Jira)
Gang Huang created FLINK-35568:
--

 Summary: Add imagePullSecrets for FlinkDeployment spec
 Key: FLINK-35568
 URL: https://issues.apache.org/jira/browse/FLINK-35568
 Project: Flink
  Issue Type: Improvement
Reporter: Gang Huang


I am confused about how to configure imagePullSecrets for a private Docker 
registry, since there seem to be no related parameters in the official docs 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35568) Add the imagePullSecrets property for FlinkDeployment spec

2024-06-11 Thread Gang Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Huang updated FLINK-35568:
---
Summary: Add the imagePullSecrets property for FlinkDeployment spec  (was: 
Add imagePullSecrets for FlinkDeployment spec)

> Add the imagePullSecrets property for FlinkDeployment spec
> --
>
> Key: FLINK-35568
> URL: https://issues.apache.org/jira/browse/FLINK-35568
> Project: Flink
>  Issue Type: Improvement
>Reporter: Gang Huang
>Priority: Blocker
>
> I am confused about how to configure imagePullSecrets for a private Docker 
> registry, since there seem to be no related parameters in the official 
> docs 
> (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-35540.

Resolution: Fixed

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the parameter 'tables' of a MySQL pipeline job contains a table whose 
> database and table have the same name, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named `{{{}app`{}}}, then submit a pipeline 
> job defined in YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854012#comment-17854012
 ] 

Leonard Xu edited comment on FLINK-35540 at 6/12/24 1:51 AM:
-

master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7

release-3.1: 7287eaceca29d105bf5a7c74d75945a42a051016


was (Author: leonard xu):
master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7

release-3.1: TODO

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the parameter 'tables' of a MySQL pipeline job contains a table whose 
> database and table have the same name, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named `{{{}app`{}}}, then submit a pipeline 
> job defined in YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
> pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3409:
URL: https://github.com/apache/flink-cdc/pull/3409


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]

2024-06-11 Thread via GitHub


1996fanrui commented on PR #24881:
URL: https://github.com/apache/flink/pull/24881#issuecomment-2161888308

   > Thanks @GOODBOY008 for the quick update, LGTM now. Hi @1996fanrui, could 
   > you help double check this PR?
   
   Thanks @Jiabao-Sun for the ping, I will check it in the next 2 days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35242) Add per-type schema evolution behavior configuration

2024-06-11 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35242:

Description: 
> Update: Changed the `fine grained` terminology to avoid confusion with the 
> fine-grained job resource management feature.

Currently, Flink CDC allows three schema evolution (SE) strategies: evolving the 
schema, ignoring the change, or throwing an exception. However, such a 
configuration doesn't cover all use cases, and users want more fine-grained 
strategy configuration.

This ticket suggests adding one more strategy, "try_evolve" or 
"evolve_when_available". It is basically like the "evolve" option, but doesn't 
throw an exception if the operation fails, which provides more flexibility.

Also, this ticket suggests allowing users to configure a per-schema-event 
strategy, so they could evolve some types of events (like renaming a column) 
and reject unwanted ones (like truncating a table or removing a column).

  was:
Currently, Flink CDC allows three SE strategy: evolving it, ignoring it, or 
throwing an exception. However such configuration strategy doesn't cover all 
user cases and requires want more fine-grained strategy configuration.

This ticket suggests adding one more strategy "try_evolve" or 
"evolve_when_available". It's basically like "evolving" option, but doesn't 
throw an exception if such operation fails, which provides more flexibility.

Also, this ticket suggests allowing user to configure per-schema-event 
strategy, so users could evolve some types of event (like rename column) and 
reject some dangerous events (like truncate table, remove column).

Summary: Add per-type schema evolution behavior configuration  (was: 
Add fine-grained schema evolution strategy)

> Add per-type schema evolution behavior configuration
> 
>
> Key: FLINK-35242
> URL: https://issues.apache.org/jira/browse/FLINK-35242
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> > Update: Changed the `fine grained` terminology to avoid confusion with the 
> > fine-grained job resource management feature.
> Currently, Flink CDC allows three schema evolution (SE) strategies: evolving 
> the schema, ignoring the change, or throwing an exception. However, such a 
> configuration doesn't cover all use cases, and users want more fine-grained 
> strategy configuration.
> This ticket suggests adding one more strategy, "try_evolve" or 
> "evolve_when_available". It is basically like the "evolve" option, but doesn't 
> throw an exception if the operation fails, which provides more flexibility.
> Also, this ticket suggests allowing users to configure a per-schema-event 
> strategy, so they could evolve some types of events (like renaming a column) 
> and reject unwanted ones (like truncating a table or removing a column).
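
For illustration only, a hypothetical sketch of what a per-event-type configuration 
could look like; none of these keys exist in Flink CDC today, and the final option 
names would be decided in the FLIP/PR:
{code:yaml}
# Hypothetical keys, shown only to illustrate per-event-type behavior.
pipeline:
  schema.change.behavior: evolve            # global default
  schema.change.behavior.per-event-type:
    add.column: evolve
    rename.column: try_evolve               # evolve, but ignore failures
    drop.column: exception
    truncate.table: exception
{code}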



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24921:
URL: https://github.com/apache/flink/pull/24921#issuecomment-2161796100

   
   ## CI report:
   
   * 84a66fe40a34e3e74b68beee473005b903a69fe3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35140) Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19

2024-06-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35140:
---
Labels: pull-request-available  (was: )

> Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]

2024-06-11 Thread via GitHub


snuyanzin opened a new pull request, #24921:
URL: https://github.com/apache/flink/pull/24921

   
   ## What is the purpose of the change
   Add docs for the new Opensearch connector releases
   In fact the docs are the same for both v1 and v2, so this just references one 
branch (also, references to multiple branches are not supported here IIRC)
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35140) Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19

2024-06-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-35140:

Summary: Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 
1.19  (was: Release flink-connector-opensearch vX.X.X for Flink 1.19)

> Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Enable ci for v1.2 [flink-connector-opensearch]

2024-06-11 Thread via GitHub


snuyanzin merged PR #47:
URL: https://github.com/apache/flink-connector-opensearch/pull/47


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding Opensearch Connector v1.2.0 [flink-web]

2024-06-11 Thread via GitHub


snuyanzin merged PR #740:
URL: https://github.com/apache/flink-web/pull/740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding Opensearch Connector v2.0.0 [flink-web]

2024-06-11 Thread via GitHub


snuyanzin merged PR #741:
URL: https://github.com/apache/flink-web/pull/741


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35116) Upgrade JOSDK dependency to 4.8.3

2024-06-11 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-35116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-35116.
--
Resolution: Fixed

Thanks [~mateczagany] ! Indeed.

> Upgrade JOSDK dependency to 4.8.3
> -
>
> Key: FLINK-35116
> URL: https://issues.apache.org/jira/browse/FLINK-35116
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> This brings a much-needed fix for the operator HA behaviour:
> https://github.com/operator-framework/java-operator-sdk/releases/tag/v4.8.3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:09 PM:
--

[~xuyangzhong]   [~martijnvisser] Thanks for looking into the issue and merging 
the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue.
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}


was (Author: pouria):
[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue.
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-

[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:08 PM:
--

[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue.
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}


was (Author: pouria):
[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE"

[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:08 PM:
--

[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}


was (Author: pouria):
[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.


```

Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);

tableEnv.sqlQuery("SELECT * FROM t1").execute().print();

```

Here is the error:
```

Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[*anonymous_datastream_source$1*, 
metadata=[rowtime]]])

```

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE

[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh commented on FLINK-20539:
-

[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.


```

Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);

tableEnv.sqlQuery("SELECT * FROM t1").execute().print();

```

Here is the error:
```

Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[*anonymous_datastream_source$1*, 
metadata=[rowtime]]])

```

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT 
> NULL
> rel:
> LogicalProject(order_number=[$0], price=[$1], first_name=[$2], 
> last_name=[$3], buyer_name=[ROW($2, $3)])
>   LogicalTableScan(table=[[default_catalog, default_database, Orders]])
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35116) Upgrade JOSDK dependency to 4.8.3

2024-06-11 Thread Mate Czagany (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854084#comment-17854084
 ] 

Mate Czagany commented on FLINK-35116:
--

I think this can be closed as the PR has already been merged to main 
4ec4b319c2ba9927c32372c595017840768a67ee

> Upgrade JOSDK dependency to 4.8.3
> -
>
> Key: FLINK-35116
> URL: https://issues.apache.org/jira/browse/FLINK-35116
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> This brings a much-needed fix for the operator HA behaviour:
> https://github.com/operator-framework/java-operator-sdk/releases/tag/v4.8.3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-35473.
-

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854068#comment-17854068
 ] 

Jane Chan edited comment on FLINK-35473 at 6/11/24 2:10 PM:


Fixed in master
93d49ff6eb9f61cb2450d0b25732f4d8923b840d,
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8,
fbacf22a057e52c06a10988c308dfb31afbbcb12,
6dbe7bf5c306551836ec89c70f9aaab317f55e10,
526f9b034763fd022a52fe84b2c3227c59a78df1


was (Author: qingyue):
Fixed in master
93d49ff6eb9f61cb2450d0b25732f4d8923b840d,
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8,
fbacf22a057e52c06a10988c308dfb31afbbcb12,
6dbe7bf5c306551836ec89c70f9aaab317f55e10,
526f9b034763fd022a52fe84b2c3227c59a78df1

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854068#comment-17854068
 ] 

Jane Chan edited comment on FLINK-35473 at 6/11/24 2:10 PM:


Fixed in master
93d49ff6eb9f61cb2450d0b25732f4d8923b840d,
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8,
fbacf22a057e52c06a10988c308dfb31afbbcb12,
6dbe7bf5c306551836ec89c70f9aaab317f55e10,
526f9b034763fd022a52fe84b2c3227c59a78df1


was (Author: qingyue):
Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, 
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8, 
fbacf22a057e52c06a10988c308dfb31afbbcb12, 
6dbe7bf5c306551836ec89c70f9aaab317f55e10, 
526f9b034763fd022a52fe84b2c3227c59a78df1

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35266) Add e2e tests for FlinkStateSnapshot CRs

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35266:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> Add e2e tests for FlinkStateSnapshot CRs
> 
>
> Key: FLINK-35266
> URL: https://issues.apache.org/jira/browse/FLINK-35266
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-35473.
---
Resolution: Fixed

Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, 
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8, 
fbacf22a057e52c06a10988c308dfb31afbbcb12, 
6dbe7bf5c306551836ec89c70f9aaab317f55e10, 
526f9b034763fd022a52fe84b2c3227c59a78df1

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35267) Create documentation for FlinkStateSnapshot CR

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35267:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> Create documentation for FlinkStateSnapshot CR
> --
>
> Key: FLINK-35267
> URL: https://issues.apache.org/jira/browse/FLINK-35267
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>
> This should cover the new features and migration from the now deprecated 
> methods of taking snapshots.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35265) Implement FlinkStateSnapshot custom resource

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35265:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> Implement FlinkStateSnapshot custom resource
> 
>
> Key: FLINK-35265
> URL: https://issues.apache.org/jira/browse/FLINK-35265
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35263) FLIP-446: Kubernetes Operator State Snapshot CRD

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35263:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> FLIP-446: Kubernetes Operator State Snapshot CRD
> 
>
> Key: FLINK-35263
> URL: https://issues.apache.org/jira/browse/FLINK-35263
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>
> Described in 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]

2024-06-11 Thread via GitHub


LadyForest closed pull request #24889: [Flink-35473][table] Improve Table/SQL 
Configuration for Flink 2.0
URL: https://github.com/apache/flink/pull/24889


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26951) Add HASH supported in SQL & Table API

2024-06-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854060#comment-17854060
 ] 

lincoln lee commented on FLINK-26951:
-

Thanks [~kartikeypant] for volunteering this!

Before implementation, we should make it clear which hash algorithm will be 
used. This hash function is mainly for Hive compatibility purposes, so we need 
to clarify the details to ensure it is compatible with Hive.
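
For reference, an unverified sketch of the combination style Hive's hash() UDF is 
believed to use (a 31-based polynomial over per-argument hash codes); the exact 
per-type hashing in Hive's ObjectInspectorUtils must be checked before implementation:
{code:java}
// Unverified sketch of a Hive-style hash combination. Object.hashCode() is only a
// stand-in for Hive's per-type ObjectInspectorUtils.hashCode and is not claimed to
// reproduce Hive's actual results.
static int hiveStyleHash(Object... args) {
    int result = 0;
    for (Object arg : args) {
        int elementHash = (arg == null) ? 0 : arg.hashCode();
        result = result * 31 + elementHash;
    }
    return result;
}
{code}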

> Add HASH supported in SQL & Table API
> -
>
> Key: FLINK-26951
> URL: https://issues.apache.org/jira/browse/FLINK-26951
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> Returns a hash value of the arguments.
> Syntax:
> {code:java}
> hash(expr1, ...) {code}
> Arguments:
>  * {{{}exprN{}}}: An expression of any type.
> Returns:
> An INTEGER.
> Examples:
> {code:java}
> > SELECT hash('Flink', array(123), 2);
>  -1321691492 {code}
> See more:
>  * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#hash]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-06-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854054#comment-17854054
 ] 

lincoln lee commented on FLINK-12450:
-

Thank you everyone! Assigned to you [~kartikeypant]. 

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Kartikey Pant
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right
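
A minimal sketch of the presumed semantics (whether the right shift should be 
arithmetic or logical, and how out-of-range shift distances are handled, still 
needs to be settled during implementation):
{code:java}
// Sketch only: mirrors Java's shift operators; the SQL-level behavior is not yet defined.
static long bitLshift(long value, int shift) {
    return value << shift;
}

static long bitRshift(long value, int shift) {
    return value >> shift; // arithmetic shift; >>> would be the logical variant
}
{code}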



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-06-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-12450:
---

Assignee: Kartikey Pant  (was: Ran Tao)

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Kartikey Pant
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35378) [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction

2024-06-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35378:
---
Summary: [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate 
SinkFunction  (was: [FLIP-453] Promote Unified Sink API V2 to Public and 
Deprecate SinkFunc)

> [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction
> ---
>
> Key: FLINK-35378
> URL: https://issues.apache.org/jira/browse/FLINK-35378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=303794871&draftShareId=af4ace88-98b7-4a53-aece-cd67d2f91a15&;



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634861389


##
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java:
##
@@ -285,7 +291,14 @@ private static KeyManagerFactory getKeyManagerFactory(
 : SecurityOptions.SSL_REST_KEY_PASSWORD,
 SecurityOptions.SSL_KEY_PASSWORD);
 
-KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+// do not use getAndCheckOption here as there is no fallback option 
and a default is
+// specified
+String keystoreType =
+internal
+? 
config.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE)
+: config.get(SecurityOptions.SSL_REST_KEYSTORE_TYPE);

Review Comment:
   Just a clarification for other reviewers: since there is a default value, it 
just doesn't make sense to provide a fallback.
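
   A small, self-contained illustration of that point; the option key and default 
below are assumptions for illustration, not the actual SecurityOptions definitions:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class KeystoreTypeDefaultExample {
    // Hypothetical option: the real key name and default may differ.
    static final ConfigOption<String> REST_KEYSTORE_TYPE =
            ConfigOptions.key("security.ssl.rest.keystore-type")
                    .stringType()
                    .defaultValue("JKS");

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Because the option declares a default, get() always returns a value,
        // so a fallback option would never be consulted.
        String keystoreType = config.get(REST_KEYSTORE_TYPE);
        System.out.println(keystoreType); // prints "JKS" when the key is unset
    }
}
{code}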



##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +68,35 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException | GeneralSecurityException e) {
+throw new RemoteTransportException(
+"Server SSL connection could not be established because 
SSL context could not be constructed",

Review Comment:
   Here we can be more specific: "Server SSL connection could not be 
established because keystore could not be loaded"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634845033


##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +71,38 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException
+| CertificateException
+| NoSuchAlgorithmException
+| KeyStoreException e) {
+throw new RemoteTransportException(

Review Comment:
   I've checked out the code and played with it. I agree that's what we can do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35541) Introduce retry limiting for AWS connector sinks

2024-06-11 Thread Aleksandr Pilipenko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Pilipenko updated FLINK-35541:

Affects Version/s: aws-connector-4.3.0

> Introduce retry limiting for AWS connector sinks
> 
>
> Key: FLINK-35541
> URL: https://issues.apache.org/jira/browse/FLINK-35541
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / AWS, Connectors / DynamoDB, Connectors / 
> Firehose, Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Priority: Major
>
> Currently, if the record write operation in the sink consistently fails with a 
> retriable error, the sink will retry indefinitely. If the cause of the error 
> is never resolved, this may become a poison pill.
>  
> The proposal here is to add a configurable retry limit for each record: users 
> can specify a maximum number of retries per record, and the sink will fail 
> once the retry limit is reached.
>  
> We will implement this for all AWS connectors:
>  * DDBSink
>  * Firehose Sink
>  * Kinesis Sink
>  
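
A minimal, connector-agnostic sketch of the proposed behavior; this is not the 
actual AWS connector API, and the class and method names below are made up purely 
for illustration:
{code:java}
import java.util.function.Consumer;

/** Illustrative sketch only; not part of any AWS connector API. */
final class PerRecordRetryLimiter<T> {
    private final int maxRetriesPerRecord;

    PerRecordRetryLimiter(int maxRetriesPerRecord) {
        this.maxRetriesPerRecord = maxRetriesPerRecord;
    }

    /** Re-queues a failed record, or fails once the per-record retry limit is reached. */
    void onRetriableFailure(T record, int attemptsSoFar, Consumer<T> requeue) {
        if (attemptsSoFar >= maxRetriesPerRecord) {
            throw new IllegalStateException(
                    "Record exceeded " + maxRetriesPerRecord + " retries; failing the sink");
        }
        requeue.accept(record);
    }
}
{code}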



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept

Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter<T>
+implements ElementConverter<T, DynamoDbWriteRequest> {
+
+private final CompositeType<T> typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema<T> tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo, boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept
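
For orientation (the review excerpt above is truncated by the archive), here is a hedged usage sketch of the converter under discussion. The `Order` POJO, table name, and empty client properties are hypothetical, and the sink wiring assumes the standard `DynamoDbSink` builder methods (`setTableName`, `setElementConverter`, `setDynamoDbProperties`):

{code:java}
// Sketch only, not part of the PR: build the type-informed converter from a POJO's
// TypeInformation and hand it to a DynamoDbSink.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
import org.apache.flink.connector.dynamodb.sink.DynamoDbTypeInformedElementConverter;

import java.util.Properties;

public class TypeInformedConverterSketch {

    /** Hypothetical POJO; any class Flink analyses as a PojoTypeInfo would work the same way. */
    public static class Order {
        public String orderId;
        public int quantity;

        public Order() {}
    }

    @SuppressWarnings("unchecked")
    public static DynamoDbSink<Order> buildSink() {
        // For a POJO, TypeInformation.of(...) yields a PojoTypeInfo, which is a CompositeType.
        CompositeType<Order> typeInfo = (CompositeType<Order>) TypeInformation.of(Order.class);

        return DynamoDbSink.<Order>builder()
                .setTableName("orders") // hypothetical table name
                .setElementConverter(new DynamoDbTypeInformedElementConverter<>(typeInfo))
                .setDynamoDbProperties(new Properties()) // region/credentials would go here
                .build();
    }
}
{code}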

[PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]

2024-06-11 Thread via GitHub


qg-lin opened a new pull request, #3409:
URL: https://github.com/apache/flink-cdc/pull/3409

   https://issues.apache.org/jira/browse/FLINK-35540


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 [flink]

2024-06-11 Thread via GitHub


pan3793 commented on PR #24905:
URL: https://github.com/apache/flink/pull/24905#issuecomment-2160590805

   ping @JingGe, is there any thing I can do before merging?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method, I just moved the `getWriterType` above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method, I just moved the `getWriterType` above this 
to keep the method visibility order in the file intact, because that method 
became package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634722169


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   I did not touch this method, I just moved the getWriterType above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   I did not touch this method, I just moved the getWriterType above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-06-11 Thread via GitHub


morazow commented on PR #24426:
URL: https://github.com/apache/flink/pull/24426#issuecomment-2160556166

   Thanks @HuangXingBo for the update!
   
   I have changed the GitHub actions to run the `auditwheel` repair after 
installing `patchelf` in Linux. Could you please have another look?
   
   You can find the build wheel artifacts for this change here: 
https://github.com/morazow/flink/actions/runs/9464510932


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-34908.

Resolution: Fixed

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> The Flink CDC pipeline decides the timestamp zone from the pipeline config. I found 
> that mysql2doris and mysql2starrocks use a fixed datetime format of
> yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  
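
To make the precision loss described above concrete, a minimal standalone sketch using only standard Java time APIs (not CDC code):

{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampPrecisionSketch {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2024-06-11T10:15:30.123456");

        // A fixed second-precision pattern silently drops the fractional seconds:
        String formatted = ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(formatted); // 2024-06-11 10:15:30  (microseconds lost)

        // Passing the LocalDateTime through unchanged keeps the full precision:
        System.out.println(ts); // 2024-06-11T10:15:30.123456
    }
}
{code}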



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852752#comment-17852752
 ] 

Leonard Xu edited comment on FLINK-34908 at 6/11/24 11:23 AM:
--

master: e2ccc836a056c16974e4956190bdce249705b7ee

3.1: 1112987572e487a79a1bbecf460705aa6153e0bb


was (Author: leonard xu):
master: e2ccc836a056c16974e4956190bdce249705b7ee

3.1: todo

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> The Flink CDC pipeline decides the timestamp zone from the pipeline config. I found 
> that mysql2doris and mysql2starrocks use a fixed datetime format of
> yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3407:
URL: https://github.com/apache/flink-cdc/pull/3407


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35545) Miss 3.1.0 version in snapshot flink-cdc doc version list

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853114#comment-17853114
 ] 

Leonard Xu edited comment on FLINK-35545 at 6/11/24 11:22 AM:
--

master:4efb1d78ca778abeae142facfa99440f22a88b25

release-3.1:050c28649c0bd0068b5e7fe62331b257574572f2

release-3.0:a02b9dc019195ea317739a63738b0d5eaf1a6671


was (Author: leonard xu):
master:4efb1d78ca778abeae142facfa99440f22a88b25

release-3.1:93d5ee98da19bb878754bbc3780a3e23033ed331

release-3.0:a02b9dc019195ea317739a63738b0d5eaf1a6671

> Miss 3.1.0 version in snapshot flink-cdc doc version list
> -
>
> Key: FLINK-35545
> URL: https://issues.apache.org/jira/browse/FLINK-35545
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
> Attachments: image-2024-06-08-10-07-06-403.png, screenshot-1.png
>
>
> Link : [https://nightlies.apache.org/flink/flink-cdc-docs-master/]
> Missing version 3.0.1 in the version list:
>  
> !screenshot-1.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3408:
URL: https://github.com/apache/flink-cdc/pull/3408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


GOODBOY008 commented on PR #3408:
URL: https://github.com/apache/flink-cdc/pull/3408#issuecomment-2160480202

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854012#comment-17854012
 ] 

Leonard Xu commented on FLINK-35540:


master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7

release-3.1: TODO

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table have the same name, such as 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named `{{{}app`{}}}, then submit a pipeline job 
> defined by this YAML:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
> pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-35540:
--

Assignee: linqigeng

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table have the same name, such as 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named `{{{}app`{}}}, then submit a pipeline job 
> defined by this YAML:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
> pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-35540:
---
Fix Version/s: cdc-3.2.0
   cdc-3.1.1

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table have the same name, such as 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named `{{{}app`{}}}, then submit a pipeline job 
> defined by this YAML:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
> pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


GOODBOY008 opened a new pull request, #3408:
URL: https://github.com/apache/flink-cdc/pull/3408

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table are with the same name [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3396:
URL: https://github.com/apache/flink-cdc/pull/3396


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh resolved FLINK-35559.
-
Resolution: Fixed

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes a ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java

[jira] [Updated] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35567:
--
Fix Version/s: cdc-3.2.0
   (was: cdc-3.1.1)

> CDC BinaryWriter cast NullableSerializerWrapper error 
> --
>
> Key: FLINK-35567
> URL: https://issues.apache.org/jira/browse/FLINK-35567
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Currently, we generate data type serializers via 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
> which wraps each serializer in a 
> NullableSerializerWrapper.
> {code:java}
> // code placeholder
> public BinaryRecordDataGenerator(DataType[] dataTypes) {
> this(
> dataTypes,
> Arrays.stream(dataTypes)
> .map(InternalSerializers::create)
> .map(NullableSerializerWrapper::new)
> .toArray(TypeSerializer[]::new));
> } {code}
> However, when these serializers are used in BinaryWriter#write, a column of type 
> ARRAY/MAP/ROW is cast from NullableSerializerWrapper to 
> the concrete serializer for that type (e.g. ArrayDataSerializer).
> An exception will be thrown:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
> cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
>   at 
> org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
> at 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35567:
-

 Summary: CDC BinaryWriter cast NullableSerializerWrapper error 
 Key: FLINK-35567
 URL: https://issues.apache.org/jira/browse/FLINK-35567
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.1.1


Currently, we generate data type serializers via 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
which wraps each serializer in a 
NullableSerializerWrapper.
{code:java}
// code placeholder
public BinaryRecordDataGenerator(DataType[] dataTypes) {
this(
dataTypes,
Arrays.stream(dataTypes)
.map(InternalSerializers::create)
.map(NullableSerializerWrapper::new)
.toArray(TypeSerializer[]::new));
} {code}
However, when these serializers are used in BinaryWriter#write, a column of type 
ARRAY/MAP/ROW is cast from NullableSerializerWrapper to 
the concrete serializer for that type (e.g. ArrayDataSerializer).
An exception will be thrown:
{code:java}
java.lang.ClassCastException: 
org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
at 
org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
at 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
 {code}
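
A hedged illustration of the mismatch follows. The class names are the ones quoted in this issue; the import packages for InternalSerializers, TypeSerializer, and the DataTypes factory calls are assumptions on my part:

{code:java}
// Sketch only: shows why the cast performed by BinaryWriter#write fails for ARRAY columns.
import org.apache.flink.api.common.typeutils.TypeSerializer;          // assumed package
import org.apache.flink.cdc.common.types.DataTypes;                   // assumed factory class
import org.apache.flink.cdc.runtime.serializer.InternalSerializers;   // assumed package
import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;

public class NullableWrapperCastSketch {
    public static void main(String[] args) {
        // BinaryRecordDataGenerator wraps every serializer like this:
        TypeSerializer<?> wrapped =
                new NullableSerializerWrapper<>(
                        InternalSerializers.create(DataTypes.ARRAY(DataTypes.INT())));

        // BinaryWriter#write then casts by column type, which cannot succeed,
        // because the wrapper is not an ArrayDataSerializer:
        ArrayDataSerializer arraySerializer = (ArrayDataSerializer) wrapped; // ClassCastException
    }
}
{code}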



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35567:
--
Affects Version/s: cdc-3.1.1
   (was: cdc-3.2.0)

> CDC BinaryWriter cast NullableSerializerWrapper error 
> --
>
> Key: FLINK-35567
> URL: https://issues.apache.org/jira/browse/FLINK-35567
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.1.1
>
>
> Currently, we generate data type serializers via 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
> which wraps each serializer in a 
> NullableSerializerWrapper.
> {code:java}
> // code placeholder
> public BinaryRecordDataGenerator(DataType[] dataTypes) {
> this(
> dataTypes,
> Arrays.stream(dataTypes)
> .map(InternalSerializers::create)
> .map(NullableSerializerWrapper::new)
> .toArray(TypeSerializer[]::new));
> } {code}
> However, when these serializers are used in BinaryWriter#write, a column of type 
> ARRAY/MAP/ROW is cast from NullableSerializerWrapper to 
> the concrete serializer for that type (e.g. ArrayDataSerializer).
> An exception will be thrown:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
> cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
>   at 
> org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
> at 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35566) Consider promoting TypeSerializer from PublicEvolving to Public

2024-06-11 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-35566:
--

 Summary: Consider promoting TypeSerializer from PublicEvolving to 
Public
 Key: FLINK-35566
 URL: https://issues.apache.org/jira/browse/FLINK-35566
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Core
Reporter: Martijn Visser


While working on implementing FLINK-35378, I ran into the problem that 
TypeSerializer has been PublicEvolving since Flink 1.0. We should consider 
annotating it as Public. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

2024-06-11 Thread Naci Simsek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Naci Simsek updated FLINK-35565:

Description: 
h2. Summary

A Flink batch job gets into an infinite fetch loop and cannot finish gracefully 
if the connected Kafka topic is empty and the starting offset value in the Flink 
job is lower than the current start/end offset of the related topic. See below 
for details:
h2. How to reproduce

A Flink +*batch*+ job which works as a {*}KafkaSource{*} will consume events 
from a Kafka topic.

The related Kafka topic is empty, there are no events, and the offset value is 
as below: *15*

!image-2024-06-11-11-19-09-889.png|width=895,height=256!

 

The Flink job uses a *specific starting offset* value, which is +*less*+ than 
the current offset of the topic/partition.

See below, it is set to “4”:

 
{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Define the specific offsets for the partitions
Map<TopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start 
from offset 4 for partition 0

KafkaSource<String> kafkaSource = KafkaSource
.<String>builder()
.setBootstrapServers("localhost:9093")  // Make sure the port 
is correct
.setTopics("topic_test")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
.setBounded(OffsetsInitializer.latest())
.build();

DataStream<String> stream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
stream.print();

env.execute("Flink KafkaSource test job");
}
}{code}
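
As an aside (not part of the report): the behaviour when a pinned offset is out of range is governed by the OffsetsInitializer. A hedged variant of the source definition above, assuming the two-argument OffsetsInitializer.offsets(Map, OffsetResetStrategy) overload of the Kafka connector, makes that fallback explicit; the logs quoted below come from the reporter's original job, not from this variant.

{code:java}
// Sketch only: same source as above, but with an explicit reset strategy for
// starting offsets that are out of range (here: fall back to EARLIEST).
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSourceWithExplicitReset {
    public static KafkaSource<String> build() {
        Map<TopicPartition, Long> specificOffsets = new HashMap<>();
        specificOffsets.put(new TopicPartition("topic_test", 0), 4L);

        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9093")
                .setTopics("topic_test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Explicit fallback for out-of-range offsets:
                .setStartingOffsets(
                        OffsetsInitializer.offsets(specificOffsets, OffsetResetStrategy.EARLIEST))
                .setBounded(OffsetsInitializer.latest())
                .build();
    }
}
{code}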
 

 

Here are the initial offset-related logs, printed as soon as the job is 
submitted:

 
{code:java}
2024-05-30 12:15:50,010 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer   
   [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished running task AddSplitsTask: [[[Partition: topic_test-0, 
StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run FetchTask{code}
 

Since the starting offset {color:#ff}*4*{color} is *out of range* for the 
Kafka topic, the KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the 
task manager logs:

 
{code:java}
2024-05-30 12:15:50,193 INFO  
org.apache.kafka.clients.consumer.internals.Fetcher  [] - [Consumer 
clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position 
FetchPosition{offset=4, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: 
null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
2024-05-30 12:15:50,195 INFO  
org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer 
clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for 
partition topic_test-0 to position FetchPosition{offset=15, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[nacis
