(flink) branch master updated: [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2.

2024-07-04 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b08eb15d09 [FLINK-35731][runtime] Fix incorrect parallelism 
configured detection for Sink V2.
6b08eb15d09 is described below

commit 6b08eb15d09d6c8debb609b8b3a2a241ca855155
Author: JunRuiLee 
AuthorDate: Sun Jun 30 18:39:13 2024 +0800

[FLINK-35731][runtime] Fix incorrect parallelism configured detection for 
Sink V2.

This closes #24998.
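    A minimal, self-contained sketch of the invariant this change enforces 
    (the class and method names below are illustrative, not Flink's actual 
    scheduler classes): once a forward group's parallelism is decided, every 
    undecided vertex in the group adopts it, and an already-decided vertex 
    that disagrees is a bug, mirroring the added checkState.

    ```java
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical, simplified model of the fix: aligning all vertices of a
    // forward group on one parallelism keeps each forward edge at a single
    // subpartition, which preserves the ordering the Sink V2 committer needs
    // (CommittableSummary before CommittableWithLineage).
    public class ForwardGroupSketch {

        static final class Vertex {
            final String name;
            int parallelism = -1; // -1 means "not decided yet"

            Vertex(String name) { this.name = name; }

            boolean isParallelismDecided() { return parallelism > 0; }
        }

        static void applyGroupParallelism(List<Vertex> forwardGroup, int decided) {
            for (Vertex v : forwardGroup) {
                if (!v.isParallelismDecided()) {
                    v.parallelism = decided; // align undecided vertices with the group
                } else if (v.parallelism != decided) {
                    // analogous to the checkState(...) added by this commit
                    throw new IllegalStateException(
                            "Vertex " + v.name + " disagrees with forward group parallelism");
                }
            }
        }

        public static void main(String[] args) {
            List<Vertex> group = Arrays.asList(new Vertex("map"), new Vertex("sink-writer"));
            applyGroupParallelism(group, 4);
            for (Vertex v : group) {
                System.out.println(v.name + " -> " + v.parallelism); // both print "-> 4"
            }
        }
    }
    ```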
---
 .../adaptivebatch/AdaptiveBatchScheduler.java  | 41 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  | 12 +++
 .../translators/SinkTransformationTranslator.java  |  4 ++-
 .../SinkTransformationTranslatorITCaseBase.java| 32 +++--
 4 files changed, 69 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index d9578e7be10..6111e69dc41 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -627,15 +627,8 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
            final ExecutionJobVertex jobVertex, List<BlockingResultInfo> 
inputs) {
 int vertexInitialParallelism = jobVertex.getParallelism();
 ForwardGroup forwardGroup = 
forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId());
-if (!jobVertex.isParallelismDecided()
-&& forwardGroup != null
-&& forwardGroup.isParallelismDecided()) {
-vertexInitialParallelism = forwardGroup.getParallelism();
-log.info(
-"Parallelism of JobVertex: {} ({}) is decided to be {} 
according to forward group's parallelism.",
-jobVertex.getName(),
-jobVertex.getJobVertexId(),
-vertexInitialParallelism);
+if (!jobVertex.isParallelismDecided() && forwardGroup != null) {
+checkState(!forwardGroup.isParallelismDecided());
 }
 
 int vertexMinParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
@@ -674,6 +667,36 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
 
 if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
 
forwardGroup.setParallelism(parallelismAndInputInfos.getParallelism());
+
+// When the parallelism for a forward group is determined, we 
ensure that the
+// parallelism for all job vertices within that group is also set.
+// This approach ensures that each forward edge produces single 
subpartition.
+//
+// This setting is crucial because the Sink V2 committer relies on 
the interplay
+// between the CommittableSummary and the CommittableWithLineage, 
which are sent by
+// the upstream Sink V2 Writer. The committer expects to receive 
CommittableSummary
+// before CommittableWithLineage.
+//
+// If the number of subpartitions produced by a forward edge is 
greater than one,
+// the ordering of these elements received by the committer cannot 
be assured, which
+// would break the assumption that CommittableSummary is received 
before
+// CommittableWithLineage.
+for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) {
+ExecutionJobVertex executionJobVertex = 
getExecutionJobVertex(jobVertexId);
+if (!executionJobVertex.isParallelismDecided()) {
+log.info(
+"Parallelism of JobVertex: {} ({}) is decided to 
be {} according to forward group's parallelism.",
+executionJobVertex.getName(),
+executionJobVertex.getJobVertexId(),
+parallelismAndInputInfos.getParallelism());
+changeJobVertexParallelism(
+executionJobVertex, 
parallelismAndInputInfos.getParallelism());
+} else {
+checkState(
+parallelismAndInputInfos.getParallelism()
+== executionJobVertex.getParallelism());
+}
+}
 }
 
 return parallelismAndInputInfos;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index e1a0a4714f9..2a5da861067 100644
--- 

Re: [I] Issue with oracle-connector reading from a read-only standby database in real time [flink-cdc]

2024-07-04 Thread via GitHub


xiaofan2022 commented on issue #1315:
URL: https://github.com/apache/flink-cdc/issues/1315#issuecomment-2209869175

   According to this article: 
https://www.infoq.cn/article/pgaknsli9xufej9htolt, I modified Debezium at 
https://github.com/xiaofan2022/debezium, branch mining_database_1.9. For usage, 
refer to io.debezium.oracle.CDCTest. Documentation: 备库挖掘实现.md 
(standby-database mining implementation).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(flink) branch release-1.19 updated: [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF

2024-07-04 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new 2581c26e4f0 [FLINK-35498][table] Fix unexpected argument name 
conflicts when extracting method parameter names from UDF
2581c26e4f0 is described below

commit 2581c26e4f0e38886ba29f36268d7d7657eb8531
Author: Xuyang 
AuthorDate: Thu Jul 4 21:17:02 2024 +0800

[FLINK-35498][table] Fix unexpected argument name conflicts when extracting 
method parameter names from UDF

This closes #25020
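    A hedged sketch of the idea behind this fix (the helper below is 
    illustrative, not the real ExtractionUtils internals): local-variable-table 
    entries are collected keyed by slot index in a sorted map, so a local 
    variable recorded later in the table can no longer shadow or reorder the 
    real method parameters, which always occupy the lowest slots.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Simplified model: a TreeMap keyed by slot index yields names in slot
    // order regardless of the order entries are visited in the class file.
    public class SlotOrderSketch {

        static List<String> namesInSlotOrder(Map<Integer, String> lvtBySlot, boolean isStatic) {
            List<String> ordered = new ArrayList<>(lvtBySlot.values()); // slot order
            int offset = isStatic ? 0 : 1; // slot 0 holds "this" for instance methods
            return ordered.subList(offset, ordered.size());
        }

        public static void main(String[] args) {
            Map<Integer, String> lvt = new TreeMap<>();
            // entries may be visited in arbitrary order, as in a class file's
            // LocalVariableTable; "localVariable" reuses a later slot and is
            // not a parameter, it merely follows the parameters in slot order
            lvt.put(2, "localVariable");
            lvt.put(0, "this");
            lvt.put(1, "generic");
            System.out.println(namesInSlotOrder(lvt, false)); // prints [generic, localVariable]
        }
    }
    ```

    The real extraction then only keeps the leading entries up to the method's 
    parameter count; the point here is that sorting by slot makes that prefix 
    well-defined even with duplicate names.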
---
 .../table/types/extraction/ExtractionUtils.java|  49 -
 .../types/extraction/ExtractionUtilsTest.java  | 110 +
 .../planner/runtime/stream/sql/FunctionITCase.java |  43 
 3 files changed, 198 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index 9854e62eb0c..5968b41fc84 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.types.extraction;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.api.DataTypes;
@@ -58,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -747,7 +749,8 @@ public final class ExtractionUtils {
 return fieldNames;
 }
 
-private static @Nullable List<String> extractExecutableNames(Executable 
executable) {
+@VisibleForTesting
+static @Nullable List<String> extractExecutableNames(Executable 
executable) {
 final int offset;
 if (!Modifier.isStatic(executable.getModifiers())) {
 // remove "this" as first parameter
@@ -824,6 +827,40 @@ public final class ExtractionUtils {
  *   
  * }
  * }
+ *
+ * If a constructor or method has multiple identical local variables 
that are not initialized
+ * like:
+ *
+ * {@code
+ * String localVariable;
+ * if (generic == null) {
+ * localVariable = "null";
+ * } else if (generic < 0) {
+ * localVariable = "negative";
+ * } else if (generic > 0) {
+ * localVariable = "positive";
+ * } else {
+ * localVariable = "zero";
+ * }
+ * }
+ *
+ * Its local variable table is as follows:
+ *
+ * {@code
+ * Start  Length  Slot Name   Signature
+ * 7   3   2 localVariable   Ljava/lang/String;
+ * 22  3   2 localVariable   Ljava/lang/String;
+ * 37  3   2 localVariable   Ljava/lang/String;
+ * 0  69   0 this...;
+ * 0  69   1 generic Ljava/lang/Long;
+ * 43 26   2 localVariable   Ljava/lang/String;
+ * }
+ *
+ * The method parameters are always at the head in the 'slot' list.
+ *
+ * NOTE: the first parameter may be "this" if the function is not 
static. See more at <a 
href="https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html">3.6. 
Receiving
+ * Arguments</a>
  */
 private static class ParameterExtractor extends ClassVisitor {
 
@@ -831,7 +868,7 @@ public final class ExtractionUtils {
 
 private final String methodDescriptor;
 
-private final List<String> parameterNames = new ArrayList<>();
+private final Map<Integer, String> parameterNamesWithIndex = new 
TreeMap<>();
 
 ParameterExtractor(Constructor constructor) {
 super(OPCODE);
@@ -844,7 +881,11 @@ public final class ExtractionUtils {
 }
 
 List<String> getParameterNames() {
-return parameterNames;
+// method parameters are always at the head in the 'index' list
+// NOTE: the first parameter may be "this" if the function is not 
static
+// See more at Chapter "3.6. Receiving Arguments" in
+// https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html
+return new ArrayList<>(parameterNamesWithIndex.values());
 }
 
 @Override
@@ -860,7 +901,7 @@ public final class ExtractionUtils {
 Label start,
 Label end,
 int index) {
-parameterNames.add(name);
+  

(flink) branch release-1.20 updated: [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF

2024-07-04 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
 new ce29a69bcbe [FLINK-35498][table] Fix unexpected argument name 
conflicts when extracting method parameter names from UDF
ce29a69bcbe is described below

commit ce29a69bcbe384cdfaec7db98ce4447b760ce3a2
Author: Xuyang 
AuthorDate: Thu Jul 4 21:13:48 2024 +0800

[FLINK-35498][table] Fix unexpected argument name conflicts when extracting 
method parameter names from UDF

This closes #25019
---
 .../table/types/extraction/ExtractionUtils.java|  49 -
 .../types/extraction/ExtractionUtilsTest.java  | 110 +
 .../planner/runtime/stream/sql/FunctionITCase.java |  43 
 3 files changed, 198 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index 9854e62eb0c..5968b41fc84 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.types.extraction;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.api.DataTypes;
@@ -58,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -747,7 +749,8 @@ public final class ExtractionUtils {
 return fieldNames;
 }
 
-private static @Nullable List<String> extractExecutableNames(Executable 
executable) {
+@VisibleForTesting
+static @Nullable List<String> extractExecutableNames(Executable 
executable) {
 final int offset;
 if (!Modifier.isStatic(executable.getModifiers())) {
 // remove "this" as first parameter
@@ -824,6 +827,40 @@ public final class ExtractionUtils {
  *   
  * }
  * }
+ *
+ * If a constructor or method has multiple identical local variables 
that are not initialized
+ * like:
+ *
+ * {@code
+ * String localVariable;
+ * if (generic == null) {
+ * localVariable = "null";
+ * } else if (generic < 0) {
+ * localVariable = "negative";
+ * } else if (generic > 0) {
+ * localVariable = "positive";
+ * } else {
+ * localVariable = "zero";
+ * }
+ * }
+ *
+ * Its local variable table is as follows:
+ *
+ * {@code
+ * Start  Length  Slot Name   Signature
+ * 7   3   2 localVariable   Ljava/lang/String;
+ * 22  3   2 localVariable   Ljava/lang/String;
+ * 37  3   2 localVariable   Ljava/lang/String;
+ * 0  69   0 this...;
+ * 0  69   1 generic Ljava/lang/Long;
+ * 43 26   2 localVariable   Ljava/lang/String;
+ * }
+ *
+ * The method parameters are always at the head in the 'slot' list.
+ *
+ * NOTE: the first parameter may be "this" if the function is not 
static. See more at <a 
href="https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html">3.6. 
Receiving
+ * Arguments</a>
  */
 private static class ParameterExtractor extends ClassVisitor {
 
@@ -831,7 +868,7 @@ public final class ExtractionUtils {
 
 private final String methodDescriptor;
 
-private final List<String> parameterNames = new ArrayList<>();
+private final Map<Integer, String> parameterNamesWithIndex = new 
TreeMap<>();
 
 ParameterExtractor(Constructor constructor) {
 super(OPCODE);
@@ -844,7 +881,11 @@ public final class ExtractionUtils {
 }
 
 List<String> getParameterNames() {
-return parameterNames;
+// method parameters are always at the head in the 'index' list
+// NOTE: the first parameter may be "this" if the function is not 
static
+// See more at Chapter "3.6. Receiving Arguments" in
+// https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html
+return new ArrayList<>(parameterNamesWithIndex.values());
 }
 
 @Override
@@ -860,7 +901,7 @@ public final class ExtractionUtils {
 Label start,
 Label end,
 int index) {
-parameterNames.add(name);
+  

(flink) branch ngn346 deleted (was 47ce6a97aaf)

2024-07-04 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch ngn346
in repository https://gitbox.apache.org/repos/asf/flink.git


 was 47ce6a97aaf fixup! [NGN-346] Add custom scope variables to operators

This change permanently discards the following revisions:

 discard 47ce6a97aaf fixup! [NGN-346] Add custom scope variables to operators
 discard fbb234349f5 [hotfix] Debug log added metrics and all variables
 discard bc0c9724904 [FRT-528] Revert "[FRT-499] Classify 'Not authorized to 
access group' as user-error"
 discard f55de043e03 [MATRIX-247] MLNGramsFunction implementation and tests 
(#1031)
 discard 6ef5e143887 [FLINKCC-1468] properly storing semaphore caches
 discard 928c36bcb87 [hotfix] use run_mvn in 
tools/ci/confluent/bump_nanoversion.sh
 discard ea611959631 [DP-14979] Disable setting base version in 
bump_nanoversion as a temporary fix
 discard f068331233c [SQL-2123] use message indices to de-serialize 
multi-message proto (#1030)
 discard 95cabe2a73f [MATRIX-240] Drop model classification as USER 
ExceptionKind, modified drop mo… (#1022)
 discard 2b256e177d2 [NGN-378] Introduce task initialization timeout
 discard 7d397c2abb3 [FLINKCC-1444][tests] Use RestClusterClient
 discard 1e16fc36684 [NGN-377][hotfix] Remove "<" and ">" from variables names 
that metric group automatically adds for Otel reporting.
 discard 2ca770708db [NGN-396] Introduce Metric and Event for "all tasks 
running/finished" uptime definition.
 discard 30539ffbf01 [hotfix] Use proper event name for "JobStatusChange" in 
JobMaster.
 discard 460010003fd [NGN-391] Label exception from illegal inputs in SQL 
function #fromBase64 as USER error.
 discard fcbda546790 [NGN-391][hotfix] Deduplicate code for exception search.
 discard c39968b07e7 [SQL-2135] Allow DISTRIBUTED BY definition in CTAS 
statement (#1015)
 discard c19c06103f9 Merge pull request #1008 from 
confluentinc/ROBUST_SCALER_DEV
 discard 5d11d20b2b0 [MATRIX-239] Scaler function type inferencing refactor to 
extract common code to MLUtils
 discard d107b152fc2 [MATRIX-241] API changes for Gemini models (#1016)
 discard 78ae8f3ac64 [FRT-475] Add error classification for CREATE/SELECT/DROP 
FUNCTION (#1005)
 discard a8034a093fe Merge pull request #1010 from confluentinc/fix_sql_2118
 discard 7974b214e18 [SQL-2118] Improve table not found message with private 
networking suggestion
 discard 28b51c59a73 [MATRIX-239] Robust Scaler function implementation and 
tests
 discard 70082303ac3 [SQL-1978] Use FileDescriptor for PROTO schema conversion 
(#1003)
 discard 5cbc1bc850c [FLINK-35687] JSON_QUERY should return a well formatted 
nested objects/arrays for ARRAY (#24976) (#1006)
 discard eb9c31cd7aa Merge pull request #1004 from 
confluentinc/ML_NORMALIZER_DEV
 discard e4657e08ceb [MATRIX-237] Normalizer function implementaion and AbsMax 
refactoring
 discard 1cebb08d7ff Merge pull request #1000 from 
confluentinc/ML_STANDARD_SCALER_DEV
 discard b3f94501c29 [MATRIX-225] Overloaded eval functions for explicit 
arguments
 discard 9ec97065d45 [MATRIX-225] Checkstyle change
 discard 0497a7b1800 [SQL-1234] Store comment for fields in all three supported 
formats (#910)
 discard 4e6ac98c8d8 [MATRIX] update jss for federated search query rewrite
 discard 41e6157574f [MATRIX-225] Added MLStandardScalerFunction class and Tests
 discard 68720053152 Update search function name (#999)
 discard 853dbff100f Merge pull request #995 from 
confluentinc/MAX_ABS_SCALER_DEV
 discard 6119d93cdfc [NGN-377] Report job status changes as events.
 discard 959c952ae17 [MATRIX-225] spotless apply
 discard bc052cf13b7 [MATRIX-225] Handling of zero/zero case added
 discard 91f21634ea5 [MATRIX-225] changed a test case to support code change
 discard c2ae5966dff [NGN-375] Report Checkpoints as Events.
 discard 3c12ea8c919 [SQL-1665] Remove Table Async option (#996)
 discard bca14d2ac09 [NGN-374] Report Job Failures as Events.
 discard d5fb264c57a [NGN-374] Introduce AttributeBuilder interface for 
SpanBuilder and EventBuilder.
 discard 924d979509f [MATRIX-225] Max Absolute Scaler function implemetation 
and its test added
 discard 9db90769e12 [FLINKCC-1400][MATRIX-219] Convert alter model reset to 
AlterModelOptionsOperation using ModelChange (#994)
 discard a8a21b6546b [Matrix-161] Refactor the ml functions directories for 
search. (#993)
 discard 6ce4d61df9f [FRT-476] Adds main thread callback to AsyncWaitOperator 
(#902)
 discard c697683dccc Merge pull request #987 from 
confluentinc/MIN_MAX_SCALER_REFACTOR
 discard b37b89f9fa2 [FRT-499] Classify 'Not authorized to access group' as 
user-error
 discard 3047f628f21 [FRT-499] Add util to check exception contains message
 discard f6e6d2d997a [NGN-348][hotfix] Fix comment and add annotation.
 discard 49a585ddd29 [NGN-348][hotfix] Fix comment and add annotation.
 discard e5a92f5c0ea [FLINKCC-983] Use JobID as job name
 discard 642a4c717d7 [FLINKCC-983] Add dedicated JobGraphGenerator interface
 discard 

(flink) branch master updated: [FLINK-34268] Removing exclusions for FLINK-33676 33805

2024-07-04 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new cc19159b489 [FLINK-34268] Removing exclusions for FLINK-33676 33805
cc19159b489 is described below

commit cc19159b489dafad71c01bf551bc224a38203eae
Author: Jim Hughes 
AuthorDate: Wed Jul 3 08:54:16 2024 -0400

[FLINK-34268] Removing exclusions for FLINK-33676 33805

When FLINK-34268 was initially implemented, FLINK-33676 and FLINK-33805 had 
not landed.

Given that, there were exclusions hard-coded.  This PR removes those.
---
 .../plan/nodes/exec/testutils/RestoreTestCompleteness.java   | 12 
 1 file changed, 12 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java
index ca2d28b7166..092b37d7753 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java
@@ -19,16 +19,12 @@
 package org.apache.flink.table.planner.plan.nodes.exec.testutils;
 
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import 
org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil.ExecNodeNameVersion;
 
@@ -52,14 +48,6 @@ public class RestoreTestCompleteness {
 private static final Set<Class<? extends ExecNode<?>>> SKIP_EXEC_NODES =
 new HashSet<Class<? extends ExecNode<?>>>() {
 {
-/** TODO: Remove after FLINK-33676 is merged. */
-add(StreamExecWindowAggregate.class);
-add(StreamExecLocalWindowAggregate.class);
-add(StreamExecGlobalWindowAggregate.class);
-
-/** TODO: Remove after FLINK-33805 is merged. */
-add(StreamExecOverAggregate.class);
-
 /** Ignoring python based exec nodes temporarily. */
 add(StreamExecPythonCalc.class);
 add(StreamExecPythonCorrelate.class);



Re: [I] lost data when gtid with Multiple transaction id on mgr cluster [flink-cdc]

2024-07-04 Thread via GitHub


zbingwen commented on issue #3452:
URL: https://github.com/apache/flink-cdc/issues/3452#issuecomment-2208425070

   I changed it to the code below, and my test now passes.
   ```
   public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet 
restoredGtidSet) {
   Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
   
   serverGtidSet.getUUIDSets().forEach(uuidSet -> 
newSet.put(uuidSet.getUUID(), uuidSet));
   
   for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
   
   GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
   
   if (serverUuidSet != null) {
   List<GtidSet.Interval> restoredIntervals = 
uuidSet.getIntervals();
   int restoredIntervalsSize = restoredIntervals.size();
   List<GtidSet.Interval> serverIntervals = 
serverUuidSet.getIntervals();
   
   List<com.github.shyiko.mysql.binlog.GtidSet.Interval> 
newIntervals =
   new ArrayList<>();
   
   for (int i = 0; i < serverIntervals.size(); i++) {
   GtidSet.Interval serverInterval = serverIntervals.get(i);
   if (i < restoredIntervalsSize) {
   GtidSet.Interval restoredInterval = 
restoredIntervals.get(i);
   if (serverInterval.getEnd() <= 
restoredInterval.getEnd()) {
   newIntervals.add(
   new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
   serverInterval.getStart(), 
serverInterval.getEnd()));
   } else if (serverInterval.getStart() <= 
restoredInterval.getEnd()
   && serverInterval.getEnd() > 
restoredInterval.getEnd()) {
   newIntervals.add(
   new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
   serverInterval.getStart(), 
restoredInterval.getEnd()));
   }
   } else {
   newIntervals.add(
   new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
   serverInterval.getStart(), 
serverInterval.getEnd()));
   }
   }
   newSet.put(
   uuidSet.getUUID(),
   new GtidSet.UUIDSet(
   new 
com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
   uuidSet.getUUID(), newIntervals)));
   } else {
   newSet.put(uuidSet.getUUID(), uuidSet);
   }
   }
   return new GtidSet(newSet);
   }
   ```
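   A simplified, self-contained illustration of the per-interval comparison 
   above, using plain long pairs instead of `GtidSet.Interval` (class and 
   method names here are illustrative). With multi-interval GTID sets, as on 
   an MGR cluster, comparing each server interval against the restored 
   interval at the same index keeps later server intervals that a single 
   "restored end" value would wrongly truncate or drop.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class IntervalSketch {

       record Interval(long start, long end) {}

       static List<Interval> merge(List<Interval> server, List<Interval> restored) {
           List<Interval> result = new ArrayList<>();
           for (int i = 0; i < server.size(); i++) {
               Interval s = server.get(i);
               if (i < restored.size()) {
                   Interval r = restored.get(i);
                   if (s.end() <= r.end()) {
                       result.add(s); // fully within restored progress
                   } else if (s.start() <= r.end()) {
                       // truncate the server interval to the restored position
                       result.add(new Interval(s.start(), r.end()));
                   }
               } else {
                   // no restored counterpart: keep the whole server interval
                   result.add(s);
               }
           }
           return result;
       }

       public static void main(String[] args) {
           List<Interval> server = List.of(new Interval(1, 100), new Interval(200, 300));
           List<Interval> restored = List.of(new Interval(1, 50));
           // the second server interval survives, unlike with a single end value
           System.out.println(merge(server, restored));
       }
   }
   ```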




Re: [I] lost data when gtid with Multiple transaction id on mgr cluster [flink-cdc]

2024-07-04 Thread via GitHub


zbingwen commented on issue #3452:
URL: https://github.com/apache/flink-cdc/issues/3452#issuecomment-2208424086

   The restoredIntervalEnd logic is wrong:
   ```
   public static GtidSet fixRestoredGtidSetOld(GtidSet serverGtidSet, 
GtidSet restoredGtidSet) {
   Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
   serverGtidSet.getUUIDSets().forEach(uuidSet -> 
newSet.put(uuidSet.getUUID(), uuidSet));
   for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
   GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
   if (serverUuidSet != null) {
   long restoredIntervalEnd = getIntervalEnd(uuidSet);
   List<com.github.shyiko.mysql.binlog.GtidSet.Interval> 
newIntervals =
   new ArrayList<>();
   for (GtidSet.Interval serverInterval : 
serverUuidSet.getIntervals()) {
   if (serverInterval.getEnd() <= restoredIntervalEnd) {
   newIntervals.add(
   new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
   serverInterval.getStart(), 
serverInterval.getEnd()));
   } else if (serverInterval.getStart() <= 
restoredIntervalEnd
   && serverInterval.getEnd() > 
restoredIntervalEnd) {
   newIntervals.add(
   new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
   serverInterval.getStart(), 
restoredIntervalEnd));
   }
   }
   newSet.put(
   uuidSet.getUUID(),
   new GtidSet.UUIDSet(
   new 
com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
   uuidSet.getUUID(), newIntervals)));
   } else {
   newSet.put(uuidSet.getUUID(), uuidSet);
   }
   }
   return new GtidSet(newSet);
   }
   ```




[I] lost data when gtid with Multiple transaction id on mgr cluster [flink-cdc]

2024-07-04 Thread via GitHub


zbingwen opened a new issue, #3452:
URL: https://github.com/apache/flink-cdc/issues/3452

   ### Please don't check the box below
   
   - [X] I'm aware that bugs and new features should be reported on [Apache 
Jira](https://issues.apache.org/jira)  or Flink mailing list 
(d...@flink.apache.org) instead of here
   
   ### Again, please don't check the box below
   
   - [X] I'm aware that new issues on GitHub will be ignored and automatically 
closed.
   
   ### Please don't check the box below
   
   - [X] I'm aware that bugs and new features should be reported on [Apache 
Jira](https://issues.apache.org/jira) or the Flink mailing list 
(d...@flink.apache.org) instead of creating a new issue here.
   
   ### Again, please don't check the box below
   
   - [X] I'm aware that new issues on GitHub will be ignored and automatically 
closed.




Re: [I] [Bug] Duplicate io/debezium/connector/mysql/MySqlConnection$MySqlConnectionConfiguration.class causes a java.lang.NoSuchMethodError [flink-cdc]

2024-07-04 Thread via GitHub


Tangsonghuai commented on issue #3066:
URL: https://github.com/apache/flink-cdc/issues/3066#issuecomment-2208332993

   About the Debezium problem: has it been resolved?




(flink) branch master updated: [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state

2024-07-04 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1f03a0a62ec [FLINK-35553][runtime] Wire-up RescaleManager with 
CheckpointStatsListener in Executing state
1f03a0a62ec is described below

commit 1f03a0a62ec371d24b194759e6c95bf661501e07
Author: David Moravek 
AuthorDate: Thu Oct 26 12:52:04 2023 +0200

[FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener 
in Executing state
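    Per the generated documentation in this diff, the change adds the 
    `jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count` option 
    (documented default: 2) alongside the existing delay option. A hedged 
    configuration sketch, with illustrative values, for the Flink 
    configuration file:

    ```yaml
    # Rescale even without a completed checkpoint once this many
    # consecutive checkpoints have failed (2 is the documented default).
    jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count: 2

    # Illustrative value: upper bound on how long the JobManager waits
    # before evaluating previously observed rescale events.
    jobmanager.adaptive-scheduler.max-delay-for-scale-trigger: 1 min
    ```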
---
 .../generated/all_jobmanager_section.html  |   8 +-
 .../generated/expert_scheduling_section.html   |   8 +-
 .../generated/job_manager_configuration.html   |   8 +-
 .../flink/configuration/JobManagerOptions.java |  21 ++-
 .../checkpoint/CheckpointStatsListener.java|  33 +
 .../checkpoint/DefaultCheckpointStatsTracker.java  |  29 
 .../scheduler/adaptive/AdaptiveScheduler.java  | 148 +--
 .../scheduler/adaptive/DefaultRescaleManager.java  |  43 +++---
 .../runtime/scheduler/adaptive/Executing.java  |  57 +++-
 .../flink/runtime/scheduler/adaptive/State.java|  34 -
 .../DefaultCheckpointStatsTrackerTest.java |  67 +
 .../adaptive/AdaptiveSchedulerBuilder.java |  40 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 156 -
 .../adaptive/DefaultRescaleManagerTest.java|   3 +-
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 128 -
 .../test/scheduling/RescaleOnCheckpointITCase.java | 146 +++
 16 files changed, 864 insertions(+), 65 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 4539ce365d5..760ca029e04 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -12,7 +12,7 @@
 
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
 (none)
 Duration
-The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and the checkpointing interval multiplied by the by-1-incremented 
parameter value of 
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if 
checkpointing is enabled).
 
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
@@ -32,6 +32,12 @@
 Duration
 The maximum time the JobManager will wait to acquire all 
required resources after a job submission or restart. Once elapsed it will try 
to run the job with a lower parallelism, or fail if the minimum amount of 
resources could not be acquired.Increasing this value will make the 
cluster more resilient against temporary resources shortages (e.g., there is 
more time for a failed TaskManager to be restarted).Setting a negative 
duration will disable the resource tim [...]
 
+
+
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count
+2
+Integer
+The number of consecutive failed checkpoints that will trigger 
rescaling even in the absence of a completed checkpoint.
+
 
 
jobmanager.adaptive-scheduler.scaling-interval.max
 (none)
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 10e5ad134ce..6be6547547c 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -90,7 +90,7 @@
 
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
 (none)
 Duration
-The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and the checkpointing interval multiplied by the by-1-incremented 
parameter value of 
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if 
checkpointing is enabled).
 
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
@@ -110,6 +110,12 @@
 Duration
 The maximum time the JobManager will wait to acquire all 
required resources after a job 
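
For readers who want to try the new option introduced by this commit, here is a minimal configuration sketch. The values are illustrative only (they are not taken from the commit); the option names are those shown in the generated docs above:

```yaml
# flink-conf.yaml -- illustrative values, assuming the adaptive scheduler
jobmanager.scheduler: adaptive
execution.checkpointing.interval: 30 s

# Trigger rescaling after this many consecutive failed checkpoints,
# even if no checkpoint has completed yet (new option, default: 2).
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count: 2

# If left unset, the docs above state the delay defaults to the
# checkpointing interval multiplied by (scale-on-failed-checkpoints-count + 1).
# jobmanager.adaptive-scheduler.max-delay-for-scale-trigger: 1 min
```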

(flink-kubernetes-operator) branch main updated: [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config (#845)

2024-07-04 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new ffaa3dd8 [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners 
config  (#845)
ffaa3dd8 is described below

commit ffaa3dd8d4f03d98c01d938c730fab8a21f340c0
Author: 阿洋 
AuthorDate: Thu Jul 4 14:14:49 2024 +0800

[FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config  (#845)
---
 docs/layouts/shortcodes/generated/dynamic_section.html   | 6 ++
 .../generated/kubernetes_operator_config_configuration.html  | 6 ++
 .../operator/config/KubernetesOperatorConfigOptions.java | 9 +
 3 files changed, 21 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index a51a6e75..9c1083fc 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -140,6 +140,12 @@
 String
 Option to enable automatic savepoint triggering. Can be 
specified either as a Duration type (i.e. '10m') or as a cron expression in 
Quartz format (6 or 7 positions, see 
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
 triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop. WARNING: not intended to be used together with the 
cron-based periodic savepoint triggering
 
+
+
kubernetes.operator.plugins.listeners.listener-name.class
+(none)
+String
+Custom plugins listener class, 'listener-name' is the name of 
the plugin listener, and its value is a fully qualified class name.
+
 
 
kubernetes.operator.pod-template.merge-arrays-by-name
 false
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 918b4074..172029c8 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -278,6 +278,12 @@
 String
 Option to enable automatic savepoint triggering. Can be 
specified either as a Duration type (i.e. '10m') or as a cron expression in 
Quartz format (6 or 7 positions, see 
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The
 triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop. WARNING: not intended to be used together with the 
cron-based periodic savepoint triggering
 
+
+
kubernetes.operator.plugins.listeners.listener-name.class
+(none)
+String
+Custom plugins listener class, 'listener-name' is the name of 
the plugin listener, and its value is a fully qualified class name.
+
 
 
kubernetes.operator.pod-template.merge-arrays-by-name
 false
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 1334ba4f..b065d228 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -333,6 +333,15 @@ public class KubernetesOperatorConfigOptions {
 + "WARNING: not intended to be used 
together with the cron-based "
 + "periodic checkpoint triggering");
 
+@SuppressWarnings("unused")
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption&lt;String&gt; PLUGINS_LISTENERS_CLASS =
+operatorConfig("plugins.listeners..class")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"Custom plugins listener class, 'listener-name' is 
the name of the plugin listener, and its value is a fully qualified class 
name.");
+
 @Documentation.Section(SECTION_SYSTEM)
 public static final ConfigOption OPERATOR_WATCHED_NAMESPACES =
 operatorConfig("watched.namespaces")
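
Per the option description above, the `listener-name` segment of the key is user-chosen and the value is a fully qualified class name. A minimal sketch of registering a listener plugin in the operator configuration (`my-listener` and `com.example.MyResourceListener` are hypothetical placeholders, not names from the commit):

```yaml
# Operator config -- illustrative; the listener class must be loadable
# via the operator's plugin mechanism.
kubernetes.operator.plugins.listeners.my-listener.class: com.example.MyResourceListener
```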