[jira] [Updated] (FLINK-35315) MemoryManagerConcurrentModReleaseTest executes more than 15 minutes

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35315:

Fix Version/s: 1.20.0

> MemoryManagerConcurrentModReleaseTest executes more than 15 minutes
> ---
>
> Key: FLINK-35315
> URL: https://issues.apache.org/jira/browse/FLINK-35315
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Tests
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.20.0
>
> Attachments: image-2024-05-09-11-53-10-037.png
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395=results]
>  
> It seems 
> MemoryManagerConcurrentModReleaseTest.testConcurrentModificationWhileReleasing
>  executes more than 15 minutes.
> The root cause may be {color:#e1dfdd}ConcurrentModificationException{color}
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=10060]
>  
> !image-2024-05-09-11-53-10-037.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35315) MemoryManagerConcurrentModReleaseTest executes more than 15 minutes

2024-05-08 Thread Rui Fan (Jira)
Rui Fan created FLINK-35315:
---

 Summary: MemoryManagerConcurrentModReleaseTest executes more than 
15 minutes
 Key: FLINK-35315
 URL: https://issues.apache.org/jira/browse/FLINK-35315
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network, Tests
Affects Versions: 1.20.0
Reporter: Rui Fan
 Attachments: image-2024-05-09-11-53-10-037.png

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395=results]

 

It seems 

MemoryManagerConcurrentModReleaseTest.testConcurrentModificationWhileReleasing 
executes more than 15 minutes.

The root cause may be {color:#e1dfdd}ConcurrentModificationException{color}

 

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=10060]

 

!image-2024-05-09-11-53-10-037.png!

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-05-08 Thread via GitHub


1996fanrui commented on code in PR #24653:
URL: https://github.com/apache/flink/pull/24653#discussion_r1594926835


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##
@@ -232,7 +232,9 @@ public FsMergingCheckpointStorageAccess 
toFileMergingStorage(
 FileMergingSnapshotManager mergingSnapshotManager, Environment 
environment)
 throws IOException {
 return new FsMergingCheckpointStorageAccess(
-fileSystem,
+/* Multiple subtask/threads would share one output stream,
+SafetyNetWrapperFileSystem cannot be used to prevent different 
threads from interfering with each other when exiting. */
+
FileSystem.getUnguardedFileSystem(checkpointsDirectory.toUri()),

Review Comment:
   > FileMergingSnapshotManager takes this responsibility, 
FileMergingSnapshotManager would close all held physical files and streams when 
the task executor close/shutdown.
   
   Sorry, I'm not familiar with `FileMergingSnapshotManager`, thanks for the 
clarification. Sounds good to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34212][autoscaler] Autoscaler Standalone cleans up stopped jobs to prevent memory leaks [flink-kubernetes-operator]

2024-05-08 Thread via GitHub


1996fanrui commented on code in PR #824:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/824#discussion_r1594925851


##
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java:
##
@@ -227,6 +229,160 @@ public void cleanup(JobID jobID) {
 }
 }
 
+@Test
+void testCleanupAfterStopped() throws Exception {
+var eventCollector = new TestingEventCollector>();
+
+var job1 = createJobAutoScalerContext();
+var job2 = createJobAutoScalerContext();
+var scaleCounter = new ConcurrentHashMap();
+var cleanupCounter = new ConcurrentHashMap();
+
+var jobList = new ArrayList>();
+
+try (var autoscalerExecutor =
+new StandaloneAutoscalerExecutor<>(
+new Configuration(),
+() -> jobList,
+eventCollector,
+new JobAutoScaler<>() {
+@Override
+public void scale(JobAutoScalerContext 
context) {
+scaleCounter.put(
+context.getJobKey(),
+
scaleCounter.getOrDefault(context.getJobKey(), 0) + 1);
+}
+
+@Override
+public void cleanup(JobID jobID) {
+cleanupCounter.put(
+jobID, 
cleanupCounter.getOrDefault(jobID, 0) + 1);
+}
+})) {
+
+// Test for empty job list.
+autoscalerExecutor.scaling();
+Thread.sleep(100);

Review Comment:
   Good suggestion!
   
   Updated with your idea, and I changed some code for 
`StandaloneAutoscalerExecutorTest#testCleanupForStoppedJobAfterScaling`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-05-08 Thread via GitHub


fredia commented on code in PR #24653:
URL: https://github.com/apache/flink/pull/24653#discussion_r1594925847


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##
@@ -232,7 +232,9 @@ public FsMergingCheckpointStorageAccess 
toFileMergingStorage(
 FileMergingSnapshotManager mergingSnapshotManager, Environment 
environment)
 throws IOException {
 return new FsMergingCheckpointStorageAccess(
-fileSystem,
+/* Multiple subtask/threads would share one output stream,
+SafetyNetWrapperFileSystem cannot be used to prevent different 
threads from interfering with each other when exiting. */
+
FileSystem.getUnguardedFileSystem(checkpointsDirectory.toUri()),

Review Comment:
   > I'm thinking could checkpointsDirectory as the parameter of 
FsMergingCheckpointStorageAccess constructor?
   
    Agreed, I will update the constructor as your suggestion.
   
   > Also, I have a concern here: When we use GuardedFileSystem, all streams 
can be closed properly when the corresponding thread is exiting. After this 
change, if we use UnguardedFileSystem, it may cause stream leak.
   
   `FileMergingSnapshotManager`  takes this responsibility, 
`FileMergingSnapshotManager` would close all held physical files and streams 
when the task executor close/shutdown.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35293][hive] Hive source supports dynamic parallelism inference [flink]

2024-05-08 Thread via GitHub


flinkbot commented on PR #24764:
URL: https://github.com/apache/flink/pull/24764#issuecomment-2101872572

   
   ## CI report:
   
   * 263d2baa4e11fc1c11fb0efcec298ba5c79e8b4a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35293][hive] Hive source supports dynamic parallelism inference [flink]

2024-05-08 Thread via GitHub


SinBex commented on PR #24764:
URL: https://github.com/apache/flink/pull/24764#issuecomment-2101871143

   @zhuzhurk Could you please help to review this pr in your free time? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35314) Add Flink CDC pipeline transform user document

2024-05-08 Thread Wenkai Qi (Jira)
Wenkai Qi created FLINK-35314:
-

 Summary: Add Flink CDC pipeline transform user document
 Key: FLINK-35314
 URL: https://issues.apache.org/jira/browse/FLINK-35314
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: Wenkai Qi


The document outline is as follows:
 # Definition
 # Parameters
 # Metadata Fields
 # Functions
 # Example
 # Problem

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35293:
---
Labels: pull-request-available  (was: )

> FLIP-445: Support dynamic parallelism inference for HiveSource
> --
>
> Key: FLINK-35293
> URL: https://issues.apache.org/jira/browse/FLINK-35293
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
>
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]
>  introduces dynamic source parallelism inference, which, compared to static 
> inference, utilizes runtime information to more accurately determine the 
> source parallelism. The FileSource already possesses the capability for 
> dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP 
> plans to implement the dynamic parallelism inference interface for 
> HiveSource, and also switches the default static parallelism inference to 
> dynamic parallelism inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35293][hive] Hive source supports dynamic parallelism inference [flink]

