This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a664fc8be3c always set taskLocation (#17350)
a664fc8be3c is described below

commit a664fc8be3cdba479caed7ebd49ef2f835bf7699
Author: George Shiqi Wu <[email protected]>
AuthorDate: Wed Oct 16 14:02:39 2024 -0400

    always set taskLocation (#17350)
---
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  51 +++++----
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  | 118 +++++++--------------
 2 files changed, 70 insertions(+), 99 deletions(-)

diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index eaef0cba6a1..fd6ae4bd6f1 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -20,6 +20,7 @@
 package org.apache.druid.k8s.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.fabric8.kubernetes.api.model.Pod;
@@ -31,6 +32,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
@@ -177,6 +179,11 @@ public class KubernetesPeonLifecycle
   protected synchronized TaskStatus join(long timeout) throws IllegalStateException
   {
     try {
+      /* It's okay to store taskLocation because podIP only changes on pod restart, and we have to set restartPolicy to Never
+        since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
+        if we decide we need to change this later.
+      **/
+      taskLocation = getTaskLocationFromK8s();
       updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);
 
       JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
@@ -254,24 +261,8 @@ public class KubernetesPeonLifecycle
     if we decide we need to change this later.
     **/
     if (taskLocation == null) {
-      Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
-      if (!maybePod.isPresent()) {
-        return TaskLocation.unknown();
-      }
-
-      Pod pod = maybePod.get();
-      PodStatus podStatus = pod.getStatus();
-
-      if (podStatus == null || podStatus.getPodIP() == null) {
-        return TaskLocation.unknown();
-      }
-      taskLocation = TaskLocation.create(
-          podStatus.getPodIP(),
-          DruidK8sConstants.PORT,
-          DruidK8sConstants.TLS_PORT,
-          Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")),
-          pod.getMetadata() != null ? pod.getMetadata().getName() : ""
-      );
+      log.warn("Unknown task location for [%s]", taskId);
+      return TaskLocation.unknown();
     }
 
     return taskLocation;
@@ -378,4 +369,28 @@ public class KubernetesPeonLifecycle
     );
     stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
   }
+
+  @VisibleForTesting
+  protected TaskLocation getTaskLocationFromK8s()
+  {
+    Pod pod = kubernetesClient.getPeonPodWithRetries(taskId.getK8sJobName());
+    PodStatus podStatus = pod.getStatus();
+
+    if (podStatus == null || podStatus.getPodIP() == null) {
+      throw new ISE("Could not find location of running task [%s]", taskId);
+    }
+
+    return TaskLocation.create(
+        podStatus.getPodIP(),
+        DruidK8sConstants.PORT,
+        DruidK8sConstants.TLS_PORT,
+        Boolean.parseBoolean(
+            pod.getMetadata() != null && pod.getMetadata().getAnnotations() != null ?
+                pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false") :
+                "false"
+        ),
+        pod.getMetadata() != null ? pod.getMetadata().getName() : ""
+    );
+
+  }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 59c3700b1fc..96b58edd1d3 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodStatus;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
@@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class KubernetesPeonLifecycleTest extends EasyMockSupport
 {
   private static final String ID = "id";
+  private static final String IP = "ip";
   private static final TaskStatus SUCCESS = TaskStatus.success(ID);
 
   @Mock KubernetesPeonClient kubernetesClient;
@@ -286,6 +288,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
     EasyMock.expectLastCall().once();
 
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    );
     replayAll();
 
     TaskStatus taskStatus = peonLifecycle.join(0L);
@@ -337,7 +342,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    );
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
 
     replayAll();
@@ -393,7 +400,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
-
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    ).anyTimes();
     replayAll();
 
     TaskStatus taskStatus = peonLifecycle.join(0L);
@@ -445,7 +454,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall().once();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
-
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    );
     replayAll();
 
     TaskStatus taskStatus = peonLifecycle.join(0L);
@@ -493,7 +504,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    );
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
 
     replayAll();
@@ -545,7 +558,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    );
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
 
     replayAll();
@@ -585,7 +600,9 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
-
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
+        new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
+    );
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
 
     replayAll();
@@ -768,7 +785,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   }
 
   @Test
-  public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown()
+  public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsUnknown()
       throws NoSuchFieldException, IllegalAccessException
   {
     KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
@@ -780,8 +797,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
-    EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
-
     replayAll();
 
     Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@@ -790,35 +805,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   }
 
   @Test
-  public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown()
-      throws NoSuchFieldException, IllegalAccessException
-  {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
-        task,
-        kubernetesClient,
-        taskLogs,
-        mapper,
-        stateListener
-    );
-    setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
-
-    Pod pod = new PodBuilder()
-        .withNewMetadata()
-        .withName(ID)
-        .endMetadata()
-        .build();
-
-    EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
-
-    replayAll();
-
-    Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
-
-    verifyAll();
-  }
-
-  @Test
-  public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation()
+  public void test_getTaskLocationFromK8s()
       throws NoSuchFieldException, IllegalAccessException
   {
     KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
@@ -839,12 +826,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
         .endStatus()
         .build();
 
-    EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once();
 
     replayAll();
 
-    TaskLocation location = peonLifecycle.getTaskLocation();
-
+    TaskLocation location = peonLifecycle.getTaskLocationFromK8s();
     Assert.assertEquals("ip", location.getHost());
     Assert.assertEquals(8100, location.getPort());
     Assert.assertEquals(-1, location.getTlsPort());
@@ -854,43 +840,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   }
 
   @Test
-  public void test_getTaskLocation_saveTaskLocation()
-      throws NoSuchFieldException, IllegalAccessException
-  {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
-        task,
-        kubernetesClient,
-        taskLogs,
-        mapper,
-        stateListener
-    );
-    setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
-
-    Pod pod = new PodBuilder()
-        .withNewMetadata()
-        .withName(ID)
-        .endMetadata()
-        .withNewStatus()
-        .withPodIP("ip")
-        .endStatus()
-        .build();
-
-    EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once();
-
-    replayAll();
-
-    TaskLocation location = peonLifecycle.getTaskLocation();
-    peonLifecycle.getTaskLocation();
-    Assert.assertEquals("ip", location.getHost());
-    Assert.assertEquals(8100, location.getPort());
-    Assert.assertEquals(-1, location.getTlsPort());
-    Assert.assertEquals(ID, location.getK8sPodName());
-
-    verifyAll();
-  }
-
-  @Test
-  public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
+  public void test_getTaskLocationFromK8s_withPeonPodWithStatusWithTLSAnnotation()
       throws NoSuchFieldException, IllegalAccessException
   {
     KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
@@ -912,11 +862,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
         .endStatus()
         .build();
 
-    EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
+    EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once();
 
     replayAll();
 
-    TaskLocation location = peonLifecycle.getTaskLocation();
+    TaskLocation location = peonLifecycle.getTaskLocationFromK8s();
 
     Assert.assertEquals("ip", location.getHost());
     Assert.assertEquals(-1, location.getPort());
@@ -938,7 +888,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
         stateListener
     );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
-    EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();
 
     replayAll();
     Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@@ -952,4 +901,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     stateField.setAccessible(true);
     stateField.set(peonLifecycle, new AtomicReference<>(state));
   }
+
+  private PodStatus getPodStatusWithIP()
+  {
+    PodStatus podStatus = new PodStatus();
+    podStatus.setPodIP(IP);
+    return podStatus;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to