This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0457c71d039 Fix k8sAndWorker mode in a zookeeper-less environment (#15445)
0457c71d039 is described below

commit 0457c71d0399904148f581d62d92099670d615bd
Author: YongGang <mail....@gmail.com>
AuthorDate: Fri Jan 12 09:30:01 2024 -0800

    Fix k8sAndWorker mode in a zookeeper-less environment (#15445)
    
    * Fix k8sAndWorker mode in a zookeeper-less environment
    
    * add unit test
    
    * code reformat
    
    * minor refine
    
    * change to inject Provider
    
    * correct style
    
    * bind HttpRemoteTaskRunnerFactory as provider
    
    * change to bind on TaskRunnerFactory
    
    * fix styling
---
 .../KubernetesAndWorkerTaskRunnerFactory.java      |  24 +---
 .../k8s/overlord/KubernetesOverlordModule.java     |  25 ++++
 .../KubernetesAndWorkerTaskRunnerFactoryTest.java  |  55 +--------
 .../k8s/overlord/KubernetesOverlordModuleTest.java | 130 +++++++++++++++++++++
 4 files changed, 166 insertions(+), 68 deletions(-)

diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
index de6db915c8a..9b482e75dc9 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
@@ -20,10 +20,9 @@
 package org.apache.druid.k8s.overlord;
 
 import com.google.inject.Inject;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import com.google.inject.name.Named;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
 import org.apache.druid.indexing.overlord.WorkerTaskRunner;
-import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
 import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
 
 
@@ -32,9 +31,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements 
TaskRunnerFactory<K
   public static final String TYPE_NAME = "k8sAndWorker";
 
   private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
-  private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
-  private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
-  private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+  private final TaskRunnerFactory<? extends WorkerTaskRunner> 
taskRunnerFactory;
   private final RunnerStrategy runnerStrategy;
 
   private KubernetesAndWorkerTaskRunner runner;
@@ -42,16 +39,12 @@ public class KubernetesAndWorkerTaskRunnerFactory 
implements TaskRunnerFactory<K
   @Inject
   public KubernetesAndWorkerTaskRunnerFactory(
       KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
-      HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
-      RemoteTaskRunnerFactory remoteTaskRunnerFactory,
-      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
+      @Named("taskRunnerFactory") TaskRunnerFactory<? extends 
WorkerTaskRunner> taskRunnerFactory,
       RunnerStrategy runnerStrategy
   )
   {
     this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
-    this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
-    this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
-    this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+    this.taskRunnerFactory = taskRunnerFactory;
     this.runnerStrategy = runnerStrategy;
   }
 
@@ -60,19 +53,12 @@ public class KubernetesAndWorkerTaskRunnerFactory 
implements TaskRunnerFactory<K
   {
     runner = new KubernetesAndWorkerTaskRunner(
         kubernetesTaskRunnerFactory.build(),
-        getWorkerTaskRunner(),
+        taskRunnerFactory.build(),
         runnerStrategy
     );
     return runner;
   }
 
-  private WorkerTaskRunner getWorkerTaskRunner()
-  {
-    String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
-    return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
-           httpRemoteTaskRunnerFactory.build() : 
remoteTaskRunnerFactory.build();
-  }
-
   @Override
   public KubernetesAndWorkerTaskRunner get()
   {
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index afd9d9a7c4e..1e52a3583b2 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -21,10 +21,12 @@ package org.apache.druid.k8s.overlord;
 
 import com.google.inject.Binder;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.multibindings.MapBinder;
+import com.google.inject.name.Named;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import org.apache.druid.discovery.NodeRole;
@@ -37,8 +39,11 @@ import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
 import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@@ -130,6 +135,26 @@ public class KubernetesOverlordModule implements 
DruidModule
     return client;
   }
 
+  /**
+   * Provides a TaskRunnerFactory instance suitable for environments without Zookeeper.
+   * In such environments, the standard RemoteTaskRunnerFactory may not be operational.
+   * Depending on the workerType defined in KubernetesAndWorkerTaskRunnerConfig,
+   * this method selects and returns an appropriate TaskRunnerFactory implementation.
+   */
+  @Provides
+  @LazySingleton
+  @Named("taskRunnerFactory")
+  TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner(
+      KubernetesAndWorkerTaskRunnerConfig runnerConfig,
+      Injector injector
+  )
+  {
+    String workerType = runnerConfig.getWorkerType();
+    return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType)
+           ? injector.getInstance(HttpRemoteTaskRunnerFactory.class)
+           : injector.getInstance(RemoteTaskRunnerFactory.class);
+  }
+
   private static class RunnerStrategyProvider implements 
Provider<RunnerStrategy>
   {
     private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
index 88696017d05..b309d4ef04b 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
@@ -20,9 +20,8 @@
 
 package org.apache.druid.k8s.overlord;
 
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
-import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
-import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -37,59 +36,17 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest 
extends EasyMockSupport
 {
 
   @Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
-  @Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
-  @Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory;
+  @Mock TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory;
 
   @Test
-  public void test_useHttpTaskRunner_asDefault()
+  public void test_buildKubernetesTaskRunnerSuccessfully()
   {
     KubernetesAndWorkerTaskRunnerFactory factory = new 
KubernetesAndWorkerTaskRunnerFactory(
         kubernetesTaskRunnerFactory,
-        httpRemoteTaskRunnerFactory,
-        remoteTaskRunnerFactory,
-        new KubernetesAndWorkerTaskRunnerConfig(null, null),
+        taskRunnerFactory,
         new WorkerRunnerStrategy()
     );
-
-    EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
-    EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
-
-    replayAll();
-    factory.build();
-    verifyAll();
-  }
-
-  @Test
-  public void test_specifyRemoteTaskRunner()
-  {
-    KubernetesAndWorkerTaskRunnerFactory factory = new 
KubernetesAndWorkerTaskRunnerFactory(
-        kubernetesTaskRunnerFactory,
-        httpRemoteTaskRunnerFactory,
-        remoteTaskRunnerFactory,
-        new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
-        new WorkerRunnerStrategy()
-    );
-
-    EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
-    EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
-
-    replayAll();
-    factory.build();
-    verifyAll();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void test_specifyIncorrectTaskRunner_shouldThrowException()
-  {
-    KubernetesAndWorkerTaskRunnerFactory factory = new 
KubernetesAndWorkerTaskRunnerFactory(
-        kubernetesTaskRunnerFactory,
-        httpRemoteTaskRunnerFactory,
-        remoteTaskRunnerFactory,
-        new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
-        new KubernetesRunnerStrategy()
-    );
-
-    EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
+    EasyMock.expect(taskRunnerFactory.build()).andReturn(null);
     EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
 
     replayAll();
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
new file mode 100644
index 00000000000..b83ec562bda
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.ProvisionException;
+import org.apache.druid.guice.ConfigModule;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.DruidNode;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Properties;
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesOverlordModuleTest
+{
+  @Mock
+  private ServiceEmitter serviceEmitter;
+  @Mock
+  private TaskConfig taskConfig;
+  @Mock
+  private HttpClient httpClient;
+  @Mock
+  private RemoteTaskRunnerFactory remoteTaskRunnerFactory;
+  @Mock
+  private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
+  private Injector injector;
+
+  @Test
+  public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
+  {
+    injector = makeInjectorWithProperties(initializePropertes(false), false, 
true);
+    KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = 
injector.getInstance(
+        KubernetesAndWorkerTaskRunnerFactory.class);
+    Assert.assertNotNull(taskRunnerFactory);
+
+    Assert.assertNotNull(taskRunnerFactory.build());
+  }
+
+  @Test
+  public void testRemoteTaskRunnerFactoryBindSuccessfully()
+  {
+    injector = makeInjectorWithProperties(initializePropertes(true), true, 
false);
+    KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = 
injector.getInstance(
+        KubernetesAndWorkerTaskRunnerFactory.class);
+    Assert.assertNotNull(taskRunnerFactory);
+
+    Assert.assertNotNull(taskRunnerFactory.build());
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionThrownIfNoTaskRunnerFactoryBind()
+  {
+    injector = makeInjectorWithProperties(initializePropertes(false), false, 
false);
+    injector.getInstance(KubernetesAndWorkerTaskRunnerFactory.class);
+  }
+
+  private Injector makeInjectorWithProperties(
+      final Properties props,
+      boolean isWorkerTypeRemote,
+      boolean isWorkerTypeHttpRemote
+  )
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new JacksonModule(),
+
+            binder -> {
+              binder.bind(Properties.class).toInstance(props);
+              binder.bind(ServiceEmitter.class).toInstance(serviceEmitter);
+              
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
+              binder.bind(TaskConfig.class).toInstance(taskConfig);
+              binder.bind(DruidNode.class)
+                    .annotatedWith(Self.class)
+                    .toInstance(new DruidNode("test-inject", null, false, 
null, null, true, false));
+              if (isWorkerTypeRemote) {
+                
binder.bind(RemoteTaskRunnerFactory.class).toInstance(remoteTaskRunnerFactory);
+              }
+              if (isWorkerTypeHttpRemote) {
+                
binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory);
+              }
+            },
+            new ConfigModule(),
+            new KubernetesOverlordModule()
+        ));
+  }
+
+  private static Properties initializePropertes(boolean isWorkerTypeRemote)
+  {
+    final Properties props = new Properties();
+    props.put("druid.indexer.runner.namespace", "NAMESPACE");
+    props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.type", "k8s");
+    if (isWorkerTypeRemote) {
+      props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType", 
"remote");
+    }
+    return props;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to