xinyuiscool commented on code in PR #1605: URL: https://github.com/apache/samza/pull/1605#discussion_r891676596
########## samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java: ########## @@ -399,6 +415,18 @@ private void onShutDown() { } } + private void stopDrainMonitorAndCleanup() { + // Stop the Drain Monitor. + // Also, garbage collect all DrainNotifications from Drain metadata-store of the job if the following conditions + // are met: + // 1) Job is already draining + // 2) All containers shutdown successfully (due to drain) + // 3) There was no exception in the coordinator + drainMonitor.stop(drainMonitor.isDraining() Review Comment: Let's add a method in the DrainWriter/Util class to do cleanup(). Basically it will check the drain message and delete it if needed. ########## samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala: ########## @@ -1026,6 +1036,34 @@ class SamzaContainer( } } + def startDrainMonitor: Unit = { + if (drainMonitor != null) { + drainMonitor.registerDrainCallback(new DrainCallback { + override def onDrain(): Unit = drain() + }) + + /** + * Prior to starting the DrainMonitor, we are doing a one-time check to see if DrainNotification is present + * in the metadata store for the current deployment. + * This check is to deal with the case where a container might have re-started during Drain. + * If yes, we will set the container to drain mode to prevent it from processing any new messages. This will + * in-turn guarantee that intermediate Drain control messages from the previous incarnation of the container are + * processed and there are no duplicate intermediate control messages for the same deployment. + * */ + if (drainMonitor.shouldDrain()) { Review Comment: Let's do it part of the drainMonitor.start(). So start() will do: 1. do a one-time check to see whether it's draining. 2. If yes, then invoke the callback, no need to schedule the timer 3. If no, schedule the timer. Let's not expose the internal here. ########## samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.drain; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; +import org.apache.samza.metadatastore.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DrainMonitor is intended to monitor the MetadataStore for {@link DrainNotification} and invokes + * the {@link DrainCallback}. + * */ +public class DrainMonitor { + private static final Logger LOG = LoggerFactory.getLogger(DrainMonitor.class); + + /** + * Describes the state of the monitor. + * */ + private enum State { + /** + * Initial state when DrainMonitor is not polling for DrainNotifications. + * */ + INIT, + /** + * When Drain Monitor is started, it moves from INIT to RUNNING state and starts polling + * for Drain Notifications. + * */ + RUNNING, + /** + * Indicates that the DrainMonitor is stopped as Drain was found. + * */ + DRAINING, + /** + * Indicates that the Drain Monitor is explictly stopped. + * */ + STOPPED + } + + private static final int POLLING_INTERVAL_MILLIS = 60_000; + private static final int INITIAL_POLL_DELAY_MILLIS = 0; + + private final ScheduledExecutorService schedulerService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Samza DrainMonitor Thread-%d") + .setDaemon(true) + .build()); + private final String appRunId; + private final long pollingIntervalMillis; + private final NamespaceAwareCoordinatorStreamStore drainMetadataStore; + private final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper(); + + private State state = State.INIT; + private DrainCallback callback; + + private static final Integer VERSION = 1; + // namespace for the underlying metadata store + private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" + VERSION; + + public DrainMonitor(MetadataStore metadataStore, Config config) { + this(metadataStore, config, POLLING_INTERVAL_MILLIS); + } + + public DrainMonitor(MetadataStore metadataStore, Config config, long pollingIntervalMillis) { + Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot be null."); + Preconditions.checkNotNull(config, "Config parameter cannot be null."); + Preconditions.checkArgument(pollingIntervalMillis > 0, + String.format("Polling interval specified is %d ms. It should be greater than 0.", pollingIntervalMillis)); + this.drainMetadataStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE); + ApplicationConfig applicationConfig = new ApplicationConfig(config); + this.appRunId = applicationConfig.getRunId(); + this.pollingIntervalMillis = pollingIntervalMillis; + } + + /** + * Starts the DrainMonitor. + * */ + public void start() { + Preconditions.checkState(callback != null, + "Drain Callback needs to be set using registerCallback(callback) prior to starting the DrainManager."); + switch (state) { + case INIT: + state = State.RUNNING; + schedulerService.scheduleAtFixedRate(() -> { + if (shouldDrain()) { + LOG.info("Received Drain Notification for deployment: {}", appRunId); + callback.onDrain(); + drain(); + } + }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, TimeUnit.MILLISECONDS); + LOG.info("Started DrainMonitor."); + break; + case RUNNING: + case DRAINING: + case STOPPED: + LOG.warn("Cannot call start() on the DrainMonitor when it is in {} state.", state); + break; + } + } + + /** + * Stops the DrainMonitor. + * + * @param shouldClearDrainMessages parameter to indicate if the DrainMonitor should perform cleanup of old + * drain messages. + * */ + public void stop(boolean shouldClearDrainMessages) { + switch (state) { + case RUNNING: + schedulerService.shutdownNow(); + state = State.STOPPED; + LOG.info("Stopped DrainMonitor."); + break; + case INIT: + case STOPPED: + case DRAINING: + // no-op + LOG.info("Cannot call stop() on the DrainMonitor when it is in {} state.", state); + break; + } + if (shouldClearDrainMessages) { + deleteAllDrainNotifications(); + } + } + + private void drain() { + switch (state) { + case RUNNING: + schedulerService.shutdownNow(); + state = State.DRAINING; + LOG.info("Stopped DrainMonitor."); + break; + case INIT: + case STOPPED: + case DRAINING: + LOG.info("Cannot call drain() on the DrainMonitor when it is in {} state.", state); + break; + } + } + + /** + * Register a callback to be executed when DrainNotification is encountered. + * + * @param callback the callback to register. + * @return Returns {@code true} if registration was successful and {@code false} if not. + * Registration can fail it the DrainMonitor is stopped or a callback is already registered. + * */ + public boolean registerDrainCallback(DrainCallback callback) { + Preconditions.checkNotNull(callback); + + switch (state) { + case RUNNING: + case DRAINING: + case STOPPED: + LOG.warn("Cannot register callback when it is in {} state. Please register callback before calling start " + + "on DrainMonitor.", state); + return false; + case INIT: + if (this.callback != null) { + LOG.warn("Cannot register callback as a callback is already registered."); + return false; + } + this.callback = callback; + return true; + default: + return false; + } + } + + /** + * Callback for any action to executed by DrainMonitor implementations once Drain is encountered. + * Registered using {@link #registerDrainCallback(DrainCallback)}. + * */ + public interface DrainCallback { + void onDrain(); + } + + /** + * Check if the DrainMonitor is in Draining state. + * */ + public boolean isDraining() { + return state == State.DRAINING; + } + + /** + * One time check check to see if there are any DrainNotification messages available in the + * metadata store for the current deployment. + * */ + public boolean shouldDrain() { Review Comment: private? ########## samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.drain; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; +import org.apache.samza.metadatastore.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DrainMonitor is intended to monitor the MetadataStore for {@link DrainNotification} and invokes + * the {@link DrainCallback}. + * */ +public class DrainMonitor { + private static final Logger LOG = LoggerFactory.getLogger(DrainMonitor.class); + + /** + * Describes the state of the monitor. + * */ + private enum State { + /** + * Initial state when DrainMonitor is not polling for DrainNotifications. + * */ + INIT, + /** + * When Drain Monitor is started, it moves from INIT to RUNNING state and starts polling + * for Drain Notifications. + * */ + RUNNING, + /** + * Indicates that the DrainMonitor is stopped as Drain was found. + * */ + DRAINING, + /** + * Indicates that the Drain Monitor is explictly stopped. + * */ + STOPPED + } + + private static final int POLLING_INTERVAL_MILLIS = 60_000; + private static final int INITIAL_POLL_DELAY_MILLIS = 0; + + private final ScheduledExecutorService schedulerService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Samza DrainMonitor Thread-%d") + .setDaemon(true) + .build()); + private final String appRunId; + private final long pollingIntervalMillis; + private final NamespaceAwareCoordinatorStreamStore drainMetadataStore; + private final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper(); + + private State state = State.INIT; + private DrainCallback callback; + + private static final Integer VERSION = 1; + // namespace for the underlying metadata store + private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" + VERSION; + + public DrainMonitor(MetadataStore metadataStore, Config config) { + this(metadataStore, config, POLLING_INTERVAL_MILLIS); + } + + public DrainMonitor(MetadataStore metadataStore, Config config, long pollingIntervalMillis) { + Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot be null."); + Preconditions.checkNotNull(config, "Config parameter cannot be null."); + Preconditions.checkArgument(pollingIntervalMillis > 0, + String.format("Polling interval specified is %d ms. It should be greater than 0.", pollingIntervalMillis)); + this.drainMetadataStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE); + ApplicationConfig applicationConfig = new ApplicationConfig(config); + this.appRunId = applicationConfig.getRunId(); + this.pollingIntervalMillis = pollingIntervalMillis; + } + + /** + * Starts the DrainMonitor. + * */ + public void start() { + Preconditions.checkState(callback != null, + "Drain Callback needs to be set using registerCallback(callback) prior to starting the DrainManager."); + switch (state) { + case INIT: + state = State.RUNNING; + schedulerService.scheduleAtFixedRate(() -> { + if (shouldDrain()) { + LOG.info("Received Drain Notification for deployment: {}", appRunId); + callback.onDrain(); + drain(); + } + }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, TimeUnit.MILLISECONDS); + LOG.info("Started DrainMonitor."); + break; + case RUNNING: + case DRAINING: + case STOPPED: + LOG.warn("Cannot call start() on the DrainMonitor when it is in {} state.", state); + break; + } + } + + /** + * Stops the DrainMonitor. + * + * @param shouldClearDrainMessages parameter to indicate if the DrainMonitor should perform cleanup of old + * drain messages. + * */ + public void stop(boolean shouldClearDrainMessages) { + switch (state) { + case RUNNING: + schedulerService.shutdownNow(); + state = State.STOPPED; + LOG.info("Stopped DrainMonitor."); + break; + case INIT: + case STOPPED: + case DRAINING: + // no-op + LOG.info("Cannot call stop() on the DrainMonitor when it is in {} state.", state); + break; + } + if (shouldClearDrainMessages) { + deleteAllDrainNotifications(); + } + } + + private void drain() { + switch (state) { + case RUNNING: + schedulerService.shutdownNow(); + state = State.DRAINING; + LOG.info("Stopped DrainMonitor."); + break; + case INIT: + case STOPPED: + case DRAINING: + LOG.info("Cannot call drain() on the DrainMonitor when it is in {} state.", state); + break; + } + } + + /** + * Register a callback to be executed when DrainNotification is encountered. + * + * @param callback the callback to register. + * @return Returns {@code true} if registration was successful and {@code false} if not. + * Registration can fail it the DrainMonitor is stopped or a callback is already registered. + * */ + public boolean registerDrainCallback(DrainCallback callback) { + Preconditions.checkNotNull(callback); + + switch (state) { + case RUNNING: + case DRAINING: + case STOPPED: + LOG.warn("Cannot register callback when it is in {} state. Please register callback before calling start " + + "on DrainMonitor.", state); + return false; + case INIT: + if (this.callback != null) { + LOG.warn("Cannot register callback as a callback is already registered."); + return false; + } + this.callback = callback; + return true; + default: + return false; + } + } + + /** + * Callback for any action to executed by DrainMonitor implementations once Drain is encountered. + * Registered using {@link #registerDrainCallback(DrainCallback)}. + * */ + public interface DrainCallback { + void onDrain(); + } + + /** + * Check if the DrainMonitor is in Draining state. + * */ + public boolean isDraining() { + return state == State.DRAINING; + } + + /** + * One time check check to see if there are any DrainNotification messages available in the + * metadata store for the current deployment. + * */ + public boolean shouldDrain() { + final Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(); + if (drainNotifications.isPresent()) { + final ImmutableList<DrainNotification> filteredDrainNotifications = drainNotifications.get() + .stream() + .filter(notification -> appRunId.equals(notification.getDeploymentId())) + .collect(ImmutableList.toImmutableList()); + return !filteredDrainNotifications.isEmpty(); + } + return false; + } + + /** + * Reads all DrainNotification messages from the metadata store. + * */ + public Optional<List<DrainNotification>> readDrainNotificationMessages() { + final ImmutableList<DrainNotification> drainNotifications = drainMetadataStore.all() + .values() + .stream() + .map(bytes -> { + try { + return objectMapper.readValue(bytes, DrainNotification.class); + } catch (IOException e) { + LOG.error("Unable to deserialize DrainNotification from the metadata store", e); + throw new SamzaException(e); + } + }) + .collect(ImmutableList.toImmutableList()); + return drainNotifications.size() > 0 + ? Optional.of(drainNotifications) + : Optional.empty(); + } + + /** + * Deletes all DrainNotifications. + * */ + public void deleteAllDrainNotifications() { Review Comment: As discussed, move this to the write util class. ########## samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.drain; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; +import org.apache.samza.metadatastore.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DrainMonitor is intended to monitor the MetadataStore for {@link DrainNotification} and invokes + * the {@link DrainCallback}. + * */ +public class DrainMonitor { + private static final Logger LOG = LoggerFactory.getLogger(DrainMonitor.class); + + /** + * Describes the state of the monitor. + * */ + private enum State { + /** + * Initial state when DrainMonitor is not polling for DrainNotifications. + * */ + INIT, + /** + * When Drain Monitor is started, it moves from INIT to RUNNING state and starts polling + * for Drain Notifications. + * */ + RUNNING, + /** + * Indicates that the DrainMonitor is stopped as Drain was found. + * */ + DRAINING, + /** + * Indicates that the Drain Monitor is explictly stopped. + * */ + STOPPED + } + + private static final int POLLING_INTERVAL_MILLIS = 60_000; + private static final int INITIAL_POLL_DELAY_MILLIS = 0; + + private final ScheduledExecutorService schedulerService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Samza DrainMonitor Thread-%d") + .setDaemon(true) + .build()); + private final String appRunId; + private final long pollingIntervalMillis; + private final NamespaceAwareCoordinatorStreamStore drainMetadataStore; + private final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper(); + + private State state = State.INIT; + private DrainCallback callback; + + private static final Integer VERSION = 1; + // namespace for the underlying metadata store + private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" + VERSION; + + public DrainMonitor(MetadataStore metadataStore, Config config) { + this(metadataStore, config, POLLING_INTERVAL_MILLIS); + } + + public DrainMonitor(MetadataStore metadataStore, Config config, long pollingIntervalMillis) { + Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot be null."); + Preconditions.checkNotNull(config, "Config parameter cannot be null."); + Preconditions.checkArgument(pollingIntervalMillis > 0, + String.format("Polling interval specified is %d ms. It should be greater than 0.", pollingIntervalMillis)); + this.drainMetadataStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE); + ApplicationConfig applicationConfig = new ApplicationConfig(config); + this.appRunId = applicationConfig.getRunId(); + this.pollingIntervalMillis = pollingIntervalMillis; + } + + /** + * Starts the DrainMonitor. + * */ + public void start() { + Preconditions.checkState(callback != null, + "Drain Callback needs to be set using registerCallback(callback) prior to starting the DrainManager."); + switch (state) { + case INIT: + state = State.RUNNING; + schedulerService.scheduleAtFixedRate(() -> { + if (shouldDrain()) { + LOG.info("Received Drain Notification for deployment: {}", appRunId); + callback.onDrain(); + drain(); + } + }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, TimeUnit.MILLISECONDS); + LOG.info("Started DrainMonitor."); + break; + case RUNNING: + case DRAINING: + case STOPPED: + LOG.warn("Cannot call start() on the DrainMonitor when it is in {} state.", state); + break; + } + } + + /** + * Stops the DrainMonitor. + * + * @param shouldClearDrainMessages parameter to indicate if the DrainMonitor should perform cleanup of old + * drain messages. + * */ + public void stop(boolean shouldClearDrainMessages) { + switch (state) { + case RUNNING: + schedulerService.shutdownNow(); + state = State.STOPPED; + LOG.info("Stopped DrainMonitor."); + break; + case INIT: + case STOPPED: + case DRAINING: + // no-op + LOG.info("Cannot call stop() on the DrainMonitor when it is in {} state.", state); + break; + } + if (shouldClearDrainMessages) { + deleteAllDrainNotifications(); + } + } + + private void drain() { + switch (state) { + case RUNNING: + schedulerService.shutdownNow(); + state = State.DRAINING; + LOG.info("Stopped DrainMonitor."); + break; + case INIT: + case STOPPED: + case DRAINING: + LOG.info("Cannot call drain() on the DrainMonitor when it is in {} state.", state); + break; + } + } + + /** + * Register a callback to be executed when DrainNotification is encountered. + * + * @param callback the callback to register. + * @return Returns {@code true} if registration was successful and {@code false} if not. + * Registration can fail it the DrainMonitor is stopped or a callback is already registered. + * */ + public boolean registerDrainCallback(DrainCallback callback) { + Preconditions.checkNotNull(callback); + + switch (state) { + case RUNNING: + case DRAINING: + case STOPPED: + LOG.warn("Cannot register callback when it is in {} state. Please register callback before calling start " + + "on DrainMonitor.", state); + return false; + case INIT: + if (this.callback != null) { + LOG.warn("Cannot register callback as a callback is already registered."); + return false; + } + this.callback = callback; + return true; + default: + return false; + } + } + + /** + * Callback for any action to executed by DrainMonitor implementations once Drain is encountered. + * Registered using {@link #registerDrainCallback(DrainCallback)}. + * */ + public interface DrainCallback { + void onDrain(); + } + + /** + * Check if the DrainMonitor is in Draining state. + * */ + public boolean isDraining() { + return state == State.DRAINING; + } + + /** + * One time check check to see if there are any DrainNotification messages available in the + * metadata store for the current deployment. + * */ + public boolean shouldDrain() { + final Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(); + if (drainNotifications.isPresent()) { + final ImmutableList<DrainNotification> filteredDrainNotifications = drainNotifications.get() + .stream() + .filter(notification -> appRunId.equals(notification.getDeploymentId())) + .collect(ImmutableList.toImmutableList()); + return !filteredDrainNotifications.isEmpty(); + } + return false; + } + + /** + * Reads all DrainNotification messages from the metadata store. + * */ + public Optional<List<DrainNotification>> readDrainNotificationMessages() { Review Comment: package private? ########## samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java: ########## @@ -233,6 +241,10 @@ public ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore meta new ApplicationConfig(config)); this.containerPlacementRequestAllocatorThread = new Thread(containerPlacementRequestAllocator, "Samza-" + ContainerPlacementRequestAllocator.class.getSimpleName()); + + // build DrainMonitor + this.drainMonitor = new DrainMonitor(metadataStore, config); Review Comment: As discussed offline, we don't need to listen to the drain message in JC. So let's not create DrainMonitor here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org