xinyuiscool commented on code in PR #1605:
URL: https://github.com/apache/samza/pull/1605#discussion_r891676596


##########
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java:
##########
@@ -399,6 +415,18 @@ private void onShutDown() {
     }
   }
 
+  private void stopDrainMonitorAndCleanup() {
+    // Stop the Drain Monitor.
+    // Also, garbage collect all DrainNotifications from Drain metadata-store 
of the job if the following conditions
+    // are met:
+    // 1) Job is already draining
+    // 2) All containers shutdown successfully (due to drain)
+    // 3) There was no exception in the coordinator
+    drainMonitor.stop(drainMonitor.isDraining()

Review Comment:
   Let's add a method in the DrainWriter/Util class to do cleanup(). Basically 
it will check the drain message and delete it if needed.



##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -1026,6 +1036,34 @@ class SamzaContainer(
     }
   }
 
+  def startDrainMonitor: Unit = {
+    if (drainMonitor != null) {
+      drainMonitor.registerDrainCallback(new DrainCallback {
+        override def onDrain(): Unit = drain()
+      })
+
+      /**
+       * Prior to starting the DrainMonitor, we are doing a one-time check to 
see if DrainNotification is present
+       * in the metadata store for the current deployment.
+       * This check is to deal with the case where a container might have 
re-started during Drain.
+       * If yes, we will set the container to drain mode to prevent it from 
processing any new messages. This will
+       * in-turn guarantee that intermediate Drain control messages from the 
previous incarnation of the container are
+       * processed and there are no duplicate intermediate control messages 
for the same deployment.
+       * */
+      if (drainMonitor.shouldDrain()) {

Review Comment:
   Let's do this as part of drainMonitor.start(). So start() will:
   1. Do a one-time check to see whether it's draining.
   2. If yes, invoke the callback; no need to schedule the timer.
   3. If no, schedule the timer.
   
   Let's not expose the internals here.



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainMonitor is intended to monitor the MetadataStore for {@link 
DrainNotification} and invokes
+ * the {@link DrainCallback}.
+ * */
+public class DrainMonitor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainMonitor.class);
+
+  /**
+   * Describes the state of the monitor.
+   * */
+  private enum State {
+    /**
+     * Initial state when DrainMonitor is not polling for DrainNotifications.
+     * */
+    INIT,
+    /**
+     * When Drain Monitor is started, it moves from INIT to RUNNING state and 
starts polling
+     * for Drain Notifications.
+     * */
+    RUNNING,
+    /**
+     * Indicates that the DrainMonitor is stopped as Drain was found.
+     * */
+    DRAINING,
+    /**
+     * Indicates that the Drain Monitor is explicitly stopped.
+     * */
+    STOPPED
+  }
+
+  private static final int POLLING_INTERVAL_MILLIS = 60_000;
+  private static final int INITIAL_POLL_DELAY_MILLIS = 0;
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("Samza DrainMonitor Thread-%d")
+              .setDaemon(true)
+              .build());
+  private final String appRunId;
+  private final long pollingIntervalMillis;
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  private final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+
+  private State state = State.INIT;
+  private DrainCallback callback;
+
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  public DrainMonitor(MetadataStore metadataStore, Config config) {
+    this(metadataStore, config, POLLING_INTERVAL_MILLIS);
+  }
+
+  public DrainMonitor(MetadataStore metadataStore, Config config, long 
pollingIntervalMillis) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+    Preconditions.checkArgument(pollingIntervalMillis > 0,
+        String.format("Polling interval specified is %d ms. It should be 
greater than 0.", pollingIntervalMillis));
+    this.drainMetadataStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    this.appRunId = applicationConfig.getRunId();
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Starts the DrainMonitor.
+   * */
+  public void start() {
+    Preconditions.checkState(callback != null,
+        "Drain Callback needs to be set using registerCallback(callback) prior 
to starting the DrainManager.");
+    switch (state) {
+      case INIT:
+        state = State.RUNNING;
+        schedulerService.scheduleAtFixedRate(() -> {
+          if (shouldDrain()) {
+            LOG.info("Received Drain Notification for deployment: {}", 
appRunId);
+            callback.onDrain();
+            drain();
+          }
+        }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, 
TimeUnit.MILLISECONDS);
+        LOG.info("Started DrainMonitor.");
+        break;
+      case RUNNING:
+      case DRAINING:
+      case STOPPED:
+        LOG.warn("Cannot call start() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+  }
+
+  /**
+   * Stops the DrainMonitor.
+   *
+   * @param shouldClearDrainMessages parameter to indicate if the DrainMonitor 
should perform cleanup of old
+   *                                 drain messages.
+   * */
+  public void stop(boolean shouldClearDrainMessages) {
+    switch (state) {
+      case RUNNING:
+        schedulerService.shutdownNow();
+        state = State.STOPPED;
+        LOG.info("Stopped DrainMonitor.");
+        break;
+      case INIT:
+      case STOPPED:
+      case DRAINING:
+        // no-op
+        LOG.info("Cannot call stop() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+    if (shouldClearDrainMessages) {
+      deleteAllDrainNotifications();
+    }
+  }
+
+  private void drain() {
+    switch (state) {
+      case RUNNING:
+        schedulerService.shutdownNow();
+        state = State.DRAINING;
+        LOG.info("Stopped DrainMonitor.");
+        break;
+      case INIT:
+      case STOPPED:
+      case DRAINING:
+        LOG.info("Cannot call drain() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+  }
+
+  /**
+   * Register a callback to be executed when DrainNotification is encountered.
+   *
+   * @param callback the callback to register.
+   * @return Returns {@code true} if registration was successful and {@code 
false} if not.
+   * Registration can fail if the DrainMonitor is stopped or a callback is 
already registered.
+   * */
+  public boolean registerDrainCallback(DrainCallback callback) {
+    Preconditions.checkNotNull(callback);
+
+    switch (state) {
+      case RUNNING:
+      case DRAINING:
+      case STOPPED:
+        LOG.warn("Cannot register callback when it is in {} state. Please 
register callback before calling start "
+            + "on DrainMonitor.", state);
+        return false;
+      case INIT:
+        if (this.callback != null) {
+          LOG.warn("Cannot register callback as a callback is already 
registered.");
+          return false;
+        }
+        this.callback = callback;
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Callback for any action to be executed by DrainMonitor implementations once 
Drain is encountered.
+   * Registered using {@link #registerDrainCallback(DrainCallback)}.
+   * */
+  public interface DrainCallback {
+    void onDrain();
+  }
+
+  /**
+   * Check if the DrainMonitor is in Draining state.
+   * */
+  public boolean isDraining() {
+    return state == State.DRAINING;
+  }
+
+  /**
+   * One-time check to see if there are any DrainNotification messages 
available in the
+   * metadata store for the current deployment.
+   * */
+  public boolean shouldDrain() {

Review Comment:
   private?



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainMonitor is intended to monitor the MetadataStore for {@link 
DrainNotification} and invokes
+ * the {@link DrainCallback}.
+ * */
+public class DrainMonitor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainMonitor.class);
+
+  /**
+   * Describes the state of the monitor.
+   * */
+  private enum State {
+    /**
+     * Initial state when DrainMonitor is not polling for DrainNotifications.
+     * */
+    INIT,
+    /**
+     * When Drain Monitor is started, it moves from INIT to RUNNING state and 
starts polling
+     * for Drain Notifications.
+     * */
+    RUNNING,
+    /**
+     * Indicates that the DrainMonitor is stopped as Drain was found.
+     * */
+    DRAINING,
+    /**
+     * Indicates that the Drain Monitor is explicitly stopped.
+     * */
+    STOPPED
+  }
+
+  private static final int POLLING_INTERVAL_MILLIS = 60_000;
+  private static final int INITIAL_POLL_DELAY_MILLIS = 0;
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("Samza DrainMonitor Thread-%d")
+              .setDaemon(true)
+              .build());
+  private final String appRunId;
+  private final long pollingIntervalMillis;
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  private final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+
+  private State state = State.INIT;
+  private DrainCallback callback;
+
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  public DrainMonitor(MetadataStore metadataStore, Config config) {
+    this(metadataStore, config, POLLING_INTERVAL_MILLIS);
+  }
+
+  public DrainMonitor(MetadataStore metadataStore, Config config, long 
pollingIntervalMillis) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+    Preconditions.checkArgument(pollingIntervalMillis > 0,
+        String.format("Polling interval specified is %d ms. It should be 
greater than 0.", pollingIntervalMillis));
+    this.drainMetadataStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    this.appRunId = applicationConfig.getRunId();
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Starts the DrainMonitor.
+   * */
+  public void start() {
+    Preconditions.checkState(callback != null,
+        "Drain Callback needs to be set using registerCallback(callback) prior 
to starting the DrainManager.");
+    switch (state) {
+      case INIT:
+        state = State.RUNNING;
+        schedulerService.scheduleAtFixedRate(() -> {
+          if (shouldDrain()) {
+            LOG.info("Received Drain Notification for deployment: {}", 
appRunId);
+            callback.onDrain();
+            drain();
+          }
+        }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, 
TimeUnit.MILLISECONDS);
+        LOG.info("Started DrainMonitor.");
+        break;
+      case RUNNING:
+      case DRAINING:
+      case STOPPED:
+        LOG.warn("Cannot call start() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+  }
+
+  /**
+   * Stops the DrainMonitor.
+   *
+   * @param shouldClearDrainMessages parameter to indicate if the DrainMonitor 
should perform cleanup of old
+   *                                 drain messages.
+   * */
+  public void stop(boolean shouldClearDrainMessages) {
+    switch (state) {
+      case RUNNING:
+        schedulerService.shutdownNow();
+        state = State.STOPPED;
+        LOG.info("Stopped DrainMonitor.");
+        break;
+      case INIT:
+      case STOPPED:
+      case DRAINING:
+        // no-op
+        LOG.info("Cannot call stop() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+    if (shouldClearDrainMessages) {
+      deleteAllDrainNotifications();
+    }
+  }
+
+  private void drain() {
+    switch (state) {
+      case RUNNING:
+        schedulerService.shutdownNow();
+        state = State.DRAINING;
+        LOG.info("Stopped DrainMonitor.");
+        break;
+      case INIT:
+      case STOPPED:
+      case DRAINING:
+        LOG.info("Cannot call drain() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+  }
+
+  /**
+   * Register a callback to be executed when DrainNotification is encountered.
+   *
+   * @param callback the callback to register.
+   * @return Returns {@code true} if registration was successful and {@code 
false} if not.
+   * Registration can fail if the DrainMonitor is stopped or a callback is 
already registered.
+   * */
+  public boolean registerDrainCallback(DrainCallback callback) {
+    Preconditions.checkNotNull(callback);
+
+    switch (state) {
+      case RUNNING:
+      case DRAINING:
+      case STOPPED:
+        LOG.warn("Cannot register callback when it is in {} state. Please 
register callback before calling start "
+            + "on DrainMonitor.", state);
+        return false;
+      case INIT:
+        if (this.callback != null) {
+          LOG.warn("Cannot register callback as a callback is already 
registered.");
+          return false;
+        }
+        this.callback = callback;
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Callback for any action to be executed by DrainMonitor implementations once 
Drain is encountered.
+   * Registered using {@link #registerDrainCallback(DrainCallback)}.
+   * */
+  public interface DrainCallback {
+    void onDrain();
+  }
+
+  /**
+   * Check if the DrainMonitor is in Draining state.
+   * */
+  public boolean isDraining() {
+    return state == State.DRAINING;
+  }
+
+  /**
+   * One-time check to see if there are any DrainNotification messages 
available in the
+   * metadata store for the current deployment.
+   * */
+  public boolean shouldDrain() {
+    final Optional<List<DrainNotification>> drainNotifications = 
readDrainNotificationMessages();
+    if (drainNotifications.isPresent()) {
+      final ImmutableList<DrainNotification> filteredDrainNotifications = 
drainNotifications.get()
+          .stream()
+          .filter(notification -> 
appRunId.equals(notification.getDeploymentId()))
+          .collect(ImmutableList.toImmutableList());
+      return !filteredDrainNotifications.isEmpty();
+    }
+    return false;
+  }
+
+  /**
+   * Reads all DrainNotification messages from the metadata store.
+   * */
+  public Optional<List<DrainNotification>> readDrainNotificationMessages() {
+    final ImmutableList<DrainNotification> drainNotifications = 
drainMetadataStore.all()
+        .values()
+        .stream()
+        .map(bytes -> {
+          try {
+            return objectMapper.readValue(bytes, DrainNotification.class);
+          } catch (IOException e) {
+            LOG.error("Unable to deserialize DrainNotification from the 
metadata store", e);
+            throw new SamzaException(e);
+          }
+        })
+        .collect(ImmutableList.toImmutableList());
+    return drainNotifications.size() > 0
+        ? Optional.of(drainNotifications)
+        : Optional.empty();
+  }
+
+  /**
+   * Deletes all DrainNotifications.
+   * */
+  public void deleteAllDrainNotifications() {

Review Comment:
   As discussed, move this to the write util class.



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainMonitor is intended to monitor the MetadataStore for {@link 
DrainNotification} and invokes
+ * the {@link DrainCallback}.
+ * */
+public class DrainMonitor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainMonitor.class);
+
+  /**
+   * Describes the state of the monitor.
+   * */
+  private enum State {
+    /**
+     * Initial state when DrainMonitor is not polling for DrainNotifications.
+     * */
+    INIT,
+    /**
+     * When Drain Monitor is started, it moves from INIT to RUNNING state and 
starts polling
+     * for Drain Notifications.
+     * */
+    RUNNING,
+    /**
+     * Indicates that the DrainMonitor is stopped as Drain was found.
+     * */
+    DRAINING,
+    /**
+     * Indicates that the Drain Monitor is explicitly stopped.
+     * */
+    STOPPED
+  }
+
+  private static final int POLLING_INTERVAL_MILLIS = 60_000;
+  private static final int INITIAL_POLL_DELAY_MILLIS = 0;
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("Samza DrainMonitor Thread-%d")
+              .setDaemon(true)
+              .build());
+  private final String appRunId;
+  private final long pollingIntervalMillis;
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  private final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+
+  private State state = State.INIT;
+  private DrainCallback callback;
+
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  public DrainMonitor(MetadataStore metadataStore, Config config) {
+    this(metadataStore, config, POLLING_INTERVAL_MILLIS);
+  }
+
+  public DrainMonitor(MetadataStore metadataStore, Config config, long 
pollingIntervalMillis) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+    Preconditions.checkArgument(pollingIntervalMillis > 0,
+        String.format("Polling interval specified is %d ms. It should be 
greater than 0.", pollingIntervalMillis));
+    this.drainMetadataStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    this.appRunId = applicationConfig.getRunId();
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Starts the DrainMonitor.
+   * */
+  public void start() {
+    Preconditions.checkState(callback != null,
+        "Drain Callback needs to be set using registerCallback(callback) prior 
to starting the DrainManager.");
+    switch (state) {
+      case INIT:
+        state = State.RUNNING;
+        schedulerService.scheduleAtFixedRate(() -> {
+          if (shouldDrain()) {
+            LOG.info("Received Drain Notification for deployment: {}", 
appRunId);
+            callback.onDrain();
+            drain();
+          }
+        }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, 
TimeUnit.MILLISECONDS);
+        LOG.info("Started DrainMonitor.");
+        break;
+      case RUNNING:
+      case DRAINING:
+      case STOPPED:
+        LOG.warn("Cannot call start() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+  }
+
+  /**
+   * Stops the DrainMonitor.
+   *
+   * @param shouldClearDrainMessages parameter to indicate if the DrainMonitor 
should perform cleanup of old
+   *                                 drain messages.
+   * */
+  public void stop(boolean shouldClearDrainMessages) {
+    switch (state) {
+      case RUNNING:
+        schedulerService.shutdownNow();
+        state = State.STOPPED;
+        LOG.info("Stopped DrainMonitor.");
+        break;
+      case INIT:
+      case STOPPED:
+      case DRAINING:
+        // no-op
+        LOG.info("Cannot call stop() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+    if (shouldClearDrainMessages) {
+      deleteAllDrainNotifications();
+    }
+  }
+
+  private void drain() {
+    switch (state) {
+      case RUNNING:
+        schedulerService.shutdownNow();
+        state = State.DRAINING;
+        LOG.info("Stopped DrainMonitor.");
+        break;
+      case INIT:
+      case STOPPED:
+      case DRAINING:
+        LOG.info("Cannot call drain() on the DrainMonitor when it is in {} 
state.", state);
+        break;
+    }
+  }
+
+  /**
+   * Register a callback to be executed when DrainNotification is encountered.
+   *
+   * @param callback the callback to register.
+   * @return Returns {@code true} if registration was successful and {@code 
false} if not.
+   * Registration can fail if the DrainMonitor is stopped or a callback is 
already registered.
+   * */
+  public boolean registerDrainCallback(DrainCallback callback) {
+    Preconditions.checkNotNull(callback);
+
+    switch (state) {
+      case RUNNING:
+      case DRAINING:
+      case STOPPED:
+        LOG.warn("Cannot register callback when it is in {} state. Please 
register callback before calling start "
+            + "on DrainMonitor.", state);
+        return false;
+      case INIT:
+        if (this.callback != null) {
+          LOG.warn("Cannot register callback as a callback is already 
registered.");
+          return false;
+        }
+        this.callback = callback;
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Callback for any action to be executed by DrainMonitor implementations once 
Drain is encountered.
+   * Registered using {@link #registerDrainCallback(DrainCallback)}.
+   * */
+  public interface DrainCallback {
+    void onDrain();
+  }
+
+  /**
+   * Check if the DrainMonitor is in Draining state.
+   * */
+  public boolean isDraining() {
+    return state == State.DRAINING;
+  }
+
+  /**
+   * One-time check to see if there are any DrainNotification messages 
available in the
+   * metadata store for the current deployment.
+   * */
+  public boolean shouldDrain() {
+    final Optional<List<DrainNotification>> drainNotifications = 
readDrainNotificationMessages();
+    if (drainNotifications.isPresent()) {
+      final ImmutableList<DrainNotification> filteredDrainNotifications = 
drainNotifications.get()
+          .stream()
+          .filter(notification -> 
appRunId.equals(notification.getDeploymentId()))
+          .collect(ImmutableList.toImmutableList());
+      return !filteredDrainNotifications.isEmpty();
+    }
+    return false;
+  }
+
+  /**
+   * Reads all DrainNotification messages from the metadata store.
+   * */
+  public Optional<List<DrainNotification>> readDrainNotificationMessages() {

Review Comment:
   package-private?



##########
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java:
##########
@@ -233,6 +241,10 @@ public ClusterBasedJobCoordinator(MetricsRegistryMap 
metrics, MetadataStore meta
             new ApplicationConfig(config));
     this.containerPlacementRequestAllocatorThread =
         new Thread(containerPlacementRequestAllocator, "Samza-" + 
ContainerPlacementRequestAllocator.class.getSimpleName());
+
+    // build DrainMonitor
+    this.drainMonitor = new DrainMonitor(metadataStore, config);

Review Comment:
   As discussed offline, we don't need to listen to the drain message in the JC (job coordinator). So 
let's not create DrainMonitor here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to