This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new abf9d040 [FLINK-32412] Reduce JobID collision chance
abf9d040 is described below

commit abf9d040ae58caf8313ca7b71049d6709fa26ea3
Author: fabio.wanner <fabio.wan...@dectris.com>
AuthorDate: Wed Jun 21 16:11:19 2023 +0200

    [FLINK-32412] Reduce JobID collision chance
    
    Instead of using Java's hashCode, which is an integer value (32bit), a long
    representation of the uid is used. This decreases the chance for an ID
    collision drastically:
    
    For a 50% collision chance with random integers, 77000 numbers need to be
    generated. For a long value (64 bit) a 50% chance of a collision needs
    5.1×10^9 random longs. For details look up: "the birthday problem".
    
    A test is added to increase awareness of the problem when changing this
    part of the code.
---
 .../operator/observer/sessionjob/FlinkSessionJobObserver.java      | 6 +++++-
 .../org/apache/flink/kubernetes/operator/utils/FlinkUtils.java     | 4 +++-
 .../org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java | 7 +++++++
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
index c143bff5..42762b13 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
@@ -43,6 +43,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.kubernetes.operator.utils.FlinkUtils.generateSessionJobFixedJobID;
+
 /** The observer of {@link FlinkSessionJob}. */
 public class FlinkSessionJobObserver extends 
AbstractFlinkResourceObserver<FlinkSessionJob> {
 
@@ -83,7 +85,9 @@ public class FlinkSessionJobObserver extends 
AbstractFlinkResourceObserver<Flink
         var matchedJobs = new ArrayList<JobID>();
         for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
             var jobId = jobStatusMessage.getJobId();
-            if (jobId.getLowerPart() == uid.hashCode()
+            if (jobId.getLowerPart()
+                            == generateSessionJobFixedJobID(uid, jobId.getUpperPart() + 1L)
+                                    .getLowerPart()
                     && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
                 matchedJobs.add(jobId);
             }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 68b23e34..3700b1f7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -55,6 +55,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 
@@ -363,7 +364,8 @@ public class FlinkUtils {
      */
     public static JobID generateSessionJobFixedJobID(String uid, Long generation) {
         return new JobID(
-                Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
+                UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
+                Preconditions.checkNotNull(generation));
     }
 
     /**
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 770a7042..8fe98f41 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -313,6 +313,13 @@ public class FlinkUtilsTest {
         assertEquals(List.of(v1merged, volume2, volume3), 
mergedPod.getSpec().getVolumes());
     }
 
+    @Test
+    public void testJobIDGeneration() {
+        JobID jobID =
+                FlinkUtils.generateSessionJobFixedJobID("ffffffff-ffff-ffff-aaaa-aaaaaaaaaaaa", 2L);
+        assertEquals("ffffffffffffffff0000000000000002", jobID.toString());
+    }
+
     private void createHAConfigMapWithData(
             String configMapName, String namespace, String clusterId, 
Map<String, String> data) {
         final ConfigMap kubernetesConfigMap =

Reply via email to