2024-05-08 Thread via GitHub


SinBex opened a new pull request, #24764:
URL: https://github.com/apache/flink/pull/24764

   ## What is the purpose of the change
   
   
[FLIP-379](https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs)
 introduces dynamic source parallelism inference, which, compared to static 
inference, utilizes runtime information to more accurately determine the source 
parallelism. The FileSource already possesses the capability for dynamic 
parallelism inference. As a follow-up task to FLIP-379, this FLIP plans to 
implement the dynamic parallelism inference interface for HiveSource, and also 
switches the default static parallelism inference to dynamic parallelism 
inference.
   
   
   ## Brief change log
 - *Enable HiveSource with dynamic parallelism inference capability.*
 - *Switch the default parallelism inference mode of HiveSource to dynamic 
inference and introduce a new configuration 
'table.exec.hive.infer-source-parallelism.mode' to select the mode, while 
deprecating the original configuration 
'table.exec.hive.infer-source-parallelism'.*
   
   
   ## Verifying this change
 - *Added unit tests for HiveSource#inferParallelism.*
 - *Verify functional and performance changes on actual clusters through 
TPC-DS.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes )
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35270) Enrich information in logs, making it easier for debugging

2024-05-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-35270.
---
Resolution: Fixed

547e4b53ebe36c39066adcf3a98123a1f7890c15

> Enrich information in logs, making it easier for debugging
> --
>
> Key: FLINK-35270
> URL: https://issues.apache.org/jira/browse/FLINK-35270
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Haifei Chen
>Assignee: Haifei Chen
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.20.0
>
>
> Good logs helps debug a lot in production environment
> Therefore, it'll be better to show more information in logs  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35270]Enrich information in logs, making it easier for debugging [flink]

2024-05-08 Thread via GitHub


zhuzhurk merged PR #24747:
URL: https://github.com/apache/flink/pull/24747


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35270) Enrich information in logs, making it easier for debugging

2024-05-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-35270:
---

Assignee: Haifei Chen

> Enrich information in logs, making it easier for debugging
> --
>
> Key: FLINK-35270
> URL: https://issues.apache.org/jira/browse/FLINK-35270
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Haifei Chen
>Assignee: Haifei Chen
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.20.0
>
>
> Good logs helps debug a lot in production environment
> Therefore, it'll be better to show more information in logs  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35270) Enrich information in logs, making it easier for debugging

2024-05-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-35270:

Component/s: API / Core

> Enrich information in logs, making it easier for debugging
> --
>
> Key: FLINK-35270
> URL: https://issues.apache.org/jira/browse/FLINK-35270
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Haifei Chen
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.20.0
>
>
> Good logs helps debug a lot in production environment
> Therefore, it'll be better to show more information in logs  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35270) Enrich information in logs, making it easier for debugging

2024-05-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-35270:

Labels: pull-request-available starter  (was: pull-request-available)

> Enrich information in logs, making it easier for debugging
> --
>
> Key: FLINK-35270
> URL: https://issues.apache.org/jira/browse/FLINK-35270
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haifei Chen
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.20.0
>
>
> Good logs helps debug a lot in production environment
> Therefore, it'll be better to show more information in logs  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35270) Enrich information in logs, making it easier for debugging

2024-05-08 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-35270:

Fix Version/s: 1.20.0

> Enrich information in logs, making it easier for debugging
> --
>
> Key: FLINK-35270
> URL: https://issues.apache.org/jira/browse/FLINK-35270
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haifei Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Good logs helps debug a lot in production environment
> Therefore, it'll be better to show more information in logs  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35313) Add upsert changelog mode to avoid UPDATE_BEFORE records push down

2024-05-08 Thread JJJJude (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844843#comment-17844843
 ] 

ude edited comment on FLINK-35313 at 5/9/24 3:14 AM:
-

migrate from 
[https://github.com/apache/flink-cdc/issues/1898|https://github.com/apache/flink-cdc/issues/1898]


was (Author: wczhu):
migrate from[ 
https://github.com/apache/flink-cdc/issues/1898|https://github.com/apache/flink-cdc/issues/1898]

> Add upsert changelog mode to avoid UPDATE_BEFORE records push down
> --
>
> Key: FLINK-35313
> URL: https://issues.apache.org/jira/browse/FLINK-35313
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: ude
>Priority: Major
>
> I try to use flink sql to write mysql cdc-data into redis as a dimension 
> table for other business use. When executing {{UPDATE}} DML, the cdc-data 
> will be converted into {{-D (UPDATE_BEFORE)}} and {{+I (UPDATE_AFTER)}} two 
> records to sink redis. However, delete first will cause other data streams to 
> be lost(NULL) when join data, which is unacceptable.
> I think we can add support for [upser changelog 
> mode|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion]
>  by adding changelogMode option with mandatory primary key 
> configuration.Basically, with {{changelogMode=upsert}} we will avoid 
> {{UPDATE_BEFORE}} rows and we will require a primary key for the table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35313) Add upsert changelog mode to avoid UPDATE_BEFORE records push down

2024-05-08 Thread JJJJude (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844843#comment-17844843
 ] 

ude commented on FLINK-35313:
-

migrate from[ 
https://github.com/apache/flink-cdc/issues/1898|https://github.com/apache/flink-cdc/issues/1898]

> Add upsert changelog mode to avoid UPDATE_BEFORE records push down
> --
>
> Key: FLINK-35313
> URL: https://issues.apache.org/jira/browse/FLINK-35313
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: ude
>Priority: Major
>
> I try to use flink sql to write mysql cdc-data into redis as a dimension 
> table for other business use. When executing {{UPDATE}} DML, the cdc-data 
> will be converted into {{-D (UPDATE_BEFORE)}} and {{+I (UPDATE_AFTER)}} two 
> records to sink redis. However, delete first will cause other data streams to 
> be lost(NULL) when join data, which is unacceptable.
> I think we can add support for [upser changelog 
> mode|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion]
>  by adding changelogMode option with mandatory primary key 
> configuration.Basically, with {{changelogMode=upsert}} we will avoid 
> {{UPDATE_BEFORE}} rows and we will require a primary key for the table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35313) Add upsert changelog mode to avoid UPDATE_BEFORE records push down

2024-05-08 Thread JJJJude (Jira)
ude created FLINK-35313:
---

 Summary: Add upsert changelog mode to avoid UPDATE_BEFORE records 
push down
 Key: FLINK-35313
 URL: https://issues.apache.org/jira/browse/FLINK-35313
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: ude


I try to use flink sql to write mysql cdc-data into redis as a dimension table 
for other business use. When executing {{UPDATE}} DML, the cdc-data will be 
converted into {{-D (UPDATE_BEFORE)}} and {{+I (UPDATE_AFTER)}} two records to 
sink redis. However, delete first will cause other data streams to be 
lost(NULL) when join data, which is unacceptable.
I think we can add support for [upser changelog 
mode|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion]
 by adding changelogMode option with mandatory primary key 
configuration.Basically, with {{changelogMode=upsert}} we will avoid 
{{UPDATE_BEFORE}} rows and we will require a primary key for the table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax [flink]

2024-05-08 Thread via GitHub


liyubin117 commented on PR #24763:
URL: https://github.com/apache/flink/pull/24763#issuecomment-2101830016

   @LadyForest Hi, I have finished the feature, Looking forward to your review, 
thanks :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34212][autoscaler] Autoscaler Standalone cleans up stopped jobs to prevent memory leaks [flink-kubernetes-operator]

