xinyuiscool commented on code in PR #1605:
URL: https://github.com/apache/samza/pull/1605#discussion_r895941945


##########
samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ .* with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.UUID;
+import joptsimple.internal.Strings;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainUtils provides utility methods for managing {@link DrainNotification} 
in the the provided {@link MetadataStore}.
+ * */
+public class DrainUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(DrainUtils.class);
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  public static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  private DrainUtils() {
+  }
+
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore. This 
method should be used by external controllers
+   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   * @param metadataStore Metadata store to write drain notification to.
+   * @param deploymentId deploymentId for the DrainNotification
+   *
+   * @return generated uuid for the DrainNotification
+   */
+  public static UUID writeDrainNotification(MetadataStore metadataStore, 
String deploymentId) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(deploymentId), 
"deploymentId should be non-null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+    final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+    final UUID uuid = UUID.randomUUID();
+    final DrainNotification message = new DrainNotification(uuid, 
deploymentId);
+    try {
+      drainMetadataStore.put(message.getUuid().toString(), 
objectMapper.writeValueAsBytes(message));
+      drainMetadataStore.flush();
+      LOG.info("DrainNotification with id {} written to metadata-store for the 
deployment ID {}", uuid, deploymentId);
+    } catch (Exception ex) {
+      throw new SamzaException(
+          String.format("DrainNotification might have been not written to 
metastore %s", message), ex);
+    }
+    return uuid;
+  }
+
+  /**
+   * Cleans up DrainNotifications for the current deployment from the 
underlying metadata store.
+   * The current deploymentId is extracted from the config.
+   *
+   * @param metadataStore underlying metadata store
+   * @param config Config for the job. Used to extract the deploymentId of the 
job.
+   * */
+  public static void cleanup(MetadataStore metadataStore, Config config) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+
+    final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    final String deploymentId = applicationConfig.getRunId();
+    final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+
+    if (DrainMonitor.shouldDrain(drainMetadataStore, deploymentId)) {
+      drainMetadataStore.all()

Review Comment:
   Let's add a log line here stating that we are doing the drain, for easier 
troubleshooting.



##########
samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ .* with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.UUID;
+import joptsimple.internal.Strings;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainUtils provides utility methods for managing {@link DrainNotification} 
in the the provided {@link MetadataStore}.
+ * */
+public class DrainUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(DrainUtils.class);
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  public static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  private DrainUtils() {
+  }
+
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore. This 
method should be used by external controllers
+   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   * @param metadataStore Metadata store to write drain notification to.
+   * @param deploymentId deploymentId for the DrainNotification
+   *
+   * @return generated uuid for the DrainNotification
+   */
+  public static UUID writeDrainNotification(MetadataStore metadataStore, 
String deploymentId) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(deploymentId), 
"deploymentId should be non-null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+    final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+    final UUID uuid = UUID.randomUUID();
+    final DrainNotification message = new DrainNotification(uuid, 
deploymentId);
+    try {
+      drainMetadataStore.put(message.getUuid().toString(), 
objectMapper.writeValueAsBytes(message));
+      drainMetadataStore.flush();
+      LOG.info("DrainNotification with id {} written to metadata-store for the 
deployment ID {}", uuid, deploymentId);
+    } catch (Exception ex) {
+      throw new SamzaException(
+          String.format("DrainNotification might have been not written to 
metastore %s", message), ex);
+    }
+    return uuid;
+  }
+
+  /**
+   * Cleans up DrainNotifications for the current deployment from the 
underlying metadata store.
+   * The current deploymentId is extracted from the config.
+   *
+   * @param metadataStore underlying metadata store
+   * @param config Config for the job. Used to extract the deploymentId of the 
job.
+   * */
+  public static void cleanup(MetadataStore metadataStore, Config config) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+
+    final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    final String deploymentId = applicationConfig.getRunId();
+    final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+
+    if (DrainMonitor.shouldDrain(drainMetadataStore, deploymentId)) {
+      drainMetadataStore.all()
+          .values()
+          .stream()
+          .map(bytes -> {
+            try {
+              return objectMapper.readValue(bytes, DrainNotification.class);
+            } catch (IOException e) {
+              LOG.error("Unable to deserialize DrainNotification from the 
metadata store", e);
+              throw new SamzaException(e);
+            }
+          })
+          .filter(notification -> 
deploymentId.equals(notification.getDeploymentId()))
+          .forEach(notification -> 
drainMetadataStore.delete(notification.getUuid().toString()));
+
+      drainMetadataStore.flush();
+      LOG.info("Successfully cleaned up DrainNotifications from the 
metadata-store for the current deployment {}", deploymentId);
+    } else {
+      LOG.info("No DrainNotification found in the metadata-store for the 
current deployment {}. No need to cleanup.",
+          deploymentId);
+    }
+  }
+
+  /**
+   * Cleans up all DrainNotifications irrespective of the deploymentId.
+   * */
+  public static void cleanupAll(MetadataStore metadataStore) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =

Review Comment:
   Add a log statement on the first line.



##########
samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ .* with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.UUID;
+import joptsimple.internal.Strings;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainUtils provides utility methods for managing {@link DrainNotification} 
in the the provided {@link MetadataStore}.
+ * */
+public class DrainUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(DrainUtils.class);
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  public static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  private DrainUtils() {
+  }
+
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore. This 
method should be used by external controllers
+   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   * @param metadataStore Metadata store to write drain notification to.
+   * @param deploymentId deploymentId for the DrainNotification
+   *
+   * @return generated uuid for the DrainNotification
+   */
+  public static UUID writeDrainNotification(MetadataStore metadataStore, 
String deploymentId) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(deploymentId), 
"deploymentId should be non-null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =

Review Comment:
   Let's log up front to indicate that we are starting to write the drain 
message for this deploymentId.



##########
samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java:
##########
@@ -141,6 +142,8 @@ static void run(
               samzaEpochId, config);
       MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
 
+      DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore, 
config);

Review Comment:
   I think it might be safer to use a config to control the creation of this 
drainMonitor, for example "samza.drainmonitor.enabled" or something similar. We 
pass in Option.of(null) if this config is set to false. Then we can turn it on 
for a few jobs first before making true the default.



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainMonitor is intended to monitor the MetadataStore for {@link 
DrainNotification} and invoke
+ * the {@link DrainCallback}.
+ * */
+public class DrainMonitor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainMonitor.class);
+
+  /**
+   * Describes the state of the monitor.
+   * */
+  public enum State {
+    /**
+     * Initial state when DrainMonitor is not polling for DrainNotifications.
+     * */
+    INIT,
+    /**
+     * When Drain Monitor is started, it moves from INIT to RUNNING state and 
starts polling
+     * for Drain Notifications.
+     * */
+    RUNNING,
+    /**
+     * Indicates that the DrainMonitor is stopped as Drain was found.
+     * */
+    DRAINING,

Review Comment:
   It seems the DRAINING state is not being used much. The logic below mainly 
transitions INIT -> RUNNING -> STOPPED. I suggest removing this state.



##########
samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala:
##########
@@ -85,6 +86,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       }
     }
 
+    val drainMonitor = new DrainMonitor(coordinatorStreamStore, config)

Review Comment:
   Same as above: use the config to control the creation.



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainMonitor is intended to monitor the MetadataStore for {@link 
DrainNotification} and invoke
+ * the {@link DrainCallback}.
+ * */
+public class DrainMonitor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainMonitor.class);
+
+  /**
+   * Describes the state of the monitor.
+   * */
+  public enum State {
+    /**
+     * Initial state when DrainMonitor is not polling for DrainNotifications.
+     * */
+    INIT,
+    /**
+     * When Drain Monitor is started, it moves from INIT to RUNNING state and 
starts polling
+     * for Drain Notifications.
+     * */
+    RUNNING,
+    /**
+     * Indicates that the DrainMonitor is stopped as Drain was found.
+     * */
+    DRAINING,
+    /**
+     * Indicates that the Drain Monitor is explictly stopped.
+     * */
+    STOPPED
+  }
+
+  private static final int POLLING_INTERVAL_MILLIS = 60_000;
+  private static final int INITIAL_POLL_DELAY_MILLIS = 0;
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("Samza DrainMonitor Thread-%d")
+              .setDaemon(true)
+              .build());
+  private final String appRunId;
+  private final long pollingIntervalMillis;
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  // Used to guard write access to state.
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private State state = State.INIT;
+  private DrainCallback callback;
+
+  public DrainMonitor(MetadataStore metadataStore, Config config) {
+    this(metadataStore, config, POLLING_INTERVAL_MILLIS);
+  }
+
+  public DrainMonitor(MetadataStore metadataStore, Config config, long 
pollingIntervalMillis) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+    Preconditions.checkArgument(pollingIntervalMillis > 0,
+        String.format("Polling interval specified is %d ms. It should be 
greater than 0.", pollingIntervalMillis));
+    this.drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DrainUtils.DRAIN_METADATA_STORE_NAMESPACE);
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    this.appRunId = applicationConfig.getRunId();
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Starts the DrainMonitor.
+   * */
+  public void start() {
+    Preconditions.checkState(callback != null,
+        "Drain Callback needs to be set using registerCallback(callback) prior 
to starting the DrainManager.");
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          if (shouldDrain(drainMetadataStore, appRunId)) {
+            /*
+             * Prior to starting the periodic polling, we are doing a one-time 
check on the calling(container main) thread
+             * to see if DrainNotification is present in the metadata store 
for the current deployment.
+             * This check is to deal with the case where a container might 
have re-started during Drain.
+             * If yes, we will set the container to drain mode to prevent it 
from processing any new messages. This will
+             * in-turn guarantee that intermediate Drain control messages from 
the previous incarnation of the container are
+             * processed and there are no duplicate intermediate control 
messages for the same deployment.
+             * */
+            LOG.info("Found DrainNotification message on container start. 
Skipping poll of DrainNotifications.");
+            callback.onDrain();
+          } else {
+            state = State.RUNNING;
+            schedulerService.scheduleAtFixedRate(() -> {
+              if (shouldDrain(drainMetadataStore, appRunId)) {
+                LOG.info("Received Drain Notification for deployment: {}", 
appRunId);
+                drain();
+                callback.onDrain();
+              }
+            }, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, 
TimeUnit.MILLISECONDS);
+            LOG.info("Started DrainMonitor.");
+          }
+          break;
+        case RUNNING:
+        case DRAINING:
+        case STOPPED:
+          LOG.info("Cannot call start() on the DrainMonitor when it is in {} 
state.", state);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Stops the DrainMonitor.
+   * */
+  public void stop() {
+    synchronized (lock) {
+      switch (state) {
+        case RUNNING:
+          schedulerService.shutdownNow();
+          state = State.STOPPED;
+          LOG.info("Stopped DrainMonitor.");
+          break;
+        case INIT:
+        case STOPPED:
+        case DRAINING:
+          LOG.info("Cannot stop DrainMonitor as it is not running. State: 
{}.", state);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Invoked whenever DrainNotification is encountered in the metadata store.
+   * Sets the DrainMonitor to DRAINING state.
+   * */
+  private void drain() {

Review Comment:
   Remove this?



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainMonitor is intended to monitor the MetadataStore for {@link 
DrainNotification} and invoke
+ * the {@link DrainCallback}.
+ * */
+public class DrainMonitor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainMonitor.class);
+
+  /**
+   * Describes the state of the monitor.
+   * */
+  public enum State {
+    /**
+     * Initial state when DrainMonitor is not polling for DrainNotifications.
+     * */
+    INIT,
+    /**
+     * When Drain Monitor is started, it moves from INIT to RUNNING state and 
starts polling
+     * for Drain Notifications.
+     * */
+    RUNNING,
+    /**
+     * Indicates that the DrainMonitor is stopped as Drain was found.
+     * */
+    DRAINING,
+    /**
+     * Indicates that the Drain Monitor is explictly stopped.
+     * */
+    STOPPED
+  }
+
+  private static final int POLLING_INTERVAL_MILLIS = 60_000;
+  private static final int INITIAL_POLL_DELAY_MILLIS = 0;
+
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("Samza DrainMonitor Thread-%d")
+              .setDaemon(true)
+              .build());
+  private final String appRunId;
+  private final long pollingIntervalMillis;
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  // Used to guard write access to state.
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private State state = State.INIT;
+  private DrainCallback callback;
+
+  public DrainMonitor(MetadataStore metadataStore, Config config) {
+    this(metadataStore, config, POLLING_INTERVAL_MILLIS);
+  }
+
+  public DrainMonitor(MetadataStore metadataStore, Config config, long 
pollingIntervalMillis) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+    Preconditions.checkArgument(pollingIntervalMillis > 0,
+        String.format("Polling interval specified is %d ms. It should be 
greater than 0.", pollingIntervalMillis));
+    this.drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DrainUtils.DRAIN_METADATA_STORE_NAMESPACE);
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    this.appRunId = applicationConfig.getRunId();
+    this.pollingIntervalMillis = pollingIntervalMillis;
+  }
+
+  /**
+   * Starts the DrainMonitor.
+   * */
+  public void start() {
+    Preconditions.checkState(callback != null,
+        "Drain Callback needs to be set using registerCallback(callback) prior 
to starting the DrainManager.");
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          if (shouldDrain(drainMetadataStore, appRunId)) {
+            /*
+             * Prior to starting the periodic polling, we are doing a one-time 
check on the calling(container main) thread
+             * to see if DrainNotification is present in the metadata store 
for the current deployment.
+             * This check is to deal with the case where a container might 
have re-started during Drain.
+             * If yes, we will set the container to drain mode to prevent it 
from processing any new messages. This will
+             * in-turn guarantee that intermediate Drain control messages from 
the previous incarnation of the container are
+             * processed and there are no duplicate intermediate control 
messages for the same deployment.
+             * */
+            LOG.info("Found DrainNotification message on container start. 
Skipping poll of DrainNotifications.");
+            callback.onDrain();
+          } else {
+            state = State.RUNNING;
+            schedulerService.scheduleAtFixedRate(() -> {
+              if (shouldDrain(drainMetadataStore, appRunId)) {
+                LOG.info("Received Drain Notification for deployment: {}", 
appRunId);
+                drain();

Review Comment:
   Seems we can just call stop() here since drain() actually just does stop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to