HDDS-129. Support for ReportManager in Datanode.
Contributed by Nanda Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/baebe4d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/baebe4d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/baebe4d5

Branch: refs/heads/HDFS-12943
Commit: baebe4d52bc0e1ee3be062b61efa1de1d19a3bca
Parents: 920d154
Author: Anu Engineer <aengin...@apache.org>
Authored: Tue Jun 5 10:31:42 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Tue Jun 5 10:31:42 2018 -0700

----------------------------------------------------------------------
 .../common/report/ContainerReportPublisher.java |  70 +++++++++
 .../common/report/NodeReportPublisher.java      |  40 +++++
 .../container/common/report/ReportManager.java  | 147 +++++++++++++++++++
 .../common/report/ReportPublisher.java          |  96 ++++++++++++
 .../common/report/ReportPublisherFactory.java   |  71 +++++++++
 .../container/common/report/package-info.java   |  80 ++++++++++
 .../statemachine/DatanodeStateMachine.java      |  18 ++-
 .../common/statemachine/StateContext.java       |  59 ++++++--
 .../states/endpoint/HeartbeatEndpointTask.java  |  24 ++-
 .../common/report/TestReportManager.java        |  52 +++++++
 .../common/report/TestReportPublisher.java      | 106 +++++++++++++
 .../report/TestReportPublisherFactory.java      |  68 +++++++++
 .../container/common/report/package-info.java   |  22 +++
 13 files changed, 834 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
new file mode 100644
index 0000000..ea2b987
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Publishes ContainerReport which will be sent to SCM as part of heartbeat.
+ * ContainerReport consist of the following information about each containers:
+ *   - containerID
+ *   - size
+ *   - used
+ *   - keyCount
+ *   - readCount
+ *   - writeCount
+ *   - readBytes
+ *   - writeBytes
+ *   - finalHash
+ *   - LifeCycleState
+ *
+ */
+public class ContainerReportPublisher extends
+    ReportPublisher<ContainerReportsProto> {
+
+  private Long containerReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (containerReportInterval == null) {
+      containerReportInterval = getConf().getTimeDuration(
+          OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL,
+          OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+    }
+    // Add a random delay (0~30s) on top of the container report
+    // interval (60s) so tha the SCM is overwhelmed by the container reports
+    // sent in sync.
+    return containerReportInterval + getRandomReportDelay();
+  }
+
+  private long getRandomReportDelay() {
+    return RandomUtils.nextLong(0, containerReportInterval);
+  }
+
+  @Override
+  protected ContainerReportsProto getReport() {
+    return ContainerReportsProto.getDefaultInstance();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java
new file mode 100644
index 0000000..704b1f5
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+
+/**
+ * Publishes NodeReport which will be sent to SCM as part of heartbeat.
+ * NodeReport consist of:
+ *   - NodeIOStats
+ *   - VolumeReports
+ */
+public class NodeReportPublisher extends ReportPublisher<NodeReportProto> {
+
+  @Override
+  protected long getReportFrequency() {
+    return 90000L;
+  }
+
+  @Override
+  protected NodeReportProto getReport() {
+    return NodeReportProto.getDefaultInstance();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
new file mode 100644
index 0000000..c09282e
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * ReportManager is responsible for managing all the {@link ReportPublisher}
+ * and also provides {@link ScheduledExecutorService} to ReportPublisher
+ * which should be used for scheduling the reports.
+ */
+public final class ReportManager {
+
+  private final StateContext context;
+  private final List<ReportPublisher> publishers;
+  private final ScheduledExecutorService executorService;
+
+  /**
+   * Construction of {@link ReportManager} should be done via
+   * {@link ReportManager.Builder}.
+   *
+   * @param context StateContext which holds the report
+   * @param publishers List of publishers which generates report
+   */
+  private ReportManager(StateContext context,
+                        List<ReportPublisher> publishers) {
+    this.context = context;
+    this.publishers = publishers;
+    this.executorService = HadoopExecutors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Datanode ReportManager Thread - %d").build());
+  }
+
+  /**
+   * Initializes ReportManager, also initializes all the configured
+   * report publishers.
+   */
+  public void init() {
+    for (ReportPublisher publisher : publishers) {
+      publisher.init(context, executorService);
+    }
+  }
+
+  /**
+   * Shutdown the ReportManager.
+   */
+  public void shutdown() {
+    executorService.shutdown();
+  }
+
+  /**
+   * Returns new {@link ReportManager.Builder} which can be used to construct.
+   * {@link ReportManager}
+   * @param conf  - Conf
+   * @return builder - Builder.
+   */
+  public static Builder newBuilder(Configuration conf) {
+    return new Builder(conf);
+  }
+
+  /**
+   * Builder to construct {@link ReportManager}.
+   */
+  public static final class Builder {
+
+    private StateContext stateContext;
+    private List<ReportPublisher> reportPublishers;
+    private ReportPublisherFactory publisherFactory;
+
+
+    private Builder(Configuration conf) {
+      this.reportPublishers = new ArrayList<>();
+      this.publisherFactory = new ReportPublisherFactory(conf);
+    }
+
+    /**
+     * Sets the {@link StateContext}.
+     *
+     * @param context StateContext
+
+     * @return ReportManager.Builder
+     */
+    public Builder setStateContext(StateContext context) {
+      stateContext = context;
+      return this;
+    }
+
+    /**
+     * Adds publisher for the corresponding report.
+     *
+     * @param report report for which publisher needs to be added
+     *
+     * @return ReportManager.Builder
+     */
+    public Builder addPublisherFor(Class<? extends GeneratedMessage> report) {
+      reportPublishers.add(publisherFactory.getPublisherFor(report));
+      return this;
+    }
+
+    /**
+     * Adds new ReportPublisher to the ReportManager.
+     *
+     * @param publisher ReportPublisher
+     *
+     * @return ReportManager.Builder
+     */
+    public Builder addPublisher(ReportPublisher publisher) {
+      reportPublishers.add(publisher);
+      return this;
+    }
+
+    /**
+     * Build and returns ReportManager.
+     *
+     * @return {@link ReportManager}
+     */
+    public ReportManager build() {
+      Preconditions.checkNotNull(stateContext);
+      return new ReportManager(stateContext, reportPublishers);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
new file mode 100644
index 0000000..4ff47a0
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract class responsible for scheduling the reports based on the
+ * configured interval. All the ReportPublishers should extend this class.
+ */
+public abstract class ReportPublisher<T extends GeneratedMessage>
+    implements Configurable, Runnable {
+
+  private Configuration config;
+  private StateContext context;
+  private ScheduledExecutorService executor;
+
+  /**
+   * Initializes ReportPublisher with stateContext and executorService.
+   *
+   * @param stateContext Datanode state context
+   * @param executorService ScheduledExecutorService to schedule reports
+   */
+  public void init(StateContext stateContext,
+                   ScheduledExecutorService executorService) {
+    this.context = stateContext;
+    this.executor = executorService;
+    this.executor.schedule(this,
+        getReportFrequency(), TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    config = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
+
+  @Override
+  public void run() {
+    publishReport();
+    if (!executor.isShutdown() ||
+        !(context.getState() == DatanodeStates.SHUTDOWN)) {
+      executor.schedule(this,
+          getReportFrequency(), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Generates and publishes the report to datanode state context.
+   */
+  private void publishReport() {
+    context.addReport(getReport());
+  }
+
+  /**
+   * Returns the frequency in which this particular report has to be scheduled.
+   *
+   * @return report interval in milliseconds
+   */
+  protected abstract long getReportFrequency();
+
+  /**
+   * Generate and returns the report which has to be sent as part of heartbeat.
+   *
+   * @return datanode report
+   */
+  protected abstract T getReport();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
new file mode 100644
index 0000000..dc246d9
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory class to construct {@link ReportPublisher} for a report.
+ */
+public class ReportPublisherFactory {
+
+  private final Configuration conf;
+  private final Map<Class<? extends GeneratedMessage>,
+      Class<? extends ReportPublisher>> report2publisher;
+
+  /**
+   * Constructs {@link ReportPublisherFactory} instance.
+   *
+   * @param conf Configuration to be passed to the {@link ReportPublisher}
+   */
+  public ReportPublisherFactory(Configuration conf) {
+    this.conf = conf;
+    this.report2publisher = new HashMap<>();
+
+    report2publisher.put(NodeReportProto.class, NodeReportPublisher.class);
+    report2publisher.put(ContainerReportsProto.class,
+        ContainerReportPublisher.class);
+  }
+
+  /**
+   * Returns the ReportPublisher for the corresponding report.
+   *
+   * @param report report
+   *
+   * @return report publisher
+   */
+  public ReportPublisher getPublisherFor(
+      Class<? extends GeneratedMessage> report) {
+    Class<? extends ReportPublisher> publisherClass =
+        report2publisher.get(report);
+    if (publisherClass == null) {
+      throw new RuntimeException("No publisher found for report " + report);
+    }
+    return ReflectionUtils.newInstance(publisherClass, conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java
new file mode 100644
index 0000000..404b37a
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.report;
+/**
+ * Datanode Reports: As part of heartbeat, datanode has to share its current
+ * state with SCM. The state of datanode is split into multiple reports which
+ * are sent along with heartbeat in a configured frequency.
+ *
+ * This package contains code which is responsible for sending reports from
+ * datanode to SCM.
+ *
+ * ReportPublisherFactory: Given a report this constructs corresponding
+ * {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}.
+ *
+ * ReportManager: Manages and initializes all the available ReportPublishers.
+ *
+ * ReportPublisher: Abstract class responsible for scheduling the reports
+ * based on the configured interval. All the ReportPublishers should extend
+ * {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}
+ *
+ * How to add new report:
+ *
+ * 1. Create a new ReportPublisher class which extends
+ * {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}.
+ *
+ * 2. Add a mapping Report to ReportPublisher entry in ReportPublisherFactory.
+ *
+ * 3. In DatanodeStateMachine add the report to ReportManager instance.
+ *
+ *
+ *
+ * Datanode Reports State Diagram:
+ *
+ *   DatanodeStateMachine  ReportManager  ReportPublisher           SCM
+ *            |                  |              |                    |
+ *            |                  |              |                    |
+ *            |    construct     |              |                    |
+ *            |----------------->|              |                    |
+ *            |                  |              |                    |
+ *            |     init         |              |                    |
+ *            |----------------->|              |                    |
+ *            |                  |     init     |                    |
+ *            |                  |------------->|                    |
+ *            |                  |              |                    |
+ *   +--------+------------------+--------------+--------------------+------+
+ *   |loop    |                  |              |                    |      |
+ *   |        |                  |   publish    |                    |      |
+ *   |        |<-----------------+--------------|                    |      |
+ *   |        |                  |   report     |                    |      |
+ *   |        |                  |              |                    |      |
+ *   |        |                  |              |                    |      |
+ *   |        |   heartbeat(rpc) |              |                    |      |
+ *   |        |------------------+--------------+------------------->|      |
+ *   |        |                  |              |                    |      |
+ *   |        |                  |              |                    |      |
+ *   +--------+------------------+--------------+--------------------+------+
+ *            |                  |              |                    |
+ *            |                  |              |                    |
+ *            |                  |              |                    |
+ *            |     shutdown     |              |                    |
+ *            |----------------->|              |                    |
+ *            |                  |              |                    |
+ *            |                  |              |                    |
+ *            -                  -              -                    -
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index d0a4217..cb4319d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,7 +21,13 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.ozone.container.common.report.ReportManager;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
@@ -56,6 +62,7 @@ public class DatanodeStateMachine implements Closeable {
   private final OzoneContainer container;
   private DatanodeDetails datanodeDetails;
   private final CommandDispatcher commandDispatcher;
+  private final ReportManager reportManager;
   private long commandsHandled;
   private AtomicLong nextHB;
   private Thread stateMachineThread = null;
@@ -92,6 +99,12 @@ public class DatanodeStateMachine implements Closeable {
         .setContainer(container)
         .setContext(context)
         .build();
+
+    reportManager = ReportManager.newBuilder(conf)
+        .setStateContext(context)
+        .addPublisherFor(NodeReportProto.class)
+        .addPublisherFor(ContainerReportsProto.class)
+        .build();
   }
 
   /**
@@ -125,12 +138,12 @@ public class DatanodeStateMachine implements Closeable {
     long now = 0;
 
     container.start();
+    reportManager.init();
     initCommandHandlerThread(conf);
     while (context.getState() != DatanodeStates.SHUTDOWN) {
       try {
         LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
         nextHB.set(Time.monotonicNow() + heartbeatFrequency);
-        context.setNodeReport(container.getNodeReport());
         context.execute(executorService, heartbeatFrequency,
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();
@@ -307,6 +320,7 @@ public class DatanodeStateMachine implements Closeable {
   public synchronized void stopDaemon() {
     try {
       context.setState(DatanodeStates.SHUTDOWN);
+      reportManager.shutdown();
       this.close();
       LOG.info("Ozone container server stopped.");
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4e3c610..98eb7a0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -16,9 +16,8 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine;
 
+import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
@@ -28,7 +27,9 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -51,8 +52,8 @@ public class StateContext {
   private final DatanodeStateMachine parent;
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
+  private final Queue<GeneratedMessage> reports;
   private DatanodeStateMachine.DatanodeStates state;
-  private NodeReportProto dnReport;
 
   /**
    * Constructs a StateContext.
@@ -67,9 +68,9 @@ public class StateContext {
     this.state = state;
     this.parent = parent;
     commandQueue = new LinkedList<>();
+    reports = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
-    dnReport = NodeReportProto.getDefaultInstance();
   }
 
   /**
@@ -141,19 +142,53 @@ public class StateContext {
   }
 
   /**
-   * Returns the node report of the datanode state context.
-   * @return the node report.
+   * Adds the report to report queue.
+   *
+   * @param report report to be added
+   */
+  public void addReport(GeneratedMessage report) {
+    synchronized (reports) {
+      reports.add(report);
+    }
+  }
+
+  /**
+   * Returns the next report, or null if the report queue is empty.
+   *
+   * @return report
+   */
+  public GeneratedMessage getNextReport() {
+    synchronized (reports) {
+      return reports.poll();
+    }
+  }
+
+  /**
+   * Returns all the available reports from the report queue, or empty list if
+   * the queue is empty.
+   *
+   * @return List<reports>
    */
-  public NodeReportProto getNodeReport() {
-    return dnReport;
+  public List<GeneratedMessage> getAllAvailableReports() {
+    return getReports(Integer.MAX_VALUE);
   }
 
   /**
-   * Sets the storage location report of the datanode state context.
-   * @param nodeReport node report
+   * Returns available reports from the report queue with a max limit on
+   * list size, or empty list if the queue is empty.
+   *
+   * @return List<reports>
    */
-  public void setNodeReport(NodeReportProto nodeReport) {
-    this.dnReport = nodeReport;
+  public List<GeneratedMessage> getReports(int maxLimit) {
+    List<GeneratedMessage> results = new ArrayList<>();
+    synchronized (reports) {
+      GeneratedMessage report = reports.poll();
+      while(results.size() < maxLimit && report != null) {
+        results.add(report);
+        report = reports.poll();
+      }
+    }
+    return results;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 337cdfb..3986faf 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@@ -99,13 +100,13 @@ public class HeartbeatEndpointTask
     try {
       Preconditions.checkState(this.datanodeDetailsProto != null);
 
-      SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
-          .setDatanodeDetails(datanodeDetailsProto)
-          .setNodeReport(context.getNodeReport())
-          .build();
+      SCMHeartbeatRequestProto.Builder requestBuilder =
+          SCMHeartbeatRequestProto.newBuilder()
+              .setDatanodeDetails(datanodeDetailsProto);
+      addReports(requestBuilder);
 
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
-          .sendHeartbeat(request);
+          .sendHeartbeat(requestBuilder.build());
       processResponse(reponse, datanodeDetailsProto);
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();
@@ -118,6 +119,19 @@ public class HeartbeatEndpointTask
   }
 
   /**
+   * Adds all the available reports to heartbeat.
+   *
+   * @param requestBuilder builder to which the report has to be added.
+   */
+  private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
+    for (GeneratedMessage report : context.getAllAvailableReports()) {
+      requestBuilder.setField(
+          SCMHeartbeatRequestProto.getDescriptor().findFieldByName(
+              report.getDescriptorForType().getName()), report);
+    }
+  }
+
+  /**
    * Returns a builder class for HeartbeatEndpointTask task.
    * @return   Builder.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
new file mode 100644
index 0000000..aae388d
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test cases to test {@link ReportManager}.
+ */
+public class TestReportManager {
+
+  @Test
+  public void testReportManagerInit() {
+    Configuration conf = new OzoneConfiguration();
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    ReportPublisher dummyPublisher = Mockito.mock(ReportPublisher.class);
+    ReportManager.Builder builder = ReportManager.newBuilder(conf);
+    builder.setStateContext(dummyContext);
+    builder.addPublisher(dummyPublisher);
+    ReportManager reportManager = builder.build();
+    reportManager.init();
+    verify(dummyPublisher, times(1)).init(eq(dummyContext),
+        any(ScheduledExecutorService.class));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
new file mode 100644
index 0000000..067c562
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test cases to test {@link ReportPublisher}.
+ */
+public class TestReportPublisher {
+
+  /**
+   * Dummy report publisher for testing.
+   */
+  private class DummyReportPublisher extends ReportPublisher {
+
+    private final long frequency;
+    private int getReportCount = 0;
+
+    DummyReportPublisher(long frequency) {
+      this.frequency = frequency;
+    }
+
+    @Override
+    protected long getReportFrequency() {
+      return frequency;
+    }
+
+    @Override
+    protected GeneratedMessage getReport() {
+      getReportCount++;
+      return null;
+    }
+  }
+
+  @Test
+  public void testReportPublisherInit() {
+    ReportPublisher publisher = new DummyReportPublisher(0);
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    ScheduledExecutorService dummyExecutorService = Mockito.mock(
+        ScheduledExecutorService.class);
+    publisher.init(dummyContext, dummyExecutorService);
+    verify(dummyExecutorService, times(1)).schedule(publisher,
+        0, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testScheduledReport() throws InterruptedException {
+    ReportPublisher publisher = new DummyReportPublisher(100);
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    ScheduledExecutorService executorService = HadoopExecutors
+        .newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Unit test ReportManager Thread - %d").build());
+    publisher.init(dummyContext, executorService);
+    Thread.sleep(150);
+    Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
+    Thread.sleep(150);
+    Assert.assertEquals(2, ((DummyReportPublisher)publisher).getReportCount);
+    executorService.shutdown();
+  }
+
+  @Test
+  public void testPublishReport() throws InterruptedException {
+    ReportPublisher publisher = new DummyReportPublisher(100);
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    ScheduledExecutorService executorService = HadoopExecutors
+        .newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Unit test ReportManager Thread - %d").build());
+    publisher.init(dummyContext, executorService);
+    Thread.sleep(150);
+    executorService.shutdown();
+    Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
+    verify(dummyContext, times(1)).addReport(null);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
new file mode 100644
index 0000000..f8c5fe5
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test cases to test ReportPublisherFactory.
+ */
+public class TestReportPublisherFactory {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testGetContainerReportPublisher() {
+    Configuration conf = new OzoneConfiguration();
+    ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+    ReportPublisher publisher = factory
+        .getPublisherFor(ContainerReportsProto.class);
+    Assert.assertEquals(ContainerReportPublisher.class, publisher.getClass());
+    Assert.assertEquals(conf, publisher.getConf());
+  }
+
+  @Test
+  public void testGetNodeReportPublisher() {
+    Configuration conf = new OzoneConfiguration();
+    ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+    ReportPublisher publisher = factory
+        .getPublisherFor(NodeReportProto.class);
+    Assert.assertEquals(NodeReportPublisher.class, publisher.getClass());
+    Assert.assertEquals(conf, publisher.getConf());
+  }
+
+  @Test
+  public void testInvalidReportPublisher() {
+    Configuration conf = new OzoneConfiguration();
+    ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("No publisher found for report");
+    factory.getPublisherFor(HddsProtos.DatanodeDetailsProto.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/baebe4d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java
new file mode 100644
index 0000000..37615bc
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.report;
+/**
+ * This package has test cases for all the report publishers which generates
+ * reports that are sent to SCM via heartbeat.
+ */
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to