2024-05-08 Thread via GitHub


RocMarshal commented on code in PR #824:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/824#discussion_r1594866016


##
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java:
##
@@ -227,6 +229,160 @@ public void cleanup(JobID jobID) {
 }
 }
 
+@Test
+void testCleanupAfterStopped() throws Exception {
+var eventCollector = new TestingEventCollector>();
+
+var job1 = createJobAutoScalerContext();
+var job2 = createJobAutoScalerContext();
+var scaleCounter = new ConcurrentHashMap();
+var cleanupCounter = new ConcurrentHashMap();
+
+var jobList = new ArrayList>();
+
+try (var autoscalerExecutor =
+new StandaloneAutoscalerExecutor<>(
+new Configuration(),
+() -> jobList,
+eventCollector,
+new JobAutoScaler<>() {
+@Override
+public void scale(JobAutoScalerContext 
context) {
+scaleCounter.put(
+context.getJobKey(),
+
scaleCounter.getOrDefault(context.getJobKey(), 0) + 1);
+}
+
+@Override
+public void cleanup(JobID jobID) {
+cleanupCounter.put(
+jobID, 
cleanupCounter.getOrDefault(jobID, 0) + 1);
+}
+})) {
+
+// Test for empty job list.
+autoscalerExecutor.scaling();
+Thread.sleep(100);

Review Comment:
   Could we use the CompletableFuture to overwrite some lines like this one?
   
   I made a rough draft.
   
[draft-patch.txt](https://github.com/apache/flink-kubernetes-operator/files/15256511/draft-patch.txt)
   
   But I have to admit that the draft still is a bit of redundant. Because the 
draft use the `completableFuture` by introducing an useless `return value` of 
some methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax [flink]

2024-05-08 Thread via GitHub


flinkbot commented on PR #24763:
URL: https://github.com/apache/flink/pull/24763#issuecomment-2101826274

   
   ## CI report:
   
   * 4ff585eb2e42856f6cf6d76e5ff4ac39937d99b2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: (was: result5.html)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: (was: result6.html)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: (was: result4.html)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: (was: result2.html)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: (was: result3.html)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: (was: result1.html)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35164) Support `ALTER CATALOG RESET` syntax

2024-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35164:
---
Labels: pull-request-available  (was: )

> Support `ALTER CATALOG RESET` syntax
> 
>
> Key: FLINK-35164
> URL: https://issues.apache.org/jira/browse/FLINK-35164
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-18-23-26-59-854.png
>
>
> h3. ALTER CATALOG catalog_name RESET (key1, key2, ...)
> Reset one or more properties to its default value in the specified catalog.
> !image-2024-04-18-23-26-59-854.png|width=781,height=527!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax [flink]

2024-05-08 Thread via GitHub


liyubin117 opened a new pull request, #24763:
URL: https://github.com/apache/flink/pull/24763

   ## What is the purpose of the change
   
   Reset one or more properties to its default value in the specified catalog.
   
   ## Brief change log
   
   * ALTER CATALOG catalog_name SET (key1=val1, ...)
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
   flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: result4.html
result5.html
result1.html
result2.html
result6.html
result3.html

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, result1.html, result2.html, result3.html, 
> result4.html, result5.html, result6.html, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34916) Support `ALTER CATALOG SET` syntax

2024-05-08 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-34916.
---
Fix Version/s: 1.20.0
   Resolution: Fixed

Fixed in master 4611817591c38019c27ffad6d8cdc68292f079a4

> Support `ALTER CATALOG SET` syntax
> --
>
> Key: FLINK-34916
> URL: https://issues.apache.org/jira/browse/FLINK-34916
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-03-22-18-30-33-182.png
>
>
> Set one or more properties in the specified catalog. If a particular property 
> is already set in the catalog, override the old value with the new one.
> !image-2024-03-22-18-30-33-182.png|width=736,height=583!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34916) Support `ALTER CATALOG SET` syntax

2024-05-08 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-34916.
-

> Support `ALTER CATALOG SET` syntax
> --
>
> Key: FLINK-34916
> URL: https://issues.apache.org/jira/browse/FLINK-34916
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-03-22-18-30-33-182.png
>
>
> Set one or more properties in the specified catalog. If a particular property 
> is already set in the catalog, override the old value with the new one.
> !image-2024-03-22-18-30-33-182.png|width=736,height=583!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-08 Thread via GitHub


LadyForest merged PR #24735:
URL: https://github.com/apache/flink/pull/24735


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35312) Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_

2024-05-08 Thread vmaster.cc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844837#comment-17844837
 ] 

vmaster.cc commented on FLINK-35312:


> An exception occurred in the change event producer. This connector will be 
> restarted

After restart, this may lead to following error?

2024-05-07 15:11:46.919 [debezium-engine] ERROR 
com.ververica.cdc.debezium.internal.Handover - Reporting error:
java.lang.IllegalStateException: Retrieve schema history failed, the schema 
records for engine 040e0553-c4b1-41aa-a05b-ec14759e16b5 has been removed, this 
might because the debezium engine has been shutdown due to other errors.
at 
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory(DatabaseHistoryUtil.java:77)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory.configure(FlinkDatabaseSchemaHistory.java:82)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.getDatabaseHistory(HistorizedRelationalDatabaseConnectorConfig.java:105)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.relational.HistorizedRelationalDatabaseSchema.(HistorizedRelationalDatabaseSchema.java:39)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.connector.sqlserver.SqlServerDatabaseSchema.(SqlServerDatabaseSchema.java:34)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:84)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130) 
~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:207)
 ~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:148) 
~[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:788) 
[flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
 [flink-sql-connector-sqlserver-cdc.jar:2.2.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_382]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_382]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_382]

> Insufficient number of arguments were supplied for the procedure or function 
> cdc.fn_cdc_get_all_changes_
> 
>
> Key: FLINK-35312
> URL: https://issues.apache.org/jira/browse/FLINK-35312
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> h3. Flink version
> 1.17.0
> h3. Flink CDC version
> 2.4.1
> h3. Database and its version
> sql server 2014
> h3. Minimal reproduce step
> 1
> h3. What did you expect to see?
> Caused by: java.lang.RuntimeException: SplitFetcher thread 22 received 
> unexpected exception while polling the records
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Caused by: org.apache.kafka.connect.errors.RetriableException: An exception 
> occurred in the change event producer. This connector will be restarted.
> at 
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
> at 
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:458)
> at 
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
> at 
> com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$LsnSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
> at 
> com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.execute(SqlServerScanFetchTask.java:123)
> at 
> com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95)
> ... 5 more
> Caused by: 

Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


loserwang1024 commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1594884935


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java:
##
@@ -71,7 +72,9 @@ public void translate(
 }
 }
 
