(flink) branch master updated: [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2.
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6b08eb15d09 [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. 6b08eb15d09 is described below commit 6b08eb15d09d6c8debb609b8b3a2a241ca855155 Author: JunRuiLee AuthorDate: Sun Jun 30 18:39:13 2024 +0800 [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. This closes #24998. --- .../adaptivebatch/AdaptiveBatchScheduler.java | 41 +- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 12 +++ .../translators/SinkTransformationTranslator.java | 4 ++- .../SinkTransformationTranslatorITCaseBase.java| 32 +++-- 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index d9578e7be10..6111e69dc41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -627,15 +627,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { final ExecutionJobVertex jobVertex, List inputs) { int vertexInitialParallelism = jobVertex.getParallelism(); ForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId()); -if (!jobVertex.isParallelismDecided() -&& forwardGroup != null -&& forwardGroup.isParallelismDecided()) { -vertexInitialParallelism = forwardGroup.getParallelism(); -log.info( -"Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", -jobVertex.getName(), -jobVertex.getJobVertexId(), -vertexInitialParallelism); +if 
(!jobVertex.isParallelismDecided() && forwardGroup != null) { +checkState(!forwardGroup.isParallelismDecided()); } int vertexMinParallelism = ExecutionConfig.PARALLELISM_DEFAULT; @@ -674,6 +667,36 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { forwardGroup.setParallelism(parallelismAndInputInfos.getParallelism()); + +// When the parallelism for a forward group is determined, we ensure that the +// parallelism for all job vertices within that group is also set. +// This approach ensures that each forward edge produces single subpartition. +// +// This setting is crucial because the Sink V2 committer relies on the interplay +// between the CommittableSummary and the CommittableWithLineage, which are sent by +// the upstream Sink V2 Writer. The committer expects to receive CommittableSummary +// before CommittableWithLineage. +// +// If the number of subpartitions produced by a forward edge is greater than one, +// the ordering of these elements received by the committer cannot be assured, which +// would break the assumption that CommittableSummary is received before +// CommittableWithLineage. 
+for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) { +ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId); +if (!executionJobVertex.isParallelismDecided()) { +log.info( +"Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", +executionJobVertex.getName(), +executionJobVertex.getJobVertexId(), +parallelismAndInputInfos.getParallelism()); +changeJobVertexParallelism( +executionJobVertex, parallelismAndInputInfos.getParallelism()); +} else { +checkState( +parallelismAndInputInfos.getParallelism() +== executionJobVertex.getParallelism()); +} +} } return parallelismAndInputInfos; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index e1a0a4714f9..2a5da861067 100644 ---
Re: [I] oracle-connector实时读取只读备库的问题 [flink-cdc]
xiaofan2022 commented on issue #1315: URL: https://github.com/apache/flink-cdc/issues/1315#issuecomment-2209869175 This follows the approach described in https://www.infoq.cn/article/pgaknsli9xufej9htolt. A modified Debezium is available at https://github.com/xiaofan2022/debezium (branch: mining_database_1.9). For usage, refer to io.debezium.oracle.CDCTest. Documentation: 备库挖掘实现.md -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(flink) branch release-1.19 updated: [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF
This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new 2581c26e4f0 [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF 2581c26e4f0 is described below commit 2581c26e4f0e38886ba29f36268d7d7657eb8531 Author: Xuyang AuthorDate: Thu Jul 4 21:17:02 2024 +0800 [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF This closes #25020 --- .../table/types/extraction/ExtractionUtils.java| 49 - .../types/extraction/ExtractionUtilsTest.java | 110 + .../planner/runtime/stream/sql/FunctionITCase.java | 43 3 files changed, 198 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java index 9854e62eb0c..5968b41fc84 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.types.extraction; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.annotation.ArgumentHint; import org.apache.flink.table.api.DataTypes; @@ -58,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -747,7 +749,8 @@ public final class ExtractionUtils { return fieldNames; } 
-private static @Nullable List extractExecutableNames(Executable executable) { +@VisibleForTesting +static @Nullable List extractExecutableNames(Executable executable) { final int offset; if (!Modifier.isStatic(executable.getModifiers())) { // remove "this" as first parameter @@ -824,6 +827,40 @@ public final class ExtractionUtils { * * } * } + * + * If a constructor or method has multiple identical local variables that are not initialized + * like: + * + * {@code + * String localVariable; + * if (generic == null) { + * localVariable = "null"; + * } else if (generic < 0) { + * localVariable = "negative"; + * } else if (generic > 0) { + * localVariable = "positive"; + * } else { + * localVariable = "zero"; + * } + * } + * + * Its local variable table is as follows: + * + * {@code + * Start Length Slot Name Signature + * 7 3 2 localVariable Ljava/lang/String; + * 22 3 2 localVariable Ljava/lang/String; + * 37 3 2 localVariable Ljava/lang/String; + * 0 69 0 this...; + * 0 69 1 generic Ljava/lang/Long; + * 43 26 2 localVariable Ljava/lang/String; + * } + * + * The method parameters are always at the head in the 'slot' list. + * + * NOTE: the first parameter may be "this" if the function is not static. See more at https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html;>3.6. Receiving + * Arguments */ private static class ParameterExtractor extends ClassVisitor { @@ -831,7 +868,7 @@ public final class ExtractionUtils { private final String methodDescriptor; -private final List parameterNames = new ArrayList<>(); +private final Map parameterNamesWithIndex = new TreeMap<>(); ParameterExtractor(Constructor constructor) { super(OPCODE); @@ -844,7 +881,11 @@ public final class ExtractionUtils { } List getParameterNames() { -return parameterNames; +// method parameters are always at the head in the 'index' list +// NOTE: the first parameter may be "this" if the function is not static +// See more at Chapter "3.6. 
Receiving Arguments" in +// https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html +return new ArrayList<>(parameterNamesWithIndex.values()); } @Override @@ -860,7 +901,7 @@ public final class ExtractionUtils { Label start, Label end, int index) { -parameterNames.add(name); +
(flink) branch release-1.20 updated: [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF
This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.20 by this push: new ce29a69bcbe [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF ce29a69bcbe is described below commit ce29a69bcbe384cdfaec7db98ce4447b760ce3a2 Author: Xuyang AuthorDate: Thu Jul 4 21:13:48 2024 +0800 [FLINK-35498][table] Fix unexpected argument name conflicts when extracting method parameter names from UDF This closes #25019 --- .../table/types/extraction/ExtractionUtils.java| 49 - .../types/extraction/ExtractionUtilsTest.java | 110 + .../planner/runtime/stream/sql/FunctionITCase.java | 43 3 files changed, 198 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java index 9854e62eb0c..5968b41fc84 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.types.extraction; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.annotation.ArgumentHint; import org.apache.flink.table.api.DataTypes; @@ -58,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -747,7 +749,8 @@ public final class ExtractionUtils { return fieldNames; } 
-private static @Nullable List extractExecutableNames(Executable executable) { +@VisibleForTesting +static @Nullable List extractExecutableNames(Executable executable) { final int offset; if (!Modifier.isStatic(executable.getModifiers())) { // remove "this" as first parameter @@ -824,6 +827,40 @@ public final class ExtractionUtils { * * } * } + * + * If a constructor or method has multiple identical local variables that are not initialized + * like: + * + * {@code + * String localVariable; + * if (generic == null) { + * localVariable = "null"; + * } else if (generic < 0) { + * localVariable = "negative"; + * } else if (generic > 0) { + * localVariable = "positive"; + * } else { + * localVariable = "zero"; + * } + * } + * + * Its local variable table is as follows: + * + * {@code + * Start Length Slot Name Signature + * 7 3 2 localVariable Ljava/lang/String; + * 22 3 2 localVariable Ljava/lang/String; + * 37 3 2 localVariable Ljava/lang/String; + * 0 69 0 this...; + * 0 69 1 generic Ljava/lang/Long; + * 43 26 2 localVariable Ljava/lang/String; + * } + * + * The method parameters are always at the head in the 'slot' list. + * + * NOTE: the first parameter may be "this" if the function is not static. See more at https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html;>3.6. Receiving + * Arguments */ private static class ParameterExtractor extends ClassVisitor { @@ -831,7 +868,7 @@ public final class ExtractionUtils { private final String methodDescriptor; -private final List parameterNames = new ArrayList<>(); +private final Map parameterNamesWithIndex = new TreeMap<>(); ParameterExtractor(Constructor constructor) { super(OPCODE); @@ -844,7 +881,11 @@ public final class ExtractionUtils { } List getParameterNames() { -return parameterNames; +// method parameters are always at the head in the 'index' list +// NOTE: the first parameter may be "this" if the function is not static +// See more at Chapter "3.6. 
Receiving Arguments" in +// https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html +return new ArrayList<>(parameterNamesWithIndex.values()); } @Override @@ -860,7 +901,7 @@ public final class ExtractionUtils { Label start, Label end, int index) { -parameterNames.add(name); +
(flink) branch ngn346 deleted (was 47ce6a97aaf)
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a change to branch ngn346 in repository https://gitbox.apache.org/repos/asf/flink.git was 47ce6a97aaf fixup! [NGN-346] Add custom scope variables to operators This change permanently discards the following revisions: discard 47ce6a97aaf fixup! [NGN-346] Add custom scope variables to operators discard fbb234349f5 [hotfix] Debug log added metrics and all variables discard bc0c9724904 [FRT-528] Revert "[FRT-499] Classify 'Not authorized to access group' as user-error" discard f55de043e03 [MATRIX-247] MLNGramsFunction implementation and tests (#1031) discard 6ef5e143887 [FLINKCC-1468] properly storing semaphore caches discard 928c36bcb87 [hotfix] use run_mvn in tools/ci/confluent/bump_nanoversion.sh discard ea611959631 [DP-14979] Disable setting base version in bump_nanoversion as a temporary fix discard f068331233c [SQL-2123] use message indices to de-serialize multi-message proto (#1030) discard 95cabe2a73f [MATRIX-240] Drop model classification as USER ExceptionKind, modified drop mo… (#1022) discard 2b256e177d2 [NGN-378] Introduce task initialization timeout discard 7d397c2abb3 [FLINKCC-1444][tests] Use RestClusterClient discard 1e16fc36684 [NGN-377][hotfix] Remove "<" and ">" from variables names that metric group automatically adds for Otel reporting. discard 2ca770708db [NGN-396] Introduce Metric and Event for "all tasks running/finished" uptime definition. discard 30539ffbf01 [hotfix] Use proper event name for "JobStatusChange" in JobMaster. discard 460010003fd [NGN-391] Label exception from illegal inputs in SQL function #fromBase64 as USER error. discard fcbda546790 [NGN-391][hotfix] Deduplicate code for exception search. 
discard c39968b07e7 [SQL-2135] Allow DISTRIBUTED BY definition in CTAS statement (#1015) discard c19c06103f9 Merge pull request #1008 from confluentinc/ROBUST_SCALER_DEV discard 5d11d20b2b0 [MATRIX-239] Scaler function type inferencing refactor to extract common code to MLUtils discard d107b152fc2 [MATRIX-241] API changes for Gemini models (#1016) discard 78ae8f3ac64 [FRT-475] Add error classification for CREATE/SELECT/DROP FUNCTION (#1005) discard a8034a093fe Merge pull request #1010 from confluentinc/fix_sql_2118 discard 7974b214e18 [SQL-2118] Improve table not found message with private networking suggestion discard 28b51c59a73 [MATRIX-239] Robust Scaler function implementation and tests discard 70082303ac3 [SQL-1978] Use FileDescriptor for PROTO schema conversion (#1003) discard 5cbc1bc850c [FLINK-35687] JSON_QUERY should return a well formatted nested objects/arrays for ARRAY (#24976) (#1006) discard eb9c31cd7aa Merge pull request #1004 from confluentinc/ML_NORMALIZER_DEV discard e4657e08ceb [MATRIX-237] Normalizer function implementaion and AbsMax refactoring discard 1cebb08d7ff Merge pull request #1000 from confluentinc/ML_STANDARD_SCALER_DEV discard b3f94501c29 [MATRIX-225] Overloaded eval functions for explicit arguments discard 9ec97065d45 [MATRIX-225] Checkstyle change discard 0497a7b1800 [SQL-1234] Store comment for fields in all three supported formats (#910) discard 4e6ac98c8d8 [MATRIX] update jss for federated search query rewrite discard 41e6157574f [MATRIX-225] Added MLStandardScalerFunction class and Tests discard 68720053152 Update search function name (#999) discard 853dbff100f Merge pull request #995 from confluentinc/MAX_ABS_SCALER_DEV discard 6119d93cdfc [NGN-377] Report job status changes as events. 
discard 959c952ae17 [MATRIX-225] spotless apply discard bc052cf13b7 [MATRIX-225] Handling of zero/zero case added discard 91f21634ea5 [MATRIX-225] changed a test case to support code change discard c2ae5966dff [NGN-375] Report Checkpoints as Events. discard 3c12ea8c919 [SQL-1665] Remove Table Async option (#996) discard bca14d2ac09 [NGN-374] Report Job Failures as Events. discard d5fb264c57a [NGN-374] Introduce AttributeBuilder interface for SpanBuilder and EventBuilder. discard 924d979509f [MATRIX-225] Max Absolute Scaler function implemetation and its test added discard 9db90769e12 [FLINKCC-1400][MATRIX-219] Convert alter model reset to AlterModelOptionsOperation using ModelChange (#994) discard a8a21b6546b [Matrix-161] Refactor the ml functions directories for search. (#993) discard 6ce4d61df9f [FRT-476] Adds main thread callback to AsyncWaitOperator (#902) discard c697683dccc Merge pull request #987 from confluentinc/MIN_MAX_SCALER_REFACTOR discard b37b89f9fa2 [FRT-499] Classify 'Not authorized to access group' as user-error discard 3047f628f21 [FRT-499] Add util to check exception contains message discard f6e6d2d997a [NGN-348][hotfix] Fix comment and add annotation. discard 49a585ddd29 [NGN-348][hotfix] Fix comment and add annotation. discard e5a92f5c0ea [FLINKCC-983] Use JobID as job name discard 642a4c717d7 [FLINKCC-983] Add dedicated JobGraphGenerator interface discard
(flink) branch master updated: [FLINK-34268] Removing exclusions for FLINK-33676 33805
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new cc19159b489 [FLINK-34268] Removing exclusions for FLINK-33676 33805 cc19159b489 is described below commit cc19159b489dafad71c01bf551bc224a38203eae Author: Jim Hughes AuthorDate: Wed Jul 3 08:54:16 2024 -0400 [FLINK-34268] Removing exclusions for FLINK-33676 33805 When FLINK-34268 was initially implemented, FLINK-33676 and FLINK-33805 had not landed. Given that, there were exclusions hard-coded. This PR removes those. --- .../plan/nodes/exec/testutils/RestoreTestCompleteness.java | 12 1 file changed, 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java index ca2d28b7166..092b37d7753 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestCompleteness.java @@ -19,16 +19,12 @@ package org.apache.flink.table.planner.plan.nodes.exec.testutils; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate; import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate; import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil.ExecNodeNameVersion; @@ -52,14 +48,6 @@ public class RestoreTestCompleteness { private static final Set>> SKIP_EXEC_NODES = new HashSet>>() { { -/** TODO: Remove after FLINK-33676 is merged. */ -add(StreamExecWindowAggregate.class); -add(StreamExecLocalWindowAggregate.class); -add(StreamExecGlobalWindowAggregate.class); - -/** TODO: Remove after FLINK-33805 is merged. */ -add(StreamExecOverAggregate.class); - /** Ignoring python based exec nodes temporarily. */ add(StreamExecPythonCalc.class); add(StreamExecPythonCorrelate.class);
Re: [I] lost data when gtid with Multiple transaction id on mgr cluster [flink-cdc]
zbingwen commented on issue #3452: URL: https://github.com/apache/flink-cdc/issues/3452#issuecomment-2208425070 change to below , test is correct. ``` public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) { Map newSet = new HashMap<>(); serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet)); for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) { GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID()); if (serverUuidSet != null) { List restoredIntervals = uuidSet.getIntervals(); int restoredIntervalsSize = restoredIntervals.size(); List serverIntervals = serverUuidSet.getIntervals(); List newIntervals = new ArrayList<>(); for (int i = 0; i < serverIntervals.size(); i++) { GtidSet.Interval serverInterval = serverIntervals.get(i); if (i < restoredIntervalsSize) { GtidSet.Interval restoredInterval = restoredIntervals.get(i); if (serverInterval.getEnd() <= restoredInterval.getEnd()) { newIntervals.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( serverInterval.getStart(), serverInterval.getEnd())); } else if (serverInterval.getStart() <= restoredInterval.getEnd() && serverInterval.getEnd() > restoredInterval.getEnd()) { newIntervals.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( serverInterval.getStart(), restoredInterval.getEnd())); } } else { newIntervals.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( serverInterval.getStart(), serverInterval.getEnd())); } } newSet.put( uuidSet.getUUID(), new GtidSet.UUIDSet( new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet( uuidSet.getUUID(), newIntervals))); } else { newSet.put(uuidSet.getUUID(), uuidSet); } } return new GtidSet(newSet); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] lost data when gtid with Multiple transaction id on mgr cluster [flink-cdc]
zbingwen commented on issue #3452: URL: https://github.com/apache/flink-cdc/issues/3452#issuecomment-2208424086 restoredIntervalEnd logic is wrong ``` public static GtidSet fixRestoredGtidSetOld(GtidSet serverGtidSet, GtidSet restoredGtidSet) { Map newSet = new HashMap<>(); serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet)); for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) { GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID()); if (serverUuidSet != null) { long restoredIntervalEnd = getIntervalEnd(uuidSet); List newIntervals = new ArrayList<>(); for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) { if (serverInterval.getEnd() <= restoredIntervalEnd) { newIntervals.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( serverInterval.getStart(), serverInterval.getEnd())); } else if (serverInterval.getStart() <= restoredIntervalEnd && serverInterval.getEnd() > restoredIntervalEnd) { newIntervals.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( serverInterval.getStart(), restoredIntervalEnd)); } } newSet.put( uuidSet.getUUID(), new GtidSet.UUIDSet( new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet( uuidSet.getUUID(), newIntervals))); } else { newSet.put(uuidSet.getUUID(), uuidSet); } } return new GtidSet(newSet); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] lost data when gtid with Multiple transaction id on mgr cluster [flink-cdc]
zbingwen opened a new issue, #3452: URL: https://github.com/apache/flink-cdc/issues/3452 ### Please don't check the box below - [X] I'm aware that bugs and new features should be reported on [Apache Jira](https://issues.apache.org/jira) or Flink mailing list (d...@flink.apache.org) instead of here ### Again, please don't check the box below - [X] I'm aware that new issues on GitHub will be ignored and automatically closed. ### 请不要勾选以下选项 - [X] 我已知悉缺陷和新功能需要在 [Apache Jira](https://issues.apache.org/jira) 或 Flink 邮件列表(d...@flink.apache.org)中反馈,而不是在这里创建新 issue。 ### 也请不要勾选以下选项 - [X] 我已知悉 GitHub 上的新 issue 会被忽略且自动关闭。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] io/debezium/connector/mysql/MySqlConnection$MySqlConnectionConfiguration.class 类重复导致 异常java.lang.NoSuchMethodError [flink-cdc]
Tangsonghuai commented on issue #3066: URL: https://github.com/apache/flink-cdc/issues/3066#issuecomment-2208332993 debezium的问题请问一下,解决了吗 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(flink) branch master updated: [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1f03a0a62ec [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state 1f03a0a62ec is described below commit 1f03a0a62ec371d24b194759e6c95bf661501e07 Author: David Moravek AuthorDate: Thu Oct 26 12:52:04 2023 +0200 [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state --- .../generated/all_jobmanager_section.html | 8 +- .../generated/expert_scheduling_section.html | 8 +- .../generated/job_manager_configuration.html | 8 +- .../flink/configuration/JobManagerOptions.java | 21 ++- .../checkpoint/CheckpointStatsListener.java| 33 + .../checkpoint/DefaultCheckpointStatsTracker.java | 29 .../scheduler/adaptive/AdaptiveScheduler.java | 148 +-- .../scheduler/adaptive/DefaultRescaleManager.java | 43 +++--- .../runtime/scheduler/adaptive/Executing.java | 57 +++- .../flink/runtime/scheduler/adaptive/State.java| 34 - .../DefaultCheckpointStatsTrackerTest.java | 67 + .../adaptive/AdaptiveSchedulerBuilder.java | 40 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 156 - .../adaptive/DefaultRescaleManagerTest.java| 3 +- .../runtime/scheduler/adaptive/ExecutingTest.java | 128 - .../test/scheduling/RescaleOnCheckpointITCase.java | 146 +++ 16 files changed, 864 insertions(+), 65 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 4539ce365d5..760ca029e04 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -12,7 +12,7 @@ jobmanager.adaptive-scheduler.max-delay-for-scale-trigger (none) Duration -The maximum time the JobManager will wait with evaluating previously observed 
events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). +The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled). jobmanager.adaptive-scheduler.min-parallelism-increase @@ -32,6 +32,12 @@ Duration The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).Setting a negative duration will disable the resource tim [...] + + jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count +2 +Integer +The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint. + jobmanager.adaptive-scheduler.scaling-interval.max (none) diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html index 10e5ad134ce..6be6547547c 100644 --- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html +++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html @@ -90,7 +90,7 @@ jobmanager.adaptive-scheduler.max-delay-for-scale-trigger (none) Duration -The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). 
+The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled). jobmanager.adaptive-scheduler.min-parallelism-increase @@ -110,6 +110,12 @@ Duration The maximum time the JobManager will wait to acquire all required resources after a job
(flink-kubernetes-operator) branch main updated: [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config (#845)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new ffaa3dd8 [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config (#845) ffaa3dd8 is described below commit ffaa3dd8d4f03d98c01d938c730fab8a21f340c0 Author: 阿洋 AuthorDate: Thu Jul 4 14:14:49 2024 +0800 [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config (#845) --- docs/layouts/shortcodes/generated/dynamic_section.html | 6 ++ .../generated/kubernetes_operator_config_configuration.html | 6 ++ .../operator/config/KubernetesOperatorConfigOptions.java | 9 + 3 files changed, 21 insertions(+) diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html index a51a6e75..9c1083fc 100644 --- a/docs/layouts/shortcodes/generated/dynamic_section.html +++ b/docs/layouts/shortcodes/generated/dynamic_section.html @@ -140,6 +140,12 @@ String Option to enable automatic savepoint triggering. Can be specified either as a Duration type (i.e. '10m') or as a cron expression in Quartz format (6 or 7 positions, see http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop. WARNING: not intended to be used together with the cron-based periodic savepoint triggering + + kubernetes.operator.plugins.listeners.listener-name.class +(none) +String +Custom plugins listener class, 'listener-name' is the name of the plugin listener, and its value is a fully qualified class name. 
+ kubernetes.operator.pod-template.merge-arrays-by-name false diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 918b4074..172029c8 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -278,6 +278,12 @@ String Option to enable automatic savepoint triggering. Can be specified either as a Duration type (i.e. '10m') or as a cron expression in Quartz format (6 or 7 positions, see http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html).The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop. WARNING: not intended to be used together with the cron-based periodic savepoint triggering + + kubernetes.operator.plugins.listeners.<listener-name>.class +(none) +String +Custom plugins listener class, 'listener-name' is the name of the plugin listener, and its value is a fully qualified class name. 
+ kubernetes.operator.pod-template.merge-arrays-by-name false diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 1334ba4f..b065d228 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -333,6 +333,15 @@ public class KubernetesOperatorConfigOptions { + "WARNING: not intended to be used together with the cron-based " + "periodic checkpoint triggering"); +@SuppressWarnings("unused") +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption<String> PLUGINS_LISTENERS_CLASS = +operatorConfig("plugins.listeners.<listener-name>.class") +.stringType() +.noDefaultValue() +.withDescription( +"Custom plugins listener class, 'listener-name' is the name of the plugin listener, and its value is a fully qualified class name."); + @Documentation.Section(SECTION_SYSTEM) public static final ConfigOption OPERATOR_WATCHED_NAMESPACES = operatorConfig("watched.namespaces")