ajothomas commented on code in PR #1605:
URL: https://github.com/apache/samza/pull/1605#discussion_r876381344


##########
samza-core/src/main/java/org/apache/samza/drain/DrainManager.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainManager reads and writes {@link DrainNotification} to the provided 
{@link MetadataStore}.
+ *
+ * It uses {@link NamespaceAwareCoordinatorStreamStore} for namespace-aware 
read/write/delete operations on the
+ * metadata store.
+ * */
+public class DrainManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DrainManager.class);
+
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" 
+ VERSION;
+
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  private final ObjectMapper objectMapper = 
DrainNotificationObjectMapper.getObjectMapper();
+
+  private boolean running = false;
+
+  public DrainManager(MetadataStore metadataStore) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+    this.drainMetadataStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
DRAIN_METADATA_STORE_NAMESPACE);
+  }
+
+  /**
+   * Perform startup operations.
+   */
+  public void start() {
+    if (running) {
+      LOG.warn("DrainManager already started.");
+    } else {
+      drainMetadataStore.init();
+      running = true;
+      LOG.info("Started DrainManager.");
+    }
+  }
+
+  /**
+   * Perform teardown operations.
+   */
+  public void stop() {
+    if (running) {
+      drainMetadataStore.close();
+      running = false;
+      LOG.info("Stopped DrainManager.");
+    } else {
+      LOG.warn("DrainManager already stopped.");
+    }
+  }
+
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore. This 
method should be used by external controllers
+   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   *
+   * @return uuid generated for the request
+   */
+  public UUID writeDrainNotification(String deploymentId) {

Review Comment:
   Will add a util and a shell script.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to