-private void sinkTo(
+/** Only visible for test */
+@VisibleForTesting
+protected void sinkTo(

Review Comment:
   Thanks for your advice, it seems better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35312) Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_

2024-05-08 Thread yux (Jira)
yux created FLINK-35312:
---

 Summary: Insufficient number of arguments were supplied for the 
procedure or function cdc.fn_cdc_get_all_changes_
 Key: FLINK-35312
 URL: https://issues.apache.org/jira/browse/FLINK-35312
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


h3. Flink version

1.17.0
h3. Flink CDC version

2.4.1
h3. Database and its version

sql server 2014
h3. Minimal reproduce step

1
h3. What did you expect to see?

Caused by: java.lang.RuntimeException: SplitFetcher thread 22 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.RetriableException: An exception 
occurred in the change event producer. This connector will be restarted.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:458)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
at 
com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$LsnSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
at 
com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.execute(SqlServerScanFetchTask.java:123)
at 
com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95)
... 5 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient 
number of arguments were supplied for the procedure or function 
cdc.fn_cdc_get_all_changes_ ... .
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5471)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1794)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1052)
at 
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:269)
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606)
at 
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:251)
... 9 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-05-08 Thread via GitHub


1996fanrui commented on code in PR #24653:
URL: https://github.com/apache/flink/pull/24653#discussion_r1594880568


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##
@@ -232,7 +232,9 @@ public FsMergingCheckpointStorageAccess 
toFileMergingStorage(
 FileMergingSnapshotManager mergingSnapshotManager, Environment 
environment)
 throws IOException {
 return new FsMergingCheckpointStorageAccess(
-fileSystem,
+/* Multiple subtask/threads would share one output stream,
+SafetyNetWrapperFileSystem cannot be used to prevent different 
threads from interfering with each other when exiting. */
+
FileSystem.getUnguardedFileSystem(checkpointsDirectory.toUri()),

Review Comment:
   Also, I have a concern here: When we use `GuardedFileSystem`, all streams 
can be closed properly when the corresponding thread is exiting. After this 
change, if we use `UnguardedFileSystem`, it may cause stream leak.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


loserwang1024 commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1594878296


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+@Test
+public void testPreWriteWithoutCommitSink() {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new 
EmptyEvent());
+DataStreamSource inputStream = env.fromCollection(mockEvents);
+DataSinkTranslator translator = new DataSinkTranslator();
+
+String uid = "";

Review Comment:
   I've tried it before, but it shows that : 
   java.lang.IllegalArgumentException: Node hash must be a 32 character String 
that describes a hex code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-05-08 Thread via GitHub


1996fanrui commented on code in PR #24653:
URL: https://github.com/apache/flink/pull/24653#discussion_r1594875205


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java:
##
@@ -232,7 +232,9 @@ public FsMergingCheckpointStorageAccess 
toFileMergingStorage(
 FileMergingSnapshotManager mergingSnapshotManager, Environment 
environment)
 throws IOException {
 return new FsMergingCheckpointStorageAccess(
-fileSystem,
+/* Multiple subtask/threads would share one output stream,
+SafetyNetWrapperFileSystem cannot be used to prevent different 
threads from interfering with each other when exiting. */
+
FileSystem.getUnguardedFileSystem(checkpointsDirectory.toUri()),

Review Comment:
   I'm thinking could `checkpointsDirectory` as the parameter of 
FsMergingCheckpointStorageAccess constructor?And we initialize the fileSystem 
inside of FsMergingCheckpointStorageAccess constructor. It can avoid all 
callers pass a GuardedFileSystem.
   
   WDYT?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -483,27 +485,42 @@ protected StreamTask(
 }
 
 this.systemTimerService = createTimerService("System Time Trigger 
for " + getName());
-
+final CheckpointStorageAccess finalCheckpointStorageAccess = 
checkpointStorageAccess;
+
+ChannelStateWriter channelStateWriter =
+configuration.isUnalignedCheckpointsEnabled()
+? openChannelStateWriter(
+getName(),
+() -> {
+if (finalCheckpointStorageAccess
+instanceof 
FsMergingCheckpointStorageAccess) {
+return 
finalCheckpointStorageAccess;
+} else {
+return 
checkpointStorage.createCheckpointStorage(

Review Comment:
   Would you mind adding some comments here to explain why we don't pass 
`CheckpointStorageAccess` directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35291] Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme [flink-cdc]

2024-05-08 Thread via GitHub


lzshlzsh commented on PR #3289:
URL: https://github.com/apache/flink-cdc/pull/3289#issuecomment-2101795518

   @ruanhang1993 Would you help have a look at this? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33463) Support the implementation of dynamic source tables based on the new source

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33463.
-
Fix Version/s: jdbc-3.3.0
   Resolution: Fixed

> Support the implementation of dynamic source tables based on the new source
> ---
>
> Key: FLINK-33463
> URL: https://issues.apache.org/jira/browse/FLINK-33463
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
>  Labels: pull-request-available
> Fix For: jdbc-3.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33463) Support the implementation of dynamic source tables based on the new source

2024-05-08 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844836#comment-17844836
 ] 

Rui Fan commented on FLINK-33463:
-

Merged to main(3.3.0) via:
 * bde28e6a92ffa75ae45bc8df6be55d299ff995a2
 * 50e8d2b16c31420bad475b629b4ab0bf4218a1f1

> Support the implementation of dynamic source tables based on the new source
> ---
>
> Key: FLINK-33463
> URL: https://issues.apache.org/jira/browse/FLINK-33463
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source [flink-connector-jdbc]

2024-05-08 Thread via GitHub


1996fanrui merged PR #117:
URL: https://github.com/apache/flink-connector-jdbc/pull/117


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34212][autoscaler] Autoscaler Standalone cleans up stopped jobs to prevent memory leaks [flink-kubernetes-operator]

2024-05-08 Thread via GitHub


RocMarshal commented on code in PR #824:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/824#discussion_r1594866016


##
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java:
##
@@ -227,6 +229,160 @@ public void cleanup(JobID jobID) {
 }
 }
 
+@Test
+void testCleanupAfterStopped() throws Exception {
+var eventCollector = new TestingEventCollector>();
+
+var job1 = createJobAutoScalerContext();
+var job2 = createJobAutoScalerContext();
+var scaleCounter = new ConcurrentHashMap();
+var cleanupCounter = new ConcurrentHashMap();
+
+var jobList = new ArrayList>();
+
+try (var autoscalerExecutor =
+new StandaloneAutoscalerExecutor<>(
+new Configuration(),
+() -> jobList,
+eventCollector,
+new JobAutoScaler<>() {
+@Override
+public void scale(JobAutoScalerContext 
context) {
+scaleCounter.put(
+context.getJobKey(),
+
scaleCounter.getOrDefault(context.getJobKey(), 0) + 1);
+}
+
+@Override
+public void cleanup(JobID jobID) {
+cleanupCounter.put(
+jobID, 
cleanupCounter.getOrDefault(jobID, 0) + 1);
+}
+})) {
+
+// Test for empty job list.
+autoscalerExecutor.scaling();
+Thread.sleep(100);

Review Comment:
   Could we use the CompletableFuture to overwrite some lines like this one?
   
   I made a rough draft.
   
[draft-patch.txt](https://github.com/apache/flink-kubernetes-operator/files/15256511/draft-patch.txt)
   
   But I have to admit that the draft still is a bit of redundant. Because the 
draft use the `completableFuture` by introducing a useless `return value` of 
some methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]

2024-05-08 Thread via GitHub


yuxiqian commented on PR #1907:
URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2101764510

   Hi @yeezychao, seems MySQL CI job is failing, could you please take a look? 
https://github.com/apache/flink-cdc/actions/runs/8844257194/job/24756979040?pr=1907
   
   (The markdown lint check is irrelevant to this PR and should be fixed with a 
`git rebase master`.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35232) Support for retry settings on GCS connector

2024-05-08 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-35232.

Fix Version/s: 1.20.0
   Resolution: Done

master (1.20): f2c0c3ddcdd78c1e2876087139e56534fe3f8421

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Vikas M
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular next settings seems to be the minimum required to 
> adjust GCS timeout with Job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically the proposal is to be able to tune the timeout via multiplier, 
> maxAttemts + totalTimeout mechanisms.
> All of the config options should be optional and the default one should be 
> used in case some of configs are not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector

2024-05-08 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-35232:
-
Affects Version/s: (was: 1.15.3)
   (was: 1.16.2)
   (was: 1.17.1)
   (was: 1.19.0)
   (was: 1.18.1)

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Vikas M
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular next settings seems to be the minimum required to 
> adjust GCS timeout with Job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically the proposal is to be able to tune the timeout via multiplier, 
> maxAttemts + totalTimeout mechanisms.
> All of the config options should be optional and the default one should be 
> used in case some of configs are not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35232] Add retry settings for GCS connector [flink]

2024-05-08 Thread via GitHub


xintongsong closed pull request #24753: [FLINK-35232] Add retry settings for 
GCS connector
URL: https://github.com/apache/flink/pull/24753


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Module 1 labs - Enrichment and Filtering [flink-training]

2024-05-08 Thread via GitHub


manoellins opened a new pull request, #80:
URL: https://github.com/apache/flink-training/pull/80

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row type [flink]

2024-05-08 Thread via GitHub


ukby1234 commented on PR #24029:
URL: https://github.com/apache/flink/pull/24029#issuecomment-2101240856

   Yes I would love to have review/test on this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.18][FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]

2024-05-08 Thread via GitHub


XComp commented on PR #24604:
URL: https://github.com/apache/flink/pull/24604#issuecomment-2101181186

   I verified that the Minio-based and non-s3 tests ran in CI 
([#1](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58725=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=2833),
 
[#2](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58725=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=3263),
 
[#3](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58725=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=3906),
 
[#4](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58725=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=4249)).
 No odd logging was monitored.  The review happened in the parent PR. No 
conflict appeared during backport.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.19][FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]

2024-05-08 Thread via GitHub


XComp commented on PR #24605:
URL: https://github.com/apache/flink/pull/24605#issuecomment-2101172459

   I verified that the Minio-based and non-s3 tests ran in CI 
([#1](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58726=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=3229),
 
[#2](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58726=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=4047),
 
[#3](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58726=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=4423),
 
[#4](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58726=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=5080))
  The review happened in the parent PR. No conflict appeared during backport.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row type [flink]

2024-05-08 Thread via GitHub


ViktorCosenza commented on PR #24029:
URL: https://github.com/apache/flink/pull/24029#issuecomment-2101016302

   Hi, we're having this exact issue in our team and found this PR fixing it, 
is there a expected timeframe for this to be merged?
   If any help is needed to test/review this I'm happy to help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Lab 1 with filtering and enrichment [flink-training]

2024-05-08 Thread via GitHub


manoellins closed pull request #79: Lab 1 with filtering and enrichment
URL: https://github.com/apache/flink-training/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33759] flink parquet writer support write nested array or map type [flink]

2024-05-08 Thread via GitHub


ViktorCosenza commented on PR #23881:
URL: https://github.com/apache/flink/pull/23881#issuecomment-2100970888

   Hi, I'm having a issue directly related to the empty "write" methods when 
trying to write a Parquet with an array of row.
   
   I've noticed you fixed the issue for the Map and Array Writer type, but not 
for `RowWriter`. So if you wanted to fix it in there to it's probably a very 
similar fix and in the same file.
   
   
https://github.com/apache/flink/pull/23881/files#diff-268b0e6102435b8b1cbde023bd8842beebf677c25cfeeab64a0c92ab182d6cceR486
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-08 Thread via GitHub


gyfora commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1594287283


##
flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java:
##
@@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() {
 }
 
 @Override
-public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) {
+public CompletableFuture submitJob(@Nonnull JobGraph jobGraph, 
Pipeline pipeline) {

Review Comment:
   We discussed this with @HuangZhenQiu offline and we should remove this api 
change and the lineage logic will be applied on the future instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35270]Enrich information in logs, making it easier for debugging [flink]

2024-05-08 Thread via GitHub


HCTommy commented on PR #24747:
URL: https://github.com/apache/flink/pull/24747#issuecomment-2100853984

   Hi @zhuzhurk  , kooking for your review again. Tkanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35311) FLIP-454: New Apicurio Avro format

2024-05-08 Thread david radley (Jira)
david radley created FLINK-35311:


 Summary: FLIP-454: New Apicurio Avro format
 Key: FLINK-35311
 URL: https://issues.apache.org/jira/browse/FLINK-35311
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.18.1, 1.19.0, 1.17.2
Reporter: david radley
 Fix For: 2.0.0, 1.20.0


This Jira is for the accepted 
[FLIP-454|https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format].
 It involves changes to 2 repositories, core Flink and the Flink Kafka 
connector  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35310) Replace RBAC verb wildcards with actual verbs

2024-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35310:
---
Labels: pull-request-available  (was: )

> Replace RBAC verb wildcards with actual verbs
> -
>
> Key: FLINK-35310
> URL: https://issues.apache.org/jira/browse/FLINK-35310
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
> Environment: Running on Kubernetes using the flink-operator version 
> 1.8.0
>Reporter: Tim
>Priority: Major
>  Labels: pull-request-available
>
> We are deploying the flink operator on a managed Kubernetes cluster which 
> utilizes [Kyverno Policy Management|https://kyverno.io/] and all it's default 
> rules. Not complying to certain rules, leads to a restriction in deploying.
> As we are using Helm to build the manifest files (which is super useful) I 
> recognized that in the RBAC template "wildcards" are being used for all verbs 
> ("*").
> This violates the following Kyverno ruleset: 
> [https://kyverno.io/policies/other/restrict-wildcard-verbs/restrict-wildcard-verbs/]
> Besides that I think that it would also be cleaner to explicitly list the 
> needed verbs instead of just using the star symbol as a wildcard.
> I have already attempted to change this in a fork as a demonstration how it 
> could be changed to be conform. Please take a look and I would greatly 
> appreciate a change in that direction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35310] Replace RBAC verb wildcards with actual verbs [flink-kubernetes-operator]

2024-05-08 Thread via GitHub


timsn opened a new pull request, #826:
URL: https://github.com/apache/flink-kubernetes-operator/pull/826

   
   
   ## What is the purpose of the change
   
   Replace all wildcards in the RBAC HELM template of the flink-operator with 
it's proper verb names.
   
   
   ## Brief change log
   
 - replaced every "*" with a list of verb names suitable to the according 
sections
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35310) Replace RBAC verb wildcards with actual verbs

2024-05-08 Thread Tim (Jira)
Tim created FLINK-35310:
---

 Summary: Replace RBAC verb wildcards with actual verbs
 Key: FLINK-35310
 URL: https://issues.apache.org/jira/browse/FLINK-35310
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
 Environment: Running on Kubernetes using the flink-operator version 
1.8.0
Reporter: Tim


We are deploying the flink operator on a managed Kubernetes cluster which 
utilizes [Kyverno Policy Management|https://kyverno.io/] and all it's default 
rules. Not complying to certain rules, leads to a restriction in deploying.

As we are using Helm to build the manifest files (which is super useful) I 
recognized that in the RBAC template "wildcards" are being used for all verbs 
("*").

This violates the following Kyverno ruleset: 
[https://kyverno.io/policies/other/restrict-wildcard-verbs/restrict-wildcard-verbs/]

Besides that I think that it would also be cleaner to explicitly list the 
needed verbs instead of just using the star symbol as a wildcard.

I have already attempted to change this in a fork as a demonstration how it 
could be changed to be conform. Please take a look and I would greatly 
appreciate a change in that direction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594070565


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+Set unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown partition 
keys: %s.",
+unknownPartitionKeys));
+}
+
+// Set job name, runtime mode, checkpoint interval
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT INTO %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", 
entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.orElseThrow(() -> new TableException("Could not 
happen")));
+}
+
+try {
+// return jobId for one time refresh, user should get the refresh 
job info via desc

Review Comment:
   I want to notice the developer the information about the syntax "alter 
materialized table ... refresh ..." returns before. I'll delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594064792


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(

Review Comment:
   Got it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed

2024-05-08 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844662#comment-17844662
 ] 

Kenneth William Krugler commented on FLINK-35215:
-

Hi [~fanrui] - I added a comment to the PR.

> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed
> -
>
> Key: FLINK-35215
> URL: https://issues.apache.org/jira/browse/FLINK-35215
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-25-14-57-55-231.png, 
> image-2024-04-25-15-00-32-410.png
>
>
> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed[1][2], I checked recent commits, and found FLINK-34954 changed 
> related logic.
>  
> [1] 
> [http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryo=on=on=off=3=50]
> [2] 
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryoWithoutRegistration=on=on=off=3=50
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35215][core] Fix the bug when Kryo serialize length is 0 [flink]

2024-05-08 Thread via GitHub


kkrugler commented on PR #24717:
URL: https://github.com/apache/flink/pull/24717#issuecomment-2100600574

   I'm leaving in 2 days for a 4+ month backpack trip (hiking the PCT), 
otherwise what I'd do is try to hook up a test that uses JMH to accurately 
assess the impact of the change. To respond to the particular comment on why 
performance changed:
   
   > we call bytesRead < count twice
   
   The overhead of two comparisons, versus even one call to `inputStream.read`, 
should be miniscule.
   
   > I'm not sure whether 
[FLINK-34954](https://issues.apache.org/jira/browse/FLINK-34954) breaks any JIT 
optimization
   
   If a JIT optimization would result in skipping calls to `inputStream.read`, 
then yes that could have an impact. But that would be a change in logic.
   
   The one thing I see is in the change to `readBytes()`, where if it's often 
called with a count of 0, then your change skips the `try/catch` block, which 
is non-trivial. But I thought the fix was to avoid a failure for this exact 
case, so it shouldn't be happening in the current benchmark code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27741][table-planner] Fix NPE when use dense_rank() and rank()… [flink]

2024-05-08 Thread via GitHub


snuyanzin commented on PR #19797:
URL: https://github.com/apache/flink/pull/19797#issuecomment-2100597095

   @chenzihao5 thanks for your contribution
   and sorry for the late reply
   
   May i ask you to rebase and then we can make another review iteration?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35198) Support the execution of refresh materialized table

2024-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35198:
---
Labels: pull-request-available  (was: )

> Support the execution of refresh materialized table
> ---
>
> Key: FLINK-35198
> URL: https://issues.apache.org/jira/browse/FLINK-35198
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:SQL}
> ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
>  [PARTITION (key1=val1, key2=val2, ...)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594018523


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java:
##
@@ -58,4 +58,9 @@ public void unparse(SqlWriter writer, int leftPrec, int 
rightPrec) {
 writer, getOperator().getLeftPrec(), 
getOperator().getRightPrec());
 }
 }
+
+@Nullable

Review Comment:
   I just notice that in construct, `partitionSpec` is tagged `@Nullable`. I'll 
update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35199) Support the execution of create materialized table in full refresh mode

2024-05-08 Thread Yubin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844652#comment-17844652
 ] 

Yubin Li commented on FLINK-35199:
--

[~lsy] Hi, This indeed is a valuable feature, Could I take this ?

> Support the execution of create materialized table in full refresh mode
> ---
>
> Key: FLINK-35199
> URL: https://issues.apache.org/jira/browse/FLINK-35199
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> In full refresh mode, support creates materialized table and its background 
> refresh workflow:
> {code:SQL}
> CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
>  
> [ ([  ]) ]
>  
> [COMMENT table_comment]
>  
> [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
>  
> [WITH (key1=val1, key2=val2, ...)]
>  
> FRESHNESS = INTERVAL '' { SECOND | MINUTE | HOUR | DAY }
>  
> [REFRESH_MODE = { CONTINUOUS | FULL }]
>  
> AS 
>  
> :
>   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Custom delta ingress embedded [flink-statefun]

2024-05-08 Thread via GitHub


mohammadmahdihn closed pull request #342: Custom delta ingress embedded
URL: https://github.com/apache/flink-statefun/pull/342


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593954989


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+@Test
+public void testPreWriteWithoutCommitSink() {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new 
EmptyEvent());
+DataStreamSource inputStream = env.fromCollection(mockEvents);
+DataSinkTranslator translator = new DataSinkTranslator();
+
+String uid = "";
+MockPreWriteWithoutCommitSink mockPreWriteWithoutCommitSink =
+new MockPreWriteWithoutCommitSink(uid);
+translator.sinkTo(
+inputStream,
+mockPreWriteWithoutCommitSink,
+"testPreWriteWithoutCommitSink",
+new OperatorID());
+OneInputTransformation oneInputTransformation =
+(OneInputTransformation) env.getTransformations().get(0);
+Transformation reblanceTransformation = 
oneInputTransformation.getInputs().get(0);
+Assert.assertEquals(uid, 
reblanceTransformation.getUserProvidedNodeHash());

Review Comment:
   Maybe a comment, like:
   ```
   // Check if the `addPreWriteTopology` is called, and the uid is set when the 
transformation added
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593952991


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+@Test
+public void testPreWriteWithoutCommitSink() {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new 
EmptyEvent());
+DataStreamSource inputStream = env.fromCollection(mockEvents);
+DataSinkTranslator translator = new DataSinkTranslator();
+
+String uid = "";

Review Comment:
   nit: Maybe some more descriptive content, like
   
   ```
   String uid = "Uid set by the addPreWriteTopology topology";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593951427


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java:
##
@@ -71,7 +72,9 @@ public void translate(
 }
 }
 
-private void sinkTo(
+/** Only visible for test */
+@VisibleForTesting
+protected void sinkTo(

Review Comment:
   nit: The comment says the same, as the annotation, so it is not needed.
   We can leave this as package private (slightly lower privileges than 
`protected`.
   
   ```
   @VisibleForTesting
   void sinkTo(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35158) Error handling in StateFuture's callback

2024-05-08 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844638#comment-17844638
 ] 

Yanfei Lei commented on FLINK-35158:


pr [24698|https://github.com/apache/flink/pull/24698] closed via bb0f442

pr [24702|https://github.com/apache/flink/pull/24702] merged via 8475d28

> Error handling in StateFuture's callback
> 
>
> Key: FLINK-35158
> URL: https://issues.apache.org/jira/browse/FLINK-35158
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35158) Error handling in StateFuture's callback

2024-05-08 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-35158.

Resolution: Resolved

> Error handling in StateFuture's callback
> 
>
> Key: FLINK-35158
> URL: https://issues.apache.org/jira/browse/FLINK-35158
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]

2024-05-08 Thread via GitHub


wzorgdrager commented on code in PR #24756:
URL: https://github.com/apache/flink/pull/24756#discussion_r1593928221


##
flink-python/pyflink/common/types.py:
##
@@ -177,7 +177,10 @@ def of_kind(row_kind: RowKind, *args, **kwargs):
 return row
 
 def __contains__(self, item):
-return item in self._values
+if hasattr(self, "_fields"):
+return item in self._fields

Review Comment:
   @dianfu I agree. I added the change now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-08 Thread via GitHub


fredia closed pull request #24698: [FLINK-35158][runtime] Error handling in 
StateFuture's callback
URL: https://github.com/apache/flink/pull/24698


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-08 Thread via GitHub


xintongsong commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1593914163


##
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/NonPartitionedRuntimeContext.java:
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * A {@link NonPartitionedRuntimeContext} contains all execution information 
unrelated to partition.
+ */
+@Experimental
+public interface NonPartitionedRuntimeContext {}

Review Comment:
   The contexts become confusing.
   
   There are currently 5 contexts, and there relationship and differences are 
unclear.
   - RuntimeContext
   - PartitionedRuntimeContext
   - NonPartitionedRuntimeContext
   - NonPartitionedContext
   - TwoOutputNonPartitionedContext
   
   I think the information / functionalities provided by these contexts can be 
categorized into 3 kinds
   - Only needed in partitioned context (PartitionedRuntimeContext)
   - Only needed in non-partitioned context (NonPartitionedContext & 
TwoOutputNonPartitionedContext)
   - Needed by both partitioned and non partitioned 
(NonPartitionedRuntimeContext)
   
   Therefore, I'd suggest:
   - RuntimeContext for things needed by both partitioned and non-partitioned, 
which is exactly the current NonPartitionedRuntimeContext
   - OneOutputNonPartitionedContext and TwoOutputNonPartitionedContext, both 
extends RuntimeContext, for things only needed in non-partitioned context, 
which are the current NonPartitionedContext & TwoOutputNonPartitionedContext
   - PartitionedContext, extends RuntimeContext, for things only needed in 
partitioned context, which is the current RuntimeContext
   - The current PartitionedRuntimeContext is no longer needed, as is is 
neither directly implemented by any concrete classes, nor extended by multiple 
sub-interfaces/classes.



##
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {

Review Comment:
   Not sure about the name `xxxDescriptor`. Maybe we can keep the name but move 
it to another package for v2. Since we are no longer exposing the concept of 
operator, maybe just move it to the new package `o.a.f.api.common`. And please 
make sure we document the purpose of having two SSG classes and their 
differences clearly in JavaDocs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35302][rest] Ignore unknown fields in REST request deserialization [flink]

2024-05-08 Thread via GitHub


gaborgsomogyi commented on PR #24759:
URL: https://github.com/apache/flink/pull/24759#issuecomment-2100398349

   If there are no further comments then merging it in 2 days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-05-08 Thread via GitHub


fredia commented on PR #24653:
URL: https://github.com/apache/flink/pull/24653#issuecomment-2100331363

   When passing `CheckpointStorageWorkerView` directly to 
`ChannelStateWriteRequestDispatcher`,  `checkpointStorage` 
`UnalignedCheckpointRescaleITCase` might throw exceptions when the task exited.
   
   This is mainly because the task will close the stream held by the current 
thread when it exits, see 
`FileSystemSafetyNet#closeSafetyNetAndGuardedResourcesForThread`(Thanks for 
@1996fanrui's help).
   
   Therefore, I let  
`ChannelStateWriteRequestDispatcher#CheckpointStorageWorkerView` be lazily 
initialized.
   and changed the `filesystem` in `FsMergingCheckpointStorageAccess` to 
unwrapped `filesystem`, to prevent different threads from interfering with each 
other when exiting.
   
   ```
   Caused by: java.io.IOException: Could not open output stream for state 
backend
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:461)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at java.io.DataOutputStream.write(DataOutputStream.java:107) 
~[?:1.8.0_292]
at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
~[?:1.8.0_292]
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:404)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:194)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:274)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:189)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:153)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$2(ChannelStateWriteRequest.java:139)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$8(ChannelStateWriteRequest.java:231)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:366)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointInProgressRequest(ChannelStateWriteRequestDispatcherImpl.java:169)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:119)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:86)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:136)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
... 1 more
   Caused by: java.io.IOException: Cannot register Closeable, registry is 
already closed. Closing argument.
at 
org.apache.flink.util.AbstractAutoCloseableRegistry.registerCloseable(AbstractAutoCloseableRegistry.java:89)
 ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:103)
 ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:131)
 ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 

Re: [PR] [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging [flink]

2024-05-08 Thread via GitHub


Zakelly commented on code in PR #24762:
URL: https://github.com/apache/flink/pull/24762#discussion_r1593698417


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##
@@ -268,4 +269,78 @@ public String toString() {
 "%s-%s(%d/%d)", jobIDString, operatorIDString, 
subtaskIndex, parallelism);
 }
 }
+
+/** Space usage statistics of a managed directory. */
+final class SpaceStat {
+
+AtomicLong physicalFileCount;
+AtomicLong physicalFileSize;
+
+AtomicLong logicalFileCount;
+AtomicLong logicalFileSize;
+
+public SpaceStat() {
+this(0, 0, 0, 0);
+}
+
+public SpaceStat(
+long physicalFileCount,
+long physicalFileSize,
+long logicalFileCount,
+long logicalFileSize) {
+this.physicalFileCount = new AtomicLong(0);

Review Comment:
   Is it `this.physicalFileCount = new AtomicLong(physicalFileCount);` ? 



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -607,6 +619,7 @@ public void restoreStateHandles(
 
fileHandle.getScope()))
 ? 
physicalFileDeleter
 : null;
+spaceStat.onPhysicalFileCreate();

Review Comment:
   I'm afraid this is not true. If the `isManagedByFileMergingManager` returns 
false, there will be no file deleter, so no `spaceStat` will be updated after 
file deletion. I'd suggest skip `spaceStat` if `isManagedByFileMergingManager` 
returns false. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35243][cdc] Support table schema change events & PreSchema backfill [flink-cdc]

2024-05-08 Thread via GitHub


yuxiqian commented on PR #3296:
URL: https://github.com/apache/flink-cdc/pull/3296#issuecomment-2100267362

   @PatrickRen @ruanhang1993 PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source [flink-connector-jdbc]

2024-05-08 Thread via GitHub


RocMarshal commented on code in PR #117:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/117#discussion_r1593792023


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -144,7 +146,8 @@ void testJdbcReadProperties() {
 readOptions,
 LookupOptions.MAX_RETRIES.defaultValue(),
 null,
-SCHEMA.toPhysicalRowDataType());
+SCHEMA.toPhysicalRowDataType(),
+FactoryMocks.IDENTIFIER.asSummaryString());

Review Comment:
   @eskabetxe updated!   :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35309:
---
Labels: pull-request-available  (was: )

> Enable Notice file ci check and fix Notice 
> ---
>
> Key: FLINK-35309
> URL: https://issues.apache.org/jira/browse/FLINK-35309
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
>
> Changes:
> * Add ci to check Notice file 
> * Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35309][ci] Enable Notice file ci check and fix Notice [flink-cdc]

2024-05-08 Thread via GitHub


GOODBOY008 opened a new pull request, #3304:
URL: https://github.com/apache/flink-cdc/pull/3304

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35306) Flink cannot compile with jdk17

2024-05-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-35306.
-
Resolution: Fixed

> Flink cannot compile with jdk17
> ---
>
> Key: FLINK-35306
> URL: https://issues.apache.org/jira/browse/FLINK-35306
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Tests
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-05-08-11-48-04-161.png
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59380=results]
>  fails and benchmark with 17 fails as well
>  
> Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier update the 
> schemaCompatibilityMatcher method name to schemaCompatibilityCondition, but 
> some subclasses didn't change it, such as: 
> PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.
>  
> It belongs to flink-tests-java17 module, and it doesn't compile by default.
>  
> it's caused by
>  * https://issues.apache.org/jira/browse/FLINK-25537
>  * [https://github.com/apache/flink/pull/24603]
>  
> !image-2024-05-08-11-48-04-161.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35306) Flink cannot compile with jdk17

2024-05-08 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844605#comment-17844605
 ] 

Rui Fan commented on FLINK-35306:
-

Merged to master(1.20) via 1c34ca011cacdbb3b0f48b485eac89dd913d29bf

> Flink cannot compile with jdk17
> ---
>
> Key: FLINK-35306
> URL: https://issues.apache.org/jira/browse/FLINK-35306
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Tests
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-05-08-11-48-04-161.png
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59380=results]
>  fails and benchmark with 17 fails as well
>  
> Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier update the 
> schemaCompatibilityMatcher method name to schemaCompatibilityCondition, but 
> some subclasses didn't change it, such as: 
> PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.
>  
> It belongs to flink-tests-java17 module, and it doesn't compile by default.
>  
> it's caused by
>  * https://issues.apache.org/jira/browse/FLINK-25537
>  * [https://github.com/apache/flink/pull/24603]
>  
> !image-2024-05-08-11-48-04-161.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35306][test] Migrate flink-tests-java17 module to JUnit5 and assertj to solve jdk17 compile fails [flink]

2024-05-08 Thread via GitHub


1996fanrui merged PR #24761:
URL: https://github.com/apache/flink/pull/24761


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35306][test] Migrate flink-tests-java17 module to JUnit5 and assertj to solve jdk17 compile fails [flink]

2024-05-08 Thread via GitHub


1996fanrui commented on PR #24761:
URL: https://github.com/apache/flink/pull/24761#issuecomment-2100201974

   Thanks @Zakelly for the review!
   
   CI is green, merging 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35193][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1593542865


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -65,6 +72,11 @@ public static ResultFetcher callMaterializedTableOperation(
 return callCreateMaterializedTableOperation(
 operationExecutor, handle, 
(CreateMaterializedTableOperation) op);
 }
+if (op instanceof AlterMaterializedTableRefreshOperation) {

Review Comment:
   ```suggestion
   else if (op instanceof AlterMaterializedTableRefreshOperation) {
   ```



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+Set unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown partition 
keys: %s.",
+unknownPartitionKeys));
+}
+
+// Set job name, runtime mode, checkpoint interval
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT INTO %s SELECT * FROM (%s)",

Review Comment:
   This should be:
   ```INSERT OVERWRITE %s SELECT * FROM (%s)```
   
   Refresh manually should be a table or partition granularity overwrite.



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+Set unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown 

Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2099986993

   > Could you define your own one in the test itself? Then you have free hands 
what it does, and does not...
   
   Done it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-08 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-35309:
-

Assignee: Zhongqiang Gong

> Enable Notice file ci check and fix Notice 
> ---
>
> Key: FLINK-35309
> URL: https://issues.apache.org/jira/browse/FLINK-35309
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Blocker
>
> Changes:
> * Add ci to check Notice file 
> * Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-08 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-35309:
--
Fix Version/s: cdc-3.1.0

> Enable Notice file ci check and fix Notice 
> ---
>
> Key: FLINK-35309
> URL: https://issues.apache.org/jira/browse/FLINK-35309
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Blocker
> Fix For: cdc-3.1.0
>
>
> Changes:
> * Add ci to check Notice file 
> * Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-08 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-35309:
--
Affects Version/s: cdc-3.1.0
   (was: 3.1.0)

> Enable Notice file ci check and fix Notice 
> ---
>
> Key: FLINK-35309
> URL: https://issues.apache.org/jira/browse/FLINK-35309
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Blocker
>
> Changes:
> * Add ci to check Notice file 
> * Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-08 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong updated FLINK-35309:

Description: 
Changes:
* Add ci to check Notice file 
* Fix Notice file issue

  was:
Changes:
* * Add ci to check Notice file 
* Fix Notice file issue


> Enable Notice file ci check and fix Notice 
> ---
>
> Key: FLINK-35309
> URL: https://issues.apache.org/jira/browse/FLINK-35309
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Zhongqiang Gong
>Priority: Blocker
>
> Changes:
> * Add ci to check Notice file 
> * Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-08 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35309:
---

 Summary: Enable Notice file ci check and fix Notice 
 Key: FLINK-35309
 URL: https://issues.apache.org/jira/browse/FLINK-35309
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 3.1.0
Reporter: Zhongqiang Gong


Changes:
* * Add ci to check Notice file 
* Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-08 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-35296:
---
Fix Version/s: cdc-3.2.0
   (was: 1.18.1)

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-08 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-35296:
---
Affects Version/s: cdc-3.1.0
   (was: 3.1.0)

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source [flink-connector-jdbc]

2024-05-08 Thread via GitHub


eskabetxe commented on code in PR #117:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/117#discussion_r1593526480


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -144,7 +146,8 @@ void testJdbcReadProperties() {
 readOptions,
 LookupOptions.MAX_RETRIES.defaultValue(),
 null,
-SCHEMA.toPhysicalRowDataType());
+SCHEMA.toPhysicalRowDataType(),
+FactoryMocks.IDENTIFIER.asSummaryString());

Review Comment:
   If is overwrite I would say yes, we should drop the attribute, as at the end 
is not used..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >