FrankChen021 commented on code in PR #19433: URL: https://github.com/apache/druid/pull/19433#discussion_r3241284979
########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactory.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.k8s.overlord; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.inject.Inject; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import org.apache.druid.common.config.ConfigManager; +import org.apache.druid.guice.IndexingServiceModuleHelper; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.overlord.TaskRunner; +import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.k8s.overlord.common.CachingKubernetesPeonClient; +import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient; +import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; +import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; +import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory; +import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; +import org.apache.druid.k8s.overlord.taskadapter.DynamicConfigPodTemplateSelector; +import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter; +import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter; +import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter; +import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.TaskLogs; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.stream.Collectors; + +public class MultipleKubernetesTaskRunnerFactory implements TaskRunnerFactory<TaskRunner> +{ + public static final String TYPE_NAME = "multik8s"; + private final ObjectMapper smileMapper; + private final HttpClient httpClient; + private final TaskLogs taskLogs; + private final ServiceEmitter emitter; + private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier; + private final ConfigManager configManager; + private final MultipleKubernetesTaskRunnerConfig runnerConfig; + private final TaskConfig taskConfig; + private final StartupLoggingConfig startupLoggingConfig; + private final DruidNode druidNode; + private final Properties properties; + private final DruidKubernetesHttpClientFactory httpClientFactory; + private TaskRunner runner; + + @Inject + public MultipleKubernetesTaskRunnerFactory( + @Json ObjectMapper objectMapper, + @Smile ObjectMapper smileMapper, + @EscalatedGlobal final HttpClient httpClient, + TaskLogs taskLogs, + Properties properties, + ServiceEmitter emitter, + Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier, + @Nullable ConfigManager configManager, + TaskConfig taskConfig, + StartupLoggingConfig startupLoggingConfig, + @Self DruidNode druidNode, + DruidKubernetesHttpClientFactory httpClientFactory + ) + { + this.runnerConfig = MultipleKubernetesTaskRunnerConfig.fromProperties(objectMapper, properties); + + this.smileMapper = smileMapper; + this.httpClient = httpClient; + this.taskLogs = taskLogs; + this.emitter = emitter; + this.dynamicConfigSupplier = dynamicConfigSupplier; + this.configManager = configManager; + this.taskConfig = taskConfig; + this.startupLoggingConfig = startupLoggingConfig; + this.druidNode = druidNode; + this.properties = properties; + this.httpClientFactory = httpClientFactory; + } + + @Override + public TaskRunner build() + { + final List<MultipleKubernetesTaskRunnerConfig.KubernetesCluster> enabledClusters = this.runnerConfig.getClusters() + .stream() + .filter(cluster -> !cluster.isDisabled()) + .collect(Collectors.toList()); + + if (enabledClusters.isEmpty()) { + throw new IllegalArgumentException("At least one task runner must be enabled"); + } + + final int totalCapacity = new KubernetesTaskRunnerEffectiveConfig(this.runnerConfig, this.dynamicConfigSupplier).getCapacity(); + final AutoscalableThreadPoolExecutor sharedExecutor = new AutoscalableThreadPoolExecutor(totalCapacity, this.configManager); + + final List<MultipleKubernetesTaskRunnerDelegate> taskRunners = new ArrayList<>(); + for (MultipleKubernetesTaskRunnerConfig.KubernetesCluster kubernetesCluster : this.runnerConfig.getClusters()) { + + final KubernetesTaskRunnerStaticConfig clusterConfig = getPerClusterConfiguration(kubernetesCluster); + final KubernetesTaskRunnerEffectiveConfig effectiveConfig = new KubernetesTaskRunnerEffectiveConfig( + clusterConfig, + this.dynamicConfigSupplier + ); + + final DruidKubernetesClient client = createClientForCluster(kubernetesCluster, clusterConfig); + final TaskAdapter clusterTaskAdapter = buildTaskAdapter(client, effectiveConfig); + final boolean useOverlordNamespace = PodTemplateTaskAdapter.TYPE.equals(clusterTaskAdapter.getAdapterType()); + final DruidKubernetesCachingClient cachingClient = effectiveConfig.isUseK8sSharedInformers() + ? createCachingClient(client, effectiveConfig) + : null; + + final KubernetesPeonClient peonClient = createPeonClient( + client, + cachingClient, + effectiveConfig, + useOverlordNamespace + ); + + final KubernetesTaskRunner clusterRunner = new KubernetesTaskRunner( + clusterTaskAdapter, + effectiveConfig, + peonClient, + httpClient, + new KubernetesPeonLifecycleFactory( + peonClient, + taskLogs, + smileMapper, + effectiveConfig.getLogSaveTimeout().toStandardDuration().getMillis() + ), + emitter, + sharedExecutor, + configManager + ); + + taskRunners.add( + new MultipleKubernetesTaskRunnerDelegate( + clusterRunner, + kubernetesCluster.getName(), + kubernetesCluster.isDisabled(), + client, + cachingClient + ) + ); + } + + this.runner = new MultipleKubernetesTaskRunner( + new KubernetesTaskRunnerEffectiveConfig( + this.runnerConfig, + this.dynamicConfigSupplier + ), + runnerConfig.getClusterSelector(), + taskRunners, + sharedExecutor + ); + return this.runner; + } + + @Override + public TaskRunner get() + { + return runner; + } + + private DruidKubernetesClient createClientForCluster( + MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster, + KubernetesTaskRunnerStaticConfig clusterConfig + ) + { + Config config; + if (cluster.getKubeconfigPath() != null && !cluster.getKubeconfigPath().trim().isEmpty()) { + config = Config.fromKubeconfig(new File(cluster.getKubeconfigPath())); + } else { + config = new ConfigBuilder().build(); + } + + if (clusterConfig.isDisableClientProxy()) { + config.setHttpsProxy(null); + config.setHttpProxy(null); + } + + config.setNamespace(clusterConfig.getNamespace()); + + return new DruidKubernetesClient(httpClientFactory, config); + } + + protected DruidKubernetesCachingClient createCachingClient( + DruidKubernetesClient client, + KubernetesTaskRunnerEffectiveConfig effectiveConfig + ) + { + return new DruidKubernetesCachingClient( + client, + effectiveConfig.getNamespace(), + effectiveConfig.getK8sSharedInformerResyncPeriod().toStandardDuration().getMillis() + ); + } + + private KubernetesPeonClient createPeonClient( + DruidKubernetesClient client, + @Nullable DruidKubernetesCachingClient cachingClient, + KubernetesTaskRunnerEffectiveConfig effectiveConfig, + boolean useOverlordNamespace + ) + { + if (cachingClient != null) { + return new CachingKubernetesPeonClient( + cachingClient, + effectiveConfig.getNamespace(), + useOverlordNamespace ? effectiveConfig.getOverlordNamespace() : "", + effectiveConfig.isDebugJobs(), + emitter + ); + } + + return new KubernetesPeonClient( + client, + effectiveConfig.getNamespace(), + useOverlordNamespace ? effectiveConfig.getOverlordNamespace() : "", + effectiveConfig.isDebugJobs(), + emitter + ); + } + + private TaskAdapter buildTaskAdapter( + DruidKubernetesClient client, + KubernetesTaskRunnerEffectiveConfig effectiveConfig + ) + { + final String adapter = properties.getProperty(String.format( + Locale.ROOT, + "%s.%s.adapter.type", + IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, + KubernetesTaskRunnerFactory.TYPE_NAME + )); + + if (adapter != null + && !MultiContainerTaskAdapter.TYPE.equals(adapter) + && effectiveConfig.isSidecarSupport()) { + throw new IAE( + "Invalid pod adapter [%s], only pod adapter [%s] can be specified when sidecarSupport is enabled", + adapter, + MultiContainerTaskAdapter.TYPE + ); + } + + if (MultiContainerTaskAdapter.TYPE.equals(adapter) || effectiveConfig.isSidecarSupport()) { + return new MultiContainerTaskAdapter( + client, + effectiveConfig, + taskConfig, + startupLoggingConfig, + druidNode, + smileMapper, + taskLogs + ); + } else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) { + return new PodTemplateTaskAdapter( + effectiveConfig, + taskConfig, + druidNode, + smileMapper, + taskLogs, + new DynamicConfigPodTemplateSelector(properties, effectiveConfig) + ); + } else { + return new SingleContainerTaskAdapter( Review Comment: [P1] Default pod adapter fails for remote clusters The multik8s factory still defaults to SingleContainerTaskAdapter, and also allows MultiContainerTaskAdapter. Both adapters build jobs by reading the Overlord pod via the configured Kubernetes client and task namespace. In this factory, that client points at the selected target cluster and namespace, where the single Druid Overlord usually does not exist. With the documented multik8s sample, which does not set a custom template adapter, the first task submitted to a remote cluster will fail while trying to copy a non-existent Overlord pod. Either require customTemplateAdapter for multik8s or change these adapters to source the pod spec from the local Overlord cluster instead of the target task cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
