kfaraz commented on code in PR #18599: URL: https://github.com/apache/druid/pull/18599#discussion_r2599360171
########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Manages event notifications for Kubernetes resources (Jobs and Pods). + * <p> + * Allows tasks to wait for specific resource changes without polling, improving efficiency and responsiveness. + * Critical component of {@link CachingKubernetesPeonClient} functionality. + * </p> + * <p> + * This implementation assumes only one waiter per job/pod at a time. If a new waiter is registered for a job that + * already has one, the previous waiter will be cancelled. 
+ * </p> + */ +public class KubernetesResourceEventNotifier +{ + private static final EmittingLogger log = new EmittingLogger(KubernetesResourceEventNotifier.class); + + private final ConcurrentHashMap<String, CompletableFuture<Job>> jobWatchers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, CompletableFuture<Pod>> podWatchers = new ConcurrentHashMap<>(); + + /** + * Register to be notified when a job with the given name changes. + * <p> + * IMPORTANT: Callers must call {@link #cancelJobWatcher(String)} when done waiting to avoid resource leaks. + * + * @param jobName The name of the job to watch + * @return A future that completes when the job changes + */ + public CompletableFuture<Job> waitForJobChange(String jobName) + { + CompletableFuture<Job> future = new CompletableFuture<>(); + CompletableFuture<Job> previous = jobWatchers.put(jobName, future); + + if (previous != null && !previous.isDone()) { + log.warn("Replacing active watcher for job [%s] - multiple waiters detected", jobName); + previous.cancel(true); + } + + log.debug("Registered watcher for job [%s]", jobName); + return future; + } + + /** + * Register to be notified when a pod for the given job name changes. + * <p> + * IMPORTANT: Callers must call {@link #cancelPodWatcher(String)} when done waiting to avoid resource leaks. + * + * @param jobName The job-name label value to watch for + * @return A future that completes when a matching pod changes + */ + public CompletableFuture<Pod> waitForPodChange(String jobName) + { + CompletableFuture<Pod> future = new CompletableFuture<>(); + CompletableFuture<Pod> previous = podWatchers.put(jobName, future); + + if (previous != null && !previous.isDone()) { + log.warn("Replacing active watcher for pod with job-name [%s] - multiple waiters detected", jobName); + previous.cancel(true); + } + + log.debug("Registered watcher for pod with job-name [%s]", jobName); + return future; + } + + /** + * Cancel and remove a job watcher. 
Safe to call even if the future has already completed. + * + * @param jobName The name of the job to stop watching + */ + public void cancelJobWatcher(String jobName) + { + CompletableFuture<Job> future = jobWatchers.remove(jobName); + if (future != null && !future.isDone()) { + log.debug("Cancelling watcher for job [%s]", jobName); Review Comment: Nit: Druid style logs ```suggestion log.debug("Cancelling watcher for job[%s]", jobName); ``` ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java: ########## @@ -232,6 +238,18 @@ public Builder withLogSaveTimeout(Period logSaveTimeout) return this; } + public Builder withEnablePeonClientCache(boolean enableKubernetesClientCaching) Review Comment: Please align the method name and arg name with the config i.e. `useK8sSharedInformers`. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). 
+ * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> + { + private final BiConsumer<T, InformerEventType> eventConsumer; + + public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } + } + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + private static final long DEFAULT_INFORMER_RESYNC_PERIOD_MS = 300000L; // 5 minutes + + protected final SharedIndexInformer<Pod> podInformer; + protected final SharedIndexInformer<Job> jobInformer; + protected final KubernetesResourceEventNotifier eventNotifier; + private final KubernetesClientApi baseClient; + private final long informerResyncPeriodMillis; + + public DruidKubernetesCachingClient( + KubernetesClientApi baseClient, + String namespace, + long informerResyncPeriodMillis + ) + { + this.baseClient = baseClient; + this.informerResyncPeriodMillis = informerResyncPeriodMillis; + this.eventNotifier = new KubernetesResourceEventNotifier(); + + this.podInformer = setupPodInformer(namespace); + this.jobInformer = setupJobInformer(namespace); + } + + public void stop() + { + if (podInformer != null) { + podInformer.stop(); + } + if (jobInformer != null) { + jobInformer.stop(); + } + // Cancel all pending futures in the event notifier + eventNotifier.cancelAll(); + } + + public 
KubernetesClientApi getBaseClient() + { + return baseClient; + } + + // Delegate write operations to base client + public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException + { + return baseClient.executeRequest(executor); + } + + public KubernetesClient getClient() + { + return baseClient.getClient(); + } + + public <T> T readPodCache(SharedInformerCacheReadRequestExecutor<T, Pod> executor) Review Comment: Please add 1-line javadocs for these methods. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). 
+ * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> + { + private final BiConsumer<T, InformerEventType> eventConsumer; + + public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } + } + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + private static final long DEFAULT_INFORMER_RESYNC_PERIOD_MS = 300000L; // 5 minutes + + protected final SharedIndexInformer<Pod> podInformer; + protected final SharedIndexInformer<Job> jobInformer; + protected final KubernetesResourceEventNotifier eventNotifier; + private final KubernetesClientApi baseClient; + private final long informerResyncPeriodMillis; + + public DruidKubernetesCachingClient( + KubernetesClientApi baseClient, + String namespace, + long informerResyncPeriodMillis + ) + { + this.baseClient = baseClient; + this.informerResyncPeriodMillis = informerResyncPeriodMillis; + this.eventNotifier = new KubernetesResourceEventNotifier(); + + this.podInformer = setupPodInformer(namespace); + this.jobInformer = setupJobInformer(namespace); + } + + public void stop() + { + if (podInformer != null) { + podInformer.stop(); + } + if (jobInformer != null) { + jobInformer.stop(); + } + // Cancel all pending futures in the event notifier + eventNotifier.cancelAll(); + } + + public 
KubernetesClientApi getBaseClient() + { + return baseClient; + } + + // Delegate write operations to base client + public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException + { + return baseClient.executeRequest(executor); + } + + public KubernetesClient getClient() + { + return baseClient.getClient(); + } + + public <T> T readPodCache(SharedInformerCacheReadRequestExecutor<T, Pod> executor) + { + if (podInformer == null) { Review Comment: `podInformer` and `jobInformer` will never be null. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). 
+ * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> + { + private final BiConsumer<T, InformerEventType> eventConsumer; + + public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } + } + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + private static final long DEFAULT_INFORMER_RESYNC_PERIOD_MS = 300000L; // 5 minutes + + protected final SharedIndexInformer<Pod> podInformer; + protected final SharedIndexInformer<Job> jobInformer; + protected final KubernetesResourceEventNotifier eventNotifier; + private final KubernetesClientApi baseClient; + private final long informerResyncPeriodMillis; + + public DruidKubernetesCachingClient( + KubernetesClientApi baseClient, + String namespace, + long informerResyncPeriodMillis + ) + { + this.baseClient = baseClient; + this.informerResyncPeriodMillis = informerResyncPeriodMillis; + this.eventNotifier = new KubernetesResourceEventNotifier(); + + this.podInformer = setupPodInformer(namespace); + this.jobInformer = setupJobInformer(namespace); + } + + public void stop() + { + if (podInformer != null) { + podInformer.stop(); + } + if (jobInformer != null) { + jobInformer.stop(); + } + // Cancel all pending futures in the event notifier + eventNotifier.cancelAll(); + } + + public 
KubernetesClientApi getBaseClient() + { + return baseClient; + } + + // Delegate write operations to base client + public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException Review Comment: Not used anymore. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java: ########## @@ -72,6 +72,10 @@ public interface KubernetesTaskRunnerConfig Integer getCapacity(); + boolean isUseK8sSharedInformers(); + + Period getK8sSharedInformerResyncPeriod(); Review Comment: Maybe add 1-line javadocs for these. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. + * <p> + * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. + * </p> + */ +public class CachingKubernetesPeonClient extends KubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + private final DruidKubernetesCachingClient cachingClient; + + public CachingKubernetesPeonClient( + DruidKubernetesCachingClient cachingClient, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + + super(cachingClient.getBaseClient(), namespace, overlordNamespace == null ? 
"" : overlordNamespace, debugJobs, emitter); + this.cachingClient = cachingClient; + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + Duration timeout = Duration.millis(unit.toMillis(howLong)); + Duration jobMustBeSeenWithin = Duration.millis(cachingClient.getInformerResyncPeriodMillis() * 2); + Stopwatch stopwatch = Stopwatch.createStarted(); Review Comment: Nit: Make these `final`? ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. + * <p> + * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. + * </p> + */ +public class CachingKubernetesPeonClient extends KubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + private final DruidKubernetesCachingClient cachingClient; + + public CachingKubernetesPeonClient( + DruidKubernetesCachingClient cachingClient, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + + super(cachingClient.getBaseClient(), namespace, overlordNamespace == null ? 
"" : overlordNamespace, debugJobs, emitter); + this.cachingClient = cachingClient; + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + Duration timeout = Duration.millis(unit.toMillis(howLong)); + Duration jobMustBeSeenWithin = Duration.millis(cachingClient.getInformerResyncPeriodMillis() * 2); + Stopwatch stopwatch = Stopwatch.createStarted(); + boolean jobSeenInCache = false; + + try { + CompletableFuture<Job> jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName()); + while (stopwatch.hasNotElapsed(timeout) && (jobSeenInCache || stopwatch.hasNotElapsed(jobMustBeSeenWithin))) { + if (jobFuture.isDone()) { + jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName()); Review Comment: ```suggestion // Register a future to watch the next change to this job jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName()); ``` ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReadRequestExecutor.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +@FunctionalInterface +public interface SharedInformerCacheReadRequestExecutor<T, R> Review Comment: ```suggestion public interface SharedInformerCacheReader<T, R> ``` ########## extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +public class TestCachingKubernetesClient extends DruidKubernetesCachingClient +{ + private static final long TESTING_RESYNC_PERIOD_MS = 10L; + + public TestCachingKubernetesClient(KubernetesClientApi clientApi, String namespace) + { + super(clientApi, namespace, TESTING_RESYNC_PERIOD_MS); + } + + public void start() Review Comment: Should this be in the super class itself? When do we call `podInformer.run()` for the super class? 
########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. 
+ * <p> + * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. + * </p> + */ +public class CachingKubernetesPeonClient extends KubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + private final DruidKubernetesCachingClient cachingClient; + + public CachingKubernetesPeonClient( + DruidKubernetesCachingClient cachingClient, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + + super(cachingClient.getBaseClient(), namespace, overlordNamespace == null ? "" : overlordNamespace, debugJobs, emitter); + this.cachingClient = cachingClient; + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + Duration timeout = Duration.millis(unit.toMillis(howLong)); + Duration jobMustBeSeenWithin = Duration.millis(cachingClient.getInformerResyncPeriodMillis() * 2); + Stopwatch stopwatch = Stopwatch.createStarted(); + boolean jobSeenInCache = false; + + try { + CompletableFuture<Job> jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName()); + while (stopwatch.hasNotElapsed(timeout) && (jobSeenInCache || stopwatch.hasNotElapsed(jobMustBeSeenWithin))) { + if (jobFuture.isDone()) { + jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName()); + } + Optional<Job> maybeJob = getPeonJob(taskId.getK8sJobName()); + if (maybeJob.isPresent()) { + jobSeenInCache = true; + Job job = maybeJob.get(); + JobResponse currentResponse = determineJobResponse(job); + if (currentResponse.getPhase() != PeonPhase.RUNNING) { + return currentResponse; + } else { + log.debug("K8s job[%s] found in cache and is still running", taskId.getK8sJobName()); + } + } else if (jobSeenInCache) { + // Job 
was in cache before, but now it's gone - it was deleted and will never complete. + log.warn("K8s Job[%s] was not found. It can happen if the task was canceled", taskId.getK8sJobName()); + return new JobResponse(null, PeonPhase.FAILED); + } else { + log.debug("K8s job[%s] not yet found in cache", taskId.getK8sJobName()); + } + + try { + jobFuture.get(cachingClient.getInformerResyncPeriodMillis(), TimeUnit.MILLISECONDS); + } + catch (ExecutionException | CancellationException e) { + Throwable cause = e.getCause(); + if (cause instanceof CancellationException) { + log.noStackTrace().warn("Job change watch for job[%s] was cancelled", taskId.getK8sJobName()); + } else { + log.noStackTrace().warn(cause, "Exception while waiting for change notification of job[%s]", taskId.getK8sJobName()); + } + } + catch (TimeoutException e) { + // No job change event notified within the timeout time. If there is more time, it will loop back and check the cache again. + log.debug("Timeout waiting for change notification of job[%s].", taskId.getK8sJobName()); + } + catch (InterruptedException e) { + throw DruidException.defensive(e, "Interrupted waiting for job change notification for job[%s]", taskId.getK8sJobName()); Review Comment: This would cause a bunch of DruidException to be thrown when an Overlord is terminated, but I am not sure what the alternative should be. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). 
+ * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> + { + private final BiConsumer<T, InformerEventType> eventConsumer; + + public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } + } + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + private static final long DEFAULT_INFORMER_RESYNC_PERIOD_MS = 300000L; // 5 minutes + + protected final SharedIndexInformer<Pod> podInformer; + protected final SharedIndexInformer<Job> jobInformer; + protected final KubernetesResourceEventNotifier eventNotifier; + private final KubernetesClientApi baseClient; + private final long informerResyncPeriodMillis; + + public DruidKubernetesCachingClient( + KubernetesClientApi baseClient, + String namespace, + long informerResyncPeriodMillis + ) + { + this.baseClient = baseClient; + this.informerResyncPeriodMillis = informerResyncPeriodMillis; + this.eventNotifier = new KubernetesResourceEventNotifier(); + + this.podInformer = setupPodInformer(namespace); + this.jobInformer = setupJobInformer(namespace); + } + + public void stop() Review Comment: Please add a short javadoc. Should this be called upon loss of leadership too or only service termination? Should there also be an equivalent `start()` that is invoked on becoming leader? 
########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java: ########## @@ -156,46 +172,76 @@ public boolean deletePeonJob(K8sTaskId taskId) } } + /** + * Get a LogWatch for the peon pod associated with the given taskId. Create it if it does not already exist. + * <p> + * Any issues creating the LogWatch will be logged and an absent Optional will be returned. + * </p> + * + * @return an Optional containing the {@link LogWatch} if it exists or was created. + */ public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId) { + Optional<Pod> maybePod = getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + log.debug("Pod for job[%s] not found in cache, cannot watch logs", taskId.getK8sJobName()); + return Optional.absent(); + } + + Pod pod = maybePod.get(); + String podName = pod.getMetadata().getName(); + KubernetesClient k8sClient = clientApi.getClient(); try { - LogWatch logWatch = k8sClient.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sJobName()) - .inContainer("main") - .watchLog(); + LogWatch logWatch = k8sClient.pods() + .inNamespace(namespace) + .withName(podName) + .inContainer("main") + .watchLog(); if (logWatch == null) { return Optional.absent(); } return Optional.of(logWatch); } catch (Exception e) { - log.error(e, "Error watching logs from task: %s", taskId); + log.error(e, "Error watching logs from task: %s, pod: %s", taskId, podName); Review Comment: ```suggestion log.error(e, "Error watching logs from task[%s], pod[%s].", taskId, podName); ``` ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java: ########## @@ -156,46 +172,76 @@ public boolean deletePeonJob(K8sTaskId taskId) } } + /** + * Get a LogWatch for the peon pod associated with the given taskId. Create it if it does not already exist. 
+ * <p> + * Any issues creating the LogWatch will be logged and an absent Optional will be returned. + * </p> + * + * @return an Optional containing the {@link LogWatch} if it exists or was created. + */ public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId) { + Optional<Pod> maybePod = getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + log.debug("Pod for job[%s] not found in cache, cannot watch logs", taskId.getK8sJobName()); + return Optional.absent(); + } + + Pod pod = maybePod.get(); + String podName = pod.getMetadata().getName(); + KubernetesClient k8sClient = clientApi.getClient(); try { - LogWatch logWatch = k8sClient.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sJobName()) - .inContainer("main") - .watchLog(); + LogWatch logWatch = k8sClient.pods() + .inNamespace(namespace) + .withName(podName) + .inContainer("main") + .watchLog(); if (logWatch == null) { return Optional.absent(); } return Optional.of(logWatch); } catch (Exception e) { - log.error(e, "Error watching logs from task: %s", taskId); + log.error(e, "Error watching logs from task: %s, pod: %s", taskId, podName); return Optional.absent(); } } + /** + * Get an InputStream for the logs of the peon pod associated with the given taskId. + * <p> + * Any issues creating the InputStream will be logged and an absent Optional will be returned. + * </p> + * + * @return an Optional containing the {@link InputStream} if the pod exists and logs could be streamed, or absent otherwise Review Comment: ```suggestion * @return an Optional containing the {@link InputStream} for the logs of the pod, if it exists and logs could be streamed, or absent otherwise. ``` ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). 
+ * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> + { + private final BiConsumer<T, InformerEventType> eventConsumer; + + public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } + } + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + private static final long DEFAULT_INFORMER_RESYNC_PERIOD_MS = 300000L; // 5 minutes + + protected final SharedIndexInformer<Pod> podInformer; + protected final SharedIndexInformer<Job> jobInformer; + protected final KubernetesResourceEventNotifier eventNotifier; Review Comment: Do these need to be `protected` or can they be `private` too? ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). + * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> Review Comment: Nit: this class may be in a file of its own. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + /** + * Event types for Kubernetes informer resource events. + */ + public enum InformerEventType + { + ADD, + UPDATE, + DELETE + } + + /** + * Impl of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). 
+ * + * @param <T> The Kubernetes resource type (e.g., Pod, Job) + */ + public static class InformerEventHandler<T> implements ResourceEventHandler<T> + { + private final BiConsumer<T, InformerEventType> eventConsumer; + + public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } + } + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); Review Comment: Nit: please add a newline before this. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReadRequestExecutor.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +@FunctionalInterface +public interface SharedInformerCacheReadRequestExecutor<T, R> +{ + T executeRequest(SharedIndexInformer<R> informer); Review Comment: ```suggestion T readFromCache(Indexer<R> cacheIndexer); ``` ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java: ########## @@ -156,46 +172,76 @@ public boolean deletePeonJob(K8sTaskId taskId) } } + /** + * Get a LogWatch for the peon pod associated with the given taskId. Create it if it does not already exist. + * <p> + * Any issues creating the LogWatch will be logged and an absent Optional will be returned. + * </p> + * + * @return an Optional containing the {@link LogWatch} if it exists or was created. + */ public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId) { + Optional<Pod> maybePod = getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + log.debug("Pod for job[%s] not found in cache, cannot watch logs", taskId.getK8sJobName()); + return Optional.absent(); + } + + Pod pod = maybePod.get(); + String podName = pod.getMetadata().getName(); + KubernetesClient k8sClient = clientApi.getClient(); try { - LogWatch logWatch = k8sClient.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sJobName()) - .inContainer("main") - .watchLog(); + LogWatch logWatch = k8sClient.pods() + .inNamespace(namespace) + .withName(podName) + .inContainer("main") + .watchLog(); if (logWatch == null) { return Optional.absent(); } return Optional.of(logWatch); } catch (Exception e) { - log.error(e, "Error watching logs from task: %s", taskId); + log.error(e, "Error watching logs from task: %s, pod: %s", taskId, podName); return Optional.absent(); } } + /** + * Get an InputStream for the logs of the peon pod associated with the given taskId. 
+ * <p> + * Any issues creating the InputStream will be logged and an absent Optional will be returned. + * </p> + * Review Comment: Nit: I think this part can be omitted since the same info is already captured in the `@return` tag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
