This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new abf9d040 [FLINK-32412] Reduce JobID collision chance abf9d040 is described below commit abf9d040ae58caf8313ca7b71049d6709fa26ea3 Author: fabio.wanner <fabio.wan...@dectris.com> AuthorDate: Wed Jun 21 16:11:19 2023 +0200 [FLINK-32412] Reduce JobID collision chance Instead of using Java's hashCode, which is an integer value (32-bit), a long representation of the uid (the most significant 64 bits of the UUID) is used. This decreases the chance of an ID collision drastically: for a 50% collision chance with random integers, about 77,000 numbers need to be generated. For a long value (64-bit), a 50% chance of a collision requires about 5.1×10^9 random longs. For details, look up "the birthday problem". A test is added to increase awareness of the problem when changing this part of the code. --- .../operator/observer/sessionjob/FlinkSessionJobObserver.java | 6 +++++- .../org/apache/flink/kubernetes/operator/utils/FlinkUtils.java | 4 +++- .../org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java | 7 +++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java index c143bff5..42762b13 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.generateSessionJobFixedJobID; + /** The observer of {@link FlinkSessionJob}.
*/ public class FlinkSessionJobObserver extends AbstractFlinkResourceObserver<FlinkSessionJob> { @@ -83,7 +85,9 @@ public class FlinkSessionJobObserver extends AbstractFlinkResourceObserver<Flink var matchedJobs = new ArrayList<JobID>(); for (JobStatusMessage jobStatusMessage : jobStatusMessages) { var jobId = jobStatusMessage.getJobId(); - if (jobId.getLowerPart() == uid.hashCode() + if (jobId.getLowerPart() + == generateSessionJobFixedJobID(uid, jobId.getUpperPart() + 1L) + .getLowerPart() && !jobStatusMessage.getJobState().isGloballyTerminalState()) { matchedJobs.add(jobId); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index 68b23e34..3700b1f7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -55,6 +55,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; +import java.util.UUID; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; @@ -363,7 +364,8 @@ public class FlinkUtils { */ public static JobID generateSessionJobFixedJobID(String uid, Long generation) { return new JobID( - Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation)); + UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(), + Preconditions.checkNotNull(generation)); } /** diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java index 770a7042..8fe98f41 100644 --- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java @@ -313,6 +313,13 @@ public class FlinkUtilsTest { assertEquals(List.of(v1merged, volume2, volume3), mergedPod.getSpec().getVolumes()); } + @Test + public void testJobIDGeneration() { + JobID jobID = + FlinkUtils.generateSessionJobFixedJobID("ffffffff-ffff-ffff-aaaa-aaaaaaaaaaaa", 2L); + assertEquals("ffffffffffffffff0000000000000002", jobID.toString()); + } + private void createHAConfigMapWithData( String configMapName, String namespace, String clusterId, Map<String, String> data) { final ConfigMap kubernetesConfigMap =