ajothomas commented on code in PR #1605:
URL: https://github.com/apache/samza/pull/1605#discussion_r896089498


##########
samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ .* with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.UUID;
+import joptsimple.internal.Strings;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainUtils provides utility methods for managing {@link DrainNotification} 
in the the provided {@link MetadataStore}.
+ * */
+public class DrainUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(DrainUtils.class);
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  public static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  private DrainUtils() {
+  }
+
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore. This 
method should be used by external controllers
+   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   * @param metadataStore Metadata store to write drain notification to.
+   * @param deploymentId deploymentId for the DrainNotification
+   *
+   * @return generated uuid for the DrainNotification
+   */
+  public static UUID writeDrainNotification(MetadataStore metadataStore, 
String deploymentId) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(deploymentId), 
"deploymentId should be non-null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+    final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+    final UUID uuid = UUID.randomUUID();
+    final DrainNotification message = new DrainNotification(uuid, 
deploymentId);
+    try {
+      drainMetadataStore.put(message.getUuid().toString(), 
objectMapper.writeValueAsBytes(message));
+      drainMetadataStore.flush();
+      LOG.info("DrainNotification with id {} written to metadata-store for the 
deployment ID {}", uuid, deploymentId);
+    } catch (Exception ex) {
+      throw new SamzaException(
+          String.format("DrainNotification might have been not written to 
metastore %s", message), ex);
+    }
+    return uuid;
+  }
+
+  /**
+   * Cleans up DrainNotifications for the current deployment from the 
underlying metadata store.
+   * The current deploymentId is extracted from the config.
+   *
+   * @param metadataStore underlying metadata store
+   * @param config Config for the job. Used to extract the deploymentId of the 
job.
+   * */
+  public static void cleanup(MetadataStore metadataStore, Config config) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    Preconditions.checkNotNull(config, "Config parameter cannot be null.");
+
+    final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    final String deploymentId = applicationConfig.getRunId();
+    final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+
+    if (DrainMonitor.shouldDrain(drainMetadataStore, deploymentId)) {
+      drainMetadataStore.all()
+          .values()
+          .stream()
+          .map(bytes -> {
+            try {
+              return objectMapper.readValue(bytes, DrainNotification.class);
+            } catch (IOException e) {
+              LOG.error("Unable to deserialize DrainNotification from the 
metadata store", e);
+              throw new SamzaException(e);
+            }
+          })
+          .filter(notification -> 
deploymentId.equals(notification.getDeploymentId()))
+          .forEach(notification -> 
drainMetadataStore.delete(notification.getUuid().toString()));
+
+      drainMetadataStore.flush();
+      LOG.info("Successfully cleaned up DrainNotifications from the 
metadata-store for the current deployment {}", deploymentId);
+    } else {
+      LOG.info("No DrainNotification found in the metadata-store for the 
current deployment {}. No need to cleanup.",
+          deploymentId);
+    }
+  }
+
+  /**
+   * Cleans up all DrainNotifications irrespective of the deploymentId.
+   * */
+  public static void cleanupAll(MetadataStore metadataStore) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot 
be null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =

Review Comment:
   Added extra logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to