This is an automated email from the ASF dual-hosted git repository. suneet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 0457c71d039 Fix k8sAndWorker mode in a zookeeper-less environment (#15445) 0457c71d039 is described below commit 0457c71d0399904148f581d62d92099670d615bd Author: YongGang <mail....@gmail.com> AuthorDate: Fri Jan 12 09:30:01 2024 -0800 Fix k8sAndWorker mode in a zookeeper-less environment (#15445) * Fix k8sAndWorker mode in a zookeeper-less environment * add unit test * code reformat * minor refine * change to inject Provider * correct style * bind HttpRemoteTaskRunnerFactory as provider * change to bind on TaskRunnerFactory * fix styling --- .../KubernetesAndWorkerTaskRunnerFactory.java | 24 +--- .../k8s/overlord/KubernetesOverlordModule.java | 25 ++++ .../KubernetesAndWorkerTaskRunnerFactoryTest.java | 55 +-------- .../k8s/overlord/KubernetesOverlordModuleTest.java | 130 +++++++++++++++++++++ 4 files changed, 166 insertions(+), 68 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java index de6db915c8a..9b482e75dc9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java @@ -20,10 +20,9 @@ package org.apache.druid.k8s.overlord; import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; +import com.google.inject.name.Named; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.WorkerTaskRunner; -import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy; @@ -32,9 +31,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K public static final String TYPE_NAME = "k8sAndWorker"; private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory; - private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory; - private final RemoteTaskRunnerFactory remoteTaskRunnerFactory; - private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig; + private final TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory; private final RunnerStrategy runnerStrategy; private KubernetesAndWorkerTaskRunner runner; @@ -42,16 +39,12 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K @Inject public KubernetesAndWorkerTaskRunnerFactory( KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory, - HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory, - RemoteTaskRunnerFactory remoteTaskRunnerFactory, - KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig, + @Named("taskRunnerFactory") TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory, RunnerStrategy runnerStrategy ) { this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory; - this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory; - this.remoteTaskRunnerFactory = remoteTaskRunnerFactory; - this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + this.taskRunnerFactory = taskRunnerFactory; this.runnerStrategy = runnerStrategy; } @@ -60,19 +53,12 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K { runner = new KubernetesAndWorkerTaskRunner( kubernetesTaskRunnerFactory.build(), - getWorkerTaskRunner(), + taskRunnerFactory.build(), runnerStrategy ); return runner; } - private WorkerTaskRunner getWorkerTaskRunner() - { - String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType(); - return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ? - httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build(); - } - @Override public KubernetesAndWorkerTaskRunner get() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java index afd9d9a7c4e..1e52a3583b2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java @@ -21,10 +21,12 @@ package org.apache.druid.k8s.overlord; import com.google.inject.Binder; import com.google.inject.Inject; +import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.multibindings.MapBinder; +import com.google.inject.name.Named; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import org.apache.druid.discovery.NodeRole; @@ -37,8 +39,11 @@ import org.apache.druid.guice.PolyBind; import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.indexing.common.config.FileTaskLogsConfig; import org.apache.druid.indexing.common.tasklogs.FileTaskLogs; +import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; @@ -130,6 +135,26 @@ public class KubernetesOverlordModule implements DruidModule return client; } + /** + * Provides a TaskRunnerFactory instance suitable for environments without Zookeeper. + * In such environments, the standard RemoteTaskRunnerFactory may not be operational. + * Depending on the workerType defined in KubernetesAndWorkerTaskRunnerConfig, + * this method selects and returns an appropriate TaskRunnerFactory implementation. + */ + @Provides + @LazySingleton + @Named("taskRunnerFactory") + TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner( + KubernetesAndWorkerTaskRunnerConfig runnerConfig, + Injector injector + ) + { + String workerType = runnerConfig.getWorkerType(); + return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) + ? injector.getInstance(HttpRemoteTaskRunnerFactory.class) + : injector.getInstance(RemoteTaskRunnerFactory.class); + } + private static class RunnerStrategyProvider implements Provider<RunnerStrategy> { private KubernetesAndWorkerTaskRunnerConfig runnerConfig; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java index 88696017d05..b309d4ef04b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java @@ -20,9 +20,8 @@ package org.apache.druid.k8s.overlord; -import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; -import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; -import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy; +import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -37,59 +36,17 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport { @Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory; - @Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory; - @Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory; + @Mock TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory; @Test - public void test_useHttpTaskRunner_asDefault() + public void test_buildKubernetesTaskRunnerSuccessfully() { KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory( kubernetesTaskRunnerFactory, - httpRemoteTaskRunnerFactory, - remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, null), + taskRunnerFactory, new WorkerRunnerStrategy() ); - - EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null); - EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null); - - replayAll(); - factory.build(); - verifyAll(); - } - - @Test - public void test_specifyRemoteTaskRunner() - { - KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory( - kubernetesTaskRunnerFactory, - httpRemoteTaskRunnerFactory, - remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, "remote"), - new WorkerRunnerStrategy() - ); - - EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null); - EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null); - - replayAll(); - factory.build(); - verifyAll(); - } - - @Test(expected = IllegalArgumentException.class) - public void test_specifyIncorrectTaskRunner_shouldThrowException() - { - KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory( - kubernetesTaskRunnerFactory, - httpRemoteTaskRunnerFactory, - remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, "noop"), - new KubernetesRunnerStrategy() - ); - - EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null); + EasyMock.expect(taskRunnerFactory.build()).andReturn(null); EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java new file mode 100644 index 00000000000..b83ec562bda --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.ProvisionException; +import org.apache.druid.guice.ConfigModule; +import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; +import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; +import org.apache.druid.jackson.JacksonModule; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.server.DruidNode; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Properties; + +@RunWith(EasyMockRunner.class) +public class KubernetesOverlordModuleTest +{ + @Mock + private ServiceEmitter serviceEmitter; + @Mock + private TaskConfig taskConfig; + @Mock + private HttpClient httpClient; + @Mock + private RemoteTaskRunnerFactory remoteTaskRunnerFactory; + @Mock + private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory; + private Injector injector; + + @Test + public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully() + { + injector = makeInjectorWithProperties(initializePropertes(false), false, true); + KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance( + KubernetesAndWorkerTaskRunnerFactory.class); + Assert.assertNotNull(taskRunnerFactory); + + Assert.assertNotNull(taskRunnerFactory.build()); + } + + @Test + public void testRemoteTaskRunnerFactoryBindSuccessfully() + { + injector = makeInjectorWithProperties(initializePropertes(true), true, false); + KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance( + KubernetesAndWorkerTaskRunnerFactory.class); + Assert.assertNotNull(taskRunnerFactory); + + Assert.assertNotNull(taskRunnerFactory.build()); + } + + @Test(expected = ProvisionException.class) + public void testExceptionThrownIfNoTaskRunnerFactoryBind() + { + injector = makeInjectorWithProperties(initializePropertes(false), false, false); + injector.getInstance(KubernetesAndWorkerTaskRunnerFactory.class); + } + + private Injector makeInjectorWithProperties( + final Properties props, + boolean isWorkerTypeRemote, + boolean isWorkerTypeHttpRemote + ) + { + return Guice.createInjector( + ImmutableList.of( + new DruidGuiceExtensions(), + new JacksonModule(), + + binder -> { + binder.bind(Properties.class).toInstance(props); + binder.bind(ServiceEmitter.class).toInstance(serviceEmitter); + binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient); + binder.bind(TaskConfig.class).toInstance(taskConfig); + binder.bind(DruidNode.class) + .annotatedWith(Self.class) + .toInstance(new DruidNode("test-inject", null, false, null, null, true, false)); + if (isWorkerTypeRemote) { + binder.bind(RemoteTaskRunnerFactory.class).toInstance(remoteTaskRunnerFactory); + } + if (isWorkerTypeHttpRemote) { + binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory); + } + }, + new ConfigModule(), + new KubernetesOverlordModule() + )); + } + + private static Properties initializePropertes(boolean isWorkerTypeRemote) + { + final Properties props = new Properties(); + props.put("druid.indexer.runner.namespace", "NAMESPACE"); + props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.type", "k8s"); + if (isWorkerTypeRemote) { + props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType", "remote"); + } + return props; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org