This is an automated email from the ASF dual-hosted git repository.

wylee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 3115974  [NEMO-335] DB for storing metrics (#192)
3115974 is described below

commit 3115974570bc431d5f00dfaa2be4823c91fbc634
Author: Won Wook SONG <won...@apache.org>
AuthorDate: Fri Feb 15 22:05:19 2019 +0900

    [NEMO-335] DB for storing metrics (#192)
    
    JIRA: [NEMO-335: DB for storing metrics](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-335)
    
    **Major changes:**
    - Brings in the `sqlite-jdbc` and `postgresql` libraries for interacting
      with SQLite and PostgreSQL databases.
    - Upon each execution, automatically writes (0) the DAG specifications
      (the number of root vertices, vertices, and edges), (1) the duration,
      (2) the memory specs (JVM memory and machine memory), and (3) the
      execution properties of the IR DAG, either to the SQLite DB (located at
      the project root, next to the `LICENSE` file) or to a PostgreSQL server.
    
    **Minor changes to note:**
    - Overall cleanup and refactoring of metric classes.
    
    **Tests for the changes:**
    - Existing tests work.
    
    **Other comments:**
    - None
    
    Closes #192
---
 .gitignore                                         |   5 +
 .../org/apache/nemo/client/ClientEndpointTest.java |   2 +-
 .../org/apache/nemo/common/PairKeyExtractor.java   |  35 ++-
 .../nemo/common/coder/BytesDecoderFactory.java     |   5 +
 .../nemo/common/coder/BytesEncoderFactory.java     |   5 +
 .../nemo/common/coder/IntDecoderFactory.java       |   5 +
 .../nemo/common/coder/IntEncoderFactory.java       |   5 +
 .../nemo/common/coder/LongDecoderFactory.java      |   5 +
 .../nemo/common/coder/LongEncoderFactory.java      |   5 +
 .../nemo/common/exception/MetricException.java     |  32 ++-
 .../main/java/org/apache/nemo/common/ir/IRDAG.java |  23 +-
 .../ClonedSchedulingProperty.java                  |  30 ++
 common/src/main/resources/log4j.properties         |   4 +-
 .../compiler/frontend/beam/BeamKeyExtractor.java   |  19 ++
 .../frontend/beam/coder/BeamDecoderFactory.java    |  10 +-
 .../frontend/beam/coder/BeamEncoderFactory.java    |  10 +-
 .../compiler/frontend/spark/SparkKeyExtractor.java |  20 ++
 .../frontend/spark/coder/SparkDecoderFactory.java  |   7 +
 .../frontend/spark/coder/SparkEncoderFactory.java  |   7 +
 .../main/java/org/apache/nemo/conf/JobConf.java    |  22 ++
 log4j.properties                                   |   4 +-
 pom.xml                                            |   2 +
 runtime/common/pom.xml                             |   5 +
 .../nemo/runtime/common/metric/JobMetric.java      |  84 +++++-
 .../nemo/runtime/common/metric/MetricUtils.java    | 316 +++++++++++++++++++++
 .../java/org/apache/nemo/driver/NemoDriver.java    |   2 +
 .../apache/nemo/driver/UserApplicationRunner.java  |   1 +
 .../datatransfer/NemoEventDecoderFactory.java      |   7 +
 .../datatransfer/NemoEventEncoderFactory.java      |   7 +
 .../nemo/runtime/executor/MetricFlushTest.java     |   4 +-
 .../executor/datatransfer/DataTransferTest.java    |   3 +
 runtime/master/pom.xml                             |   5 +
 .../nemo/runtime/master/PlanStateManager.java      |   6 +-
 .../apache/nemo/runtime/master/RuntimeMaster.java  |  55 +++-
 .../master/{ => metric}/MetricBroadcaster.java     |   2 +-
 .../master/{ => metric}/MetricManagerMaster.java   |   2 +-
 .../master/{ => metric}/MetricMessageHandler.java  |   2 +-
 .../runtime/master/{ => metric}/MetricStore.java   | 174 +++++++++---
 .../runtime/master/servlet/AllMetricServlet.java   |   2 +-
 .../runtime/master/servlet/JobMetricServlet.java   |   2 +-
 .../runtime/master/servlet/StageMetricServlet.java |   2 +-
 .../runtime/master/servlet/TaskMetricServlet.java  |   2 +-
 .../master/servlet/WebSocketMetricAdapter.java     |   2 +-
 .../nemo/runtime/master/MetricStoreTest.java       |   1 +
 .../nemo/runtime/master/PlanStateManagerTest.java  |   1 +
 .../master/scheduler/BatchSchedulerTest.java       |   2 +-
 .../runtime/master/scheduler/TaskRetryTest.java    |   2 +-
 47 files changed, 829 insertions(+), 124 deletions(-)

diff --git a/.gitignore b/.gitignore
index 82c9630..bd9b5c3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,11 @@ build
 docs/
 #
 # ----------------------------------------------------------------------
+# DB Files
+# ----------------------------------------------------------------------
+*.sqlite3
+#
+# ----------------------------------------------------------------------
 # Files generated by OutputService during runtime
 # ----------------------------------------------------------------------
 .test_output*
diff --git 
a/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java 
b/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java
index 1a12968..7095f39 100644
--- a/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java
+++ b/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java
@@ -21,7 +21,7 @@ package org.apache.nemo.client;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.state.PlanState;
 import org.apache.nemo.runtime.common.state.TaskState;
-import org.apache.nemo.runtime.master.MetricMessageHandler;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
 import org.apache.reef.tang.Injector;
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
 b/common/src/main/java/org/apache/nemo/common/PairKeyExtractor.java
similarity index 59%
copy from 
compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
copy to common/src/main/java/org/apache/nemo/common/PairKeyExtractor.java
index f948022..ea1fd09 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
+++ b/common/src/main/java/org/apache/nemo/common/PairKeyExtractor.java
@@ -16,22 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.spark;
 
-import org.apache.nemo.common.KeyExtractor;
-import scala.Tuple2;
+package org.apache.nemo.common;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 /**
- * Extracts the key from a KV element.
- * For non-KV elements, the elements themselves become the key.
+ * A KeyExtractor for Pair class.
  */
-public final class SparkKeyExtractor implements KeyExtractor {
+public final class PairKeyExtractor implements KeyExtractor {
   @Override
   public Object extractKey(final Object element) {
-    if (element instanceof Tuple2) {
-      return ((Tuple2) element)._1;
+    if (element instanceof Pair) {
+      return ((Pair) element).left();
     } else {
-      return element;
+      throw new IllegalStateException(element.toString());
+    }
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
     }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(133, 37).toHashCode();
   }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 5bf1e64..14d02b0 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -54,6 +54,11 @@ public final class BytesDecoderFactory implements 
DecoderFactory<byte[]> {
     return new BytesDecoder(inputStream);
   }
 
+  @Override
+  public String toString() {
+    return "BytesDecoderFactory{}";
+  }
+
   /**
    * BytesDecoder.
    */
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
index 970f20e..6666a7b 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
@@ -51,6 +51,11 @@ public final class BytesEncoderFactory implements 
EncoderFactory<byte[]> {
     return new BytesEncoder(outputStream);
   }
 
+  @Override
+  public String toString() {
+    return "BytesEncoderFactory{}";
+  }
+
   /**
    * BytesEncoder.
    */
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java
index 1d25deb..41653f2 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java
@@ -47,6 +47,11 @@ public final class IntDecoderFactory implements 
DecoderFactory<Integer> {
     return new IntDecoder(inputStream);
   }
 
+  @Override
+  public String toString() {
+    return "IntDecoderFactory{}";
+  }
+
   /**
    * IntDecoder.
    */
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java
index 4747ae9..6be865e 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java
@@ -47,6 +47,11 @@ public final class IntEncoderFactory implements 
EncoderFactory<Integer> {
     return new IntEncoder(outputStream);
   }
 
+  @Override
+  public String toString() {
+    return "IntEncoderFactory{}";
+  }
+
   /**
    * IntEncoder.
    */
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java
index f212c1f..622ada2 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java
@@ -49,6 +49,11 @@ public final class LongDecoderFactory implements 
DecoderFactory<Long> {
     return new LongDecoder(inputStream);
   }
 
+  @Override
+  public String toString() {
+    return "LongDecoderFactory{}";
+  }
+
   /**
    * LongDecoder.
    */
diff --git 
a/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java 
b/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java
index 13ad573..d7c6f08 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java
@@ -49,6 +49,11 @@ public final class LongEncoderFactory implements 
EncoderFactory<Long> {
     return new LongEncoder(outputStream);
   }
 
+  @Override
+  public String toString() {
+    return "LongEncoderFactory{}";
+  }
+
   /**
    * LongEncoder.
    */
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
 b/common/src/main/java/org/apache/nemo/common/exception/MetricException.java
similarity index 63%
copy from 
compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
copy to 
common/src/main/java/org/apache/nemo/common/exception/MetricException.java
index f948022..93d46fc 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
+++ b/common/src/main/java/org/apache/nemo/common/exception/MetricException.java
@@ -16,22 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.spark;
 
-import org.apache.nemo.common.KeyExtractor;
-import scala.Tuple2;
+package org.apache.nemo.common.exception;
 
 /**
- * Extracts the key from a KV element.
- * For non-KV elements, the elements themselves become the key.
+ * MetricException.
+ * Thrown when the cause is related to the metrics.
  */
-public final class SparkKeyExtractor implements KeyExtractor {
-  @Override
-  public Object extractKey(final Object element) {
-    if (element instanceof Tuple2) {
-      return ((Tuple2) element)._1;
-    } else {
-      return element;
-    }
+public class MetricException extends RuntimeException {
+
+  /**
+   * MetricException.
+   * @param cause the cause of the exception.
+   */
+  public MetricException(final Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * MetricException.
+   * @param cause the cause of the exception.
+   */
+  public MetricException(final String cause) {
+    super(cause);
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java 
b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index 4b3db16..a6cd1c7 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -19,8 +19,7 @@
 package org.apache.nemo.common.ir;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.PairKeyExtractor;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.dag.DAGInterface;
@@ -89,6 +88,12 @@ public final class IRDAG implements DAGInterface<IRVertex, 
IREdge> {
     return canAdvance;
   }
 
+  public String irDAGSummary() {
+    return "RV" + getRootVertices().size() + "_V" + getVertices().size() + 
"_E" + getVertices().stream()
+      .mapToInt(v -> getIncomingEdgesOf(v).size())
+      .sum();
+  }
+
   ////////////////////////////////////////////////// Methods for reshaping the 
DAG topology.
 
   /**
@@ -250,14 +255,7 @@ public final class IRDAG implements DAGInterface<IRVertex, 
IREdge> {
     
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
     
newEdge.setPropertyPermanently(MessageIdProperty.of(currentMetricCollectionId));
-    final KeyExtractor pairKeyExtractor = (element) -> {
-      if (element instanceof Pair) {
-        return ((Pair) element).left();
-      } else {
-        throw new IllegalStateException(element.toString());
-      }
-    };
-    newEdge.setProperty(KeyExtractorProperty.of(pairKeyExtractor));
+    newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
     newEdge.setPropertyPermanently(encoder);
     newEdge.setPropertyPermanently(decoder);
     return newEdge;
@@ -383,4 +381,9 @@ public final class IRDAG implements DAGInterface<IRVertex, 
IREdge> {
   public List<IRVertex> filterVertices(final Predicate<IRVertex> condition) {
     return modifiedDAG.filterVertices(condition);
   }
+
+  @Override
+  public String toString() {
+    return asJsonNode().toString();
+  }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
index f4641b4..edce1d7 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nemo.common.ir.vertex.executionproperty;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 import java.io.Serializable;
@@ -119,5 +121,33 @@ public final class ClonedSchedulingProperty extends 
VertexExecutionProperty<Clon
       sb.append(medianTimeMultiplier);
       return sb.toString();
     }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      CloneConf cloneConf = (CloneConf) o;
+
+      return new EqualsBuilder()
+        .append(isUpFrontCloning(), cloneConf.isUpFrontCloning())
+        .append(getFractionToWaitFor(), cloneConf.getFractionToWaitFor())
+        .append(getMedianTimeMultiplier(), cloneConf.getMedianTimeMultiplier())
+        .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder(17, 37)
+        .append(isUpFrontCloning())
+        .append(getFractionToWaitFor())
+        .append(getMedianTimeMultiplier())
+        .toHashCode();
+    }
   }
 }
diff --git a/common/src/main/resources/log4j.properties 
b/common/src/main/resources/log4j.properties
index d6e3198..ba44df6 100644
--- a/common/src/main/resources/log4j.properties
+++ b/common/src/main/resources/log4j.properties
@@ -18,8 +18,8 @@
 #
 log4j.rootLogger=INFO, STDOUT
 
-log4j.logger.org.apache.nemo.runtime.master.MetricManagerMaster=INFO, METRIC
-log4j.additivity.org.apache.nemo.runtime.master.MetricManagerMaster=false
+log4j.logger.org.apache.nemo.runtime.master.metric.MetricManagerMaster=INFO, 
METRIC
+log4j.additivity.org.apache.nemo.runtime.master.metric.MetricManagerMaster=false
 
 log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
 log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index 8207320..9ea0591 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.beam;
 
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nemo.common.KeyExtractor;
 import org.apache.beam.sdk.values.KV;
 
@@ -38,4 +39,22 @@ final class BeamKeyExtractor implements KeyExtractor {
       return element;
     }
   }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(2117, 37).toHashCode();
+  }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
index c1ff6f0..12a20d3 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -55,6 +55,11 @@ public final class BeamDecoderFactory<T> implements 
DecoderFactory<T> {
     }
   }
 
+  @Override
+  public String toString() {
+    return beamCoder.getClass().getName();
+  }
+
   /**
    * Abstract class for Beam Decoder.
    * @param <T2> the type of element to decode.
@@ -151,9 +156,4 @@ public final class BeamDecoderFactory<T> implements 
DecoderFactory<T> {
       return decodeInternal();
     }
   }
-
-  @Override
-  public String toString() {
-    return beamCoder.toString();
-  }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
index 090c24b..a000ead 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
@@ -55,6 +55,11 @@ public final class BeamEncoderFactory<T> implements 
EncoderFactory<T> {
     }
   }
 
+  @Override
+  public String toString() {
+    return beamCoder.getClass().getName();
+  }
+
   /**
    * Beam Encoder for non void objects.
    *
@@ -110,9 +115,4 @@ public final class BeamEncoderFactory<T> implements 
EncoderFactory<T> {
       outputStream.write(0); // emit 0 instead of null to enable to count 
emitted elements.
     }
   }
-
-  @Override
-  public String toString() {
-    return beamCoder.toString();
-  }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
index f948022..de05149 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nemo.common.KeyExtractor;
 import scala.Tuple2;
 
@@ -34,4 +35,23 @@ public final class SparkKeyExtractor implements KeyExtractor 
{
       return element;
     }
   }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(2437, 17).toHashCode();
+  }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
index ea2b4da..cec71d2 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
@@ -47,6 +47,13 @@ public final class SparkDecoderFactory<T> implements 
DecoderFactory<T> {
     return new SparkDecoder<>(inputStream, serializer.newInstance());
   }
 
+  @Override
+  public String toString() {
+    return "SparkDecoderFactory{"
+      + "serializer=" + serializer
+      + '}';
+  }
+
   /**
    * SparkDecoder.
    * @param <T2> type of the object to deserialize.
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
index b23c772..a9d6d59 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
@@ -47,6 +47,13 @@ public final class SparkEncoderFactory<T> implements 
EncoderFactory<T> {
     return new SparkEncoder<>(outputStream, serializer.newInstance());
   }
 
+  @Override
+  public String toString() {
+    return "SparkEncoderFactory{"
+      + "serializer=" + serializer
+      + '}';
+  }
+
   /**
    * SparkEncoder.
    * @param <T2> type of the object to serialize.
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java 
b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 7b31372..c0df1a6 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -76,6 +76,28 @@ public final class JobConf extends 
ConfigurationModuleBuilder {
   public final class GlusterVolumeDirectory implements Name<String> {
   }
 
+  /**
+   * Address pointing to the DB for saving metrics.
+   */
+  @NamedParameter(doc = "DB address", short_name = "db_dir", default_value =
+    
"jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization")
+  public final class DBAddress implements Name<String> {
+  }
+
+  /**
+   * ID for the pointed DB address for saving metrics.
+   */
+  @NamedParameter(doc = "DB ID", short_name = "db_id", default_value = 
"postgres")
+  public final class DBId implements Name<String> {
+  }
+
+  /**
+   * Password for the pointed DB address for saving metrics.
+   */
+  @NamedParameter(doc = "DB Password", short_name = "db_password", 
default_value = "fake_password")
+  public final class DBPasswd implements Name<String> {
+  }
+
   //////////////////////////////// Client-Driver RPC
 
   /**
diff --git a/log4j.properties b/log4j.properties
index d6e3198..ba44df6 100644
--- a/log4j.properties
+++ b/log4j.properties
@@ -18,8 +18,8 @@
 #
 log4j.rootLogger=INFO, STDOUT
 
-log4j.logger.org.apache.nemo.runtime.master.MetricManagerMaster=INFO, METRIC
-log4j.additivity.org.apache.nemo.runtime.master.MetricManagerMaster=false
+log4j.logger.org.apache.nemo.runtime.master.metric.MetricManagerMaster=INFO, 
METRIC
+log4j.additivity.org.apache.nemo.runtime.master.metric.MetricManagerMaster=false
 
 log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
 log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
diff --git a/pom.xml b/pom.xml
index 3eb7e54..7d07f3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,8 @@ under the License.
         <jetty-server.version>9.4.10.v20180503</jetty-server.version>
         <jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version>
         <commons-math.version>3.6.1</commons-math.version>
+        <sqlite-jdbc.version>3.25.2</sqlite-jdbc.version>
+        <postgresql.version>42.2.5</postgresql.version>
         <slf4j.version>1.7.20</slf4j.version>
         <!-- Tests -->
         <mockito.version>2.13.0</mockito.version>
diff --git a/runtime/common/pom.xml b/runtime/common/pom.xml
index 00eed84..5272292 100644
--- a/runtime/common/pom.xml
+++ b/runtime/common/pom.xml
@@ -41,6 +41,11 @@ under the License.
             <artifactId>nemo-conf</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgresql.version}</version>
+        </dependency>
         <!-- GRPC -->
         <dependency>
             <groupId>io.grpc</groupId>
diff --git 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
index 149d456..5baea63 100644
--- 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
+++ 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java
@@ -21,8 +21,14 @@ package org.apache.nemo.runtime.common.metric;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.exception.MetricException;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.common.state.PlanState;
 
 import java.io.IOException;
@@ -33,30 +39,96 @@ import java.util.List;
  * Metric class for Job (or {@link PhysicalPlan}).
  */
 public final class JobMetric implements StateMetric<PlanState.State> {
-  private String id;
-  private List<StateTransitionEvent<PlanState.State>> stateTransitionEvents = 
new ArrayList<>();
+  private final String id;
+  private final List<StateTransitionEvent<PlanState.State>> 
stateTransitionEvents;
+  private String irDagSummary;
+  private Integer inputSize;
+  private String vertexProperties;
+  private String edgeProperties;
+  private JsonNode irDagJson;
   private JsonNode stageDagJson;
 
+  /**
+   * Constructor.
+   * @param physicalPlan physical plan to derive the id from.
+   */
   public JobMetric(final PhysicalPlan physicalPlan) {
-    this.id = physicalPlan.getPlanId();
+    this(physicalPlan.getPlanId());
   }
 
+  /**
+   * Constructor with the designated id.
+   * @param id the id.
+   */
   public JobMetric(final String id) {
     this.id = id;
+    this.stateTransitionEvents = new ArrayList<>();
   }
 
-  @JsonProperty("dag")
+  @JsonProperty("ir-dag")
+  public JsonNode getIRDAG() {
+    return irDagJson;
+  }
+
+  public String getIrDagSummary() {
+    return this.irDagSummary;
+  }
+
+  public Integer getInputSize() {
+    return this.inputSize;
+  }
+
+  public String getVertexProperties() {
+    return this.vertexProperties;
+  }
+
+  public String getEdgeProperties() {
+    return this.edgeProperties;
+  }
+
+  /**
+   * Setter for the IR DAG.
+   * @param irDag the IR DAG.
+   */
+  public void setIRDAG(final IRDAG irDag) {
+    this.irDagSummary = irDag.irDAGSummary();
+    this.inputSize = irDag.getRootVertices().stream()
+      .filter(irVertex -> irVertex instanceof SourceVertex)
+      .mapToInt(irVertex -> {
+        try {
+          return ((SourceVertex) irVertex).getReadables(1).size();
+        } catch (Exception e) {
+          throw new MetricException(e);
+        }
+      })
+      .sum();
+    final Pair<String, String> stringifiedProperties = 
MetricUtils.stringifyIRDAGProperties(irDag);
+    this.vertexProperties = stringifiedProperties.left();
+    this.edgeProperties = stringifiedProperties.right();
+    final ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      this.irDagJson = objectMapper.readTree(irDag.toString());
+    } catch (final IOException e) {
+      throw new MetricException(e);
+    }
+  }
+
+  @JsonProperty("stage-dag")
   public JsonNode getStageDAG() {
     return stageDagJson;
   }
 
-  public void setStageDAG(final DAG dag) {
+  /**
+   * Setter for the stage DAG.
+   * @param dag the stage DAG.
+   */
+  public void setStageDAG(final DAG<Stage, StageEdge> dag) {
     final String dagJson = dag.toString();
     final ObjectMapper objectMapper = new ObjectMapper();
     try {
       this.stageDagJson = objectMapper.readTree(dagJson);
     } catch (final IOException e) {
-      throw new RuntimeException(e);
+      throw new MetricException(e);
     }
   }
 
diff --git 
a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
new file mode 100644
index 0000000..845d795
--- /dev/null
+++ 
b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.common.metric;
+
+import com.google.common.collect.HashBiMap;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.common.exception.MetricException;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.executionproperty.ExecutionProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.*;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;
+
+/**
+ * Utility class for metrics.
+ */
+public final class MetricUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class.getName());
+
+  private static final CountDownLatch METADATA_LOADED = new CountDownLatch(1);  // counted down once loadMetaData() has read the DB
+  private static final CountDownLatch MUST_UPDATE_EP_KEY_METADATA = new 
CountDownLatch(1);  // counted down when epFormatter() registers a new EP key index
+  private static final CountDownLatch MUST_UPDATE_EP_METADATA = new 
CountDownLatch(1);  // counted down when valueToIndex() registers a new EP value index
+
+  private static final Pair<HashBiMap<Integer, Class<? extends 
ExecutionProperty>>,
+    HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> METADATA = 
loadMetaData();
+  // BiMap of (1) INDEX and (2) the Execution Property class
+  private static final HashBiMap<Integer, Class<? extends ExecutionProperty>>
+    EP_KEY_METADATA = METADATA.left();
+  // BiMap of (1) the Execution Property class INDEX and the value INDEX pair 
and (2) the Execution Property.
+  private static final HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>
+    EP_METADATA = METADATA.right();
+
+  private static final int VERTEX = 1;  // leading digit of every vertex entry written by epFormatter()
+  private static final int EDGE = 2;  // leading digit of every edge entry written by epFormatter()
+
+  public static final String SQLITE_DB_NAME =
+    "jdbc:sqlite:" + MetricUtils.fetchProjectRootPath() + 
"/optimization_db.sqlite3";
+  public static final String POSTGRESQL_METADATA_DB_NAME =  // NOTE(review): hard-coded DB endpoint - consider making it configurable
+    
"jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization";
+  private static final String METADATA_TABLE_NAME = "nemo_optimization_meta";  // table holding the serialized BiMaps
+
+  /**
+   * Private constructor: this utility class only exposes static members and is not meant to be instantiated.
+   */
+  private MetricUtils() {
+  }
+
+  /**
+   * Load the BiMaps (lightweight) Metadata from the DB.
+   *
+   * The Beam JDBC driver is de-registered first, then a connection to POSTGRESQL_METADATA_DB_NAME is opened.
+   * The metadata table is created if it does not exist yet. The 'EP_KEY_METADATA' row is read first and,
+   * only if it exists, the 'EP_METADATA' row; the data column of each row is deserialized into a BiMap.
+   * A missing row yields an empty BiMap in its place.
+   *
+   * METADATA_LOADED is counted down on every path that managed to query the DB. When an exception occurs,
+   * a warning is logged and empty BiMaps are returned WITHOUT counting the latch down, so that
+   * metaDataLoaded() keeps returning false and updateMetaData() does not save anything.
+   *
+   * NOTE(review): the DB user and password are hard-coded below, and the bytes read from the DB go through
+   * Java native deserialization - confirm that the DB content can be trusted.
+   *
+   * @return the loaded BiMaps, or initialized ones.
+   */
+  private static Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
+    HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> loadMetaData() {
+    deregisterBeamDriver();
+    try (final Connection c = 
DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
+      "postgres", "fake_password")) {
+      try (final Statement statement = c.createStatement()) {
+        statement.setQueryTimeout(30);  // set timeout to 30 sec.
+
+        statement.executeUpdate(
+          "CREATE TABLE IF NOT EXISTS " + METADATA_TABLE_NAME
+          + " (key TEXT NOT NULL UNIQUE, data BYTEA NOT NULL);");
+
+        final ResultSet rsl = statement.executeQuery(
+          "SELECT * FROM " + METADATA_TABLE_NAME + " WHERE 
key='EP_KEY_METADATA';");
+        LOG.info("Metadata can be loaded.");
+        if (rsl.next()) {
+          final HashBiMap<Integer, Class<? extends ExecutionProperty>> 
indexEpKeyBiMap =
+            SerializationUtils.deserialize(rsl.getBytes("Data"));
+          rsl.close();
+
+          final ResultSet rsr = statement.executeQuery(
+            "SELECT * FROM " + METADATA_TABLE_NAME + " WHERE 
key='EP_METADATA';");
+          if (rsr.next()) {
+            final HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>> 
indexEpBiMap =
+              SerializationUtils.deserialize(rsr.getBytes("Data"));
+            rsr.close();
+
+            METADATA_LOADED.countDown();
+            LOG.info("Metadata successfully loaded from DB.");
+            return Pair.of(indexEpKeyBiMap, indexEpBiMap);
+          } else {
+            METADATA_LOADED.countDown();
+            LOG.info("No initial metadata for EP.");
+            return Pair.of(indexEpKeyBiMap, HashBiMap.create());
+          }
+        } else {
+          METADATA_LOADED.countDown();
+          LOG.info("No initial metadata.");
+          return Pair.of(HashBiMap.create(), HashBiMap.create());
+        }
+      } catch (Exception e) {
+        LOG.warn("Loading metadata from DB failed: ", e);
+        return Pair.of(HashBiMap.create(), HashBiMap.create());
+      }
+    } catch (Exception e) {
+      LOG.warn("Loading metadata from DB failed : ", e);
+      return Pair.of(HashBiMap.create(), HashBiMap.create());
+    }
+  }
+
+  /**
+   * Tells whether the metadata has been loaded from the DB.
+   * @return true once the METADATA_LOADED latch has been counted down to zero, false otherwise.
+   */
+  public static Boolean metaDataLoaded() {
+    final long pending = METADATA_LOADED.getCount();
+    return pending == 0;
+  }
+
+  /**
+   * Save the BiMaps to DB if changes are necessary (rarely executed).
+   *
+   * Nothing is saved when the metadata could not be loaded from the DB in the first place, or when neither a new
+   * execution property key nor a new execution property value has been discovered. Otherwise, each BiMap that
+   * has changed is serialized and upserted into the metadata table under its own key.
+   * A failure to save is logged as a warning and is not propagated to the caller.
+   */
+  private static void updateMetaData() {
+    final boolean epKeyMetadataChanged = MUST_UPDATE_EP_KEY_METADATA.getCount() == 0;
+    final boolean epMetadataChanged = MUST_UPDATE_EP_METADATA.getCount() == 0;
+
+    if (!metaDataLoaded() || !(epKeyMetadataChanged || epMetadataChanged)) {
+      // no need to update
+      LOG.info("Not saving Metadata: metadata loaded: {}, Index-EP data: {}, Index-EP Key data: {}",
+        metaDataLoaded(), epMetadataChanged, epKeyMetadataChanged);
+      return;
+    }
+    LOG.info("Saving Metadata..");
+
+    deregisterBeamDriver();
+    try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
+      "postgres", "fake_password")) {
+      if (epKeyMetadataChanged) {
+        saveMetaDataEntry(c, "EP_KEY_METADATA", EP_KEY_METADATA);
+        LOG.info("EP Key Metadata saved to DB.");
+      }
+
+      if (epMetadataChanged) {
+        saveMetaDataEntry(c, "EP_METADATA", EP_METADATA);
+        LOG.info("EP Metadata saved to DB.");
+      }
+    } catch (SQLException e) {
+      LOG.warn("Saving of Metadata to DB failed: ", e);
+    }
+  }
+
+  /**
+   * Upserts a single serialized metadata entry into the metadata table.
+   * @param connection the open connection to the metadata DB.
+   * @param key the key of the metadata row to insert or overwrite.
+   * @param data the metadata object to serialize and store.
+   * @throws SQLException if the statement cannot be prepared or executed.
+   */
+  private static void saveMetaDataEntry(final Connection connection, final String key,
+                                        final java.io.Serializable data) throws SQLException {
+    try (final PreparedStatement pstmt = connection.prepareStatement(
+      "INSERT INTO " + METADATA_TABLE_NAME + " (key, data) "
+        + "VALUES (?, ?) ON CONFLICT (key) DO UPDATE SET data = excluded.data;")) {
+      // The timeout has to be set on the statement that is actually executed:
+      // a timeout set on another Statement of the same connection does not apply here.
+      pstmt.setQueryTimeout(30);  // set timeout to 30 sec.
+      pstmt.setString(1, key);
+      pstmt.setBinaryStream(2, new ByteArrayInputStream(SerializationUtils.serialize(data)));
+      pstmt.executeUpdate();
+    }
+  }
+
+  /**
+   * Stringify execution properties of an IR DAG.
+   * The execution properties of all the vertices are formatted first, followed by the execution properties
+   * of the incoming edges of every vertex. The metric metadata is saved afterwards if needed.
+   * @param irdag IR DAG to observe.
+   * @return the pair of stringified execution properties. Left is for vertices, right is for edges.
+   */
+  static Pair<String, String> stringifyIRDAGProperties(final IRDAG irdag) {
+    final StringBuilder vertexPropertiesBuilder = new StringBuilder();
+    final StringBuilder edgePropertiesBuilder = new StringBuilder();
+
+    // First pass: the execution properties of each vertex.
+    irdag.getVertices().forEach(vertex ->
+      vertex.getExecutionProperties().forEachProperties(property ->
+        epFormatter(vertexPropertiesBuilder, VERTEX, vertex.getNumericId(), property)));
+
+    // Second pass: the execution properties of the incoming edges of each vertex.
+    irdag.getVertices().forEach(vertex ->
+      irdag.getIncomingEdgesOf(vertex).forEach(edge ->
+        edge.getExecutionProperties().forEachProperties(property ->
+          epFormatter(edgePropertiesBuilder, EDGE, edge.getNumericId(), property))));
+
+    // Save the metric metadata if the passes above have discovered new execution property keys / values.
+    updateMetaData();
+
+    final String vertexProperties = vertexPropertiesBuilder.toString().trim();
+    final String edgeProperties = edgePropertiesBuilder.toString().trim();
+    return Pair.of(vertexProperties, edgeProperties);
+  }
+
+  /**
+   * Formatter for execution properties. It updates the metadata for the metrics if new EP key / values are
+   * discovered. Each entry is appended as the vertex / edge digit, the numeric ID, a "0", the EP key index,
+   * a ":", the EP value index and a trailing space.
+   * @param builder string builder to append the metrics to.
+   * @param idx index specifying whether it's a vertex or an edge. This should be one digit.
+   * @param numericId numeric ID of the vertex or the edge.
+   * @param ep the execution property.
+   */
+  private static void epFormatter(final StringBuilder builder, final int idx,
+                                  final Integer numericId, final ExecutionProperty<?> ep) {
+    builder.append(idx).append(numericId).append("0");
+
+    final Integer epKeyIndex = EP_KEY_METADATA.inverse().computeIfAbsent(ep.getClass(), epClass -> {
+      // A new EP key has been discovered: give it the next index and flag the metadata for saving.
+      final int newKeyIndex = EP_KEY_METADATA.size() + 1;
+      LOG.info("New EP Key Index: {} for {}", newKeyIndex, epClass.getSimpleName());
+      MUST_UPDATE_EP_KEY_METADATA.countDown();
+      return newKeyIndex;
+    });
+    builder.append(epKeyIndex).append(":");
+
+    final Integer epIndex = valueToIndex(epKeyIndex, ep);
+    builder.append(epIndex).append(" ");
+  }
+
+  /**
+   * Helper method to convert Execution Property value objects to an integer index.
+   * It updates the metadata for the metrics if new EP values are discovered.
+   *
+   * Enum values map to their ordinal, Integer values map to themselves, and Boolean values map to 1 (true)
+   * or 0 (false). Any other value is looked up among the properties already registered in EP_METADATA:
+   * encoder / decoder factories are compared through their toString() output, other values through equals().
+   * On a hit, the value index stored with the registered property is returned. On a miss, the new index is
+   * the number of entries already registered under epKeyIndex plus one; the property is registered under
+   * that index and MUST_UPDATE_EP_METADATA is counted down.
+   *
+   * NOTE(review): the lookup scans every value of EP_METADATA without restricting it to epKeyIndex, so a
+   * match registered under a different key index returns that entry's value index - confirm this is intended.
+   *
+   * @param epKeyIndex the index of the execution property key.
+   * @param ep the execution property containing the value.
+   * @return the converted value index.
+   */
+  private static Integer valueToIndex(final Integer epKeyIndex, final 
ExecutionProperty<?> ep) {
+    final Object o = ep.getValue();
+
+    if (o instanceof Enum) {
+      return ((Enum) o).ordinal();
+    } else if (o instanceof Integer) {
+      return (int) o;
+    } else if (o instanceof Boolean) {
+      return ((Boolean) o) ? 1 : 0;
+    } else {
+      final ExecutionProperty<?> ep1;
+      if (o instanceof EncoderFactory || o instanceof DecoderFactory) {
+        ep1 = EP_METADATA.values().stream()
+          .filter(ep2 -> ep2.getValue().toString().equals(o.toString()))
+          .findFirst().orElse(null);
+      } else {
+        ep1 = EP_METADATA.values().stream()
+          .filter(ep2 -> ep2.getValue().equals(o))
+          .findFirst().orElse(null);
+      }
+
+      if (ep1 != null) {
+        return EP_METADATA.inverse().get(ep1).right();
+      } else {
+        final Integer valueIndex = 
Math.toIntExact(EP_METADATA.keySet().stream()
+          .filter(pair -> pair.left().equals(epKeyIndex))
+          .count()) + 1;
+        // Update the metadata if new EP value has been discovered.
+        EP_METADATA.put(Pair.of(epKeyIndex, valueIndex), ep);
+        LOG.info("New EP Index: ({}, {}) for {}", epKeyIndex, valueIndex, ep);
+        MUST_UPDATE_EP_METADATA.countDown();
+        return valueIndex;
+      }
+    }
+  }
+
+  /**
+   * Finds the project root path.
+   * @return the project root path.
+   */
+  private static String fetchProjectRootPath() {
+    return recursivelyFindLicense(Paths.get(System.getProperty("user.dir")));
+  }
+
+  /**
+   * Helper method to find the LICENSE file, walking up from the given path through its parent directories.
+   * @param path the path to search for.
+   * @return the absolute path of the closest directory containing the LICENSE file.
+   * @throws MetricException if a directory cannot be listed, or if neither the path nor any of its parents
+   *                         contains a LICENSE file.
+   */
+  private static String recursivelyFindLicense(final Path path) {
+    // Resolve to an absolute path first, so that getParent() walks all the way up to the filesystem root.
+    for (Path current = path.toAbsolutePath(); current != null; current = current.getParent()) {
+      try (final Stream<Path> matches = Files.find(current, 1, (p, attributes) -> p.endsWith("LICENSE"))) {
+        if (matches.findAny().isPresent()) {
+          return current.toString();
+        }
+      } catch (IOException e) {
+        throw new MetricException(e);
+      }
+    }
+    // getParent() returned null: the root has been searched without success. Fail with an explicit message
+    // instead of handing a null path to Files.find.
+    throw new MetricException("Could not find a LICENSE file in " + path + " or in any of its parent directories");
+  }
+
+  /**
+   * De-register Beam JDBC driver, which produces inconsistent results.
+   * Only the first registered driver whose class name matches the Beam JDBC driver is de-registered.
+   * @throws MetricException if the driver cannot be de-registered.
+   */
+  public static void deregisterBeamDriver() {
+    final String beamDriverClassName = "org.apache.beam.sdk.extensions.sql.impl.JdbcDriver";
+    final Enumeration<Driver> registeredDrivers = DriverManager.getDrivers();
+    while (registeredDrivers.hasMoreElements()) {
+      final Driver candidate = registeredDrivers.nextElement();
+      if (!candidate.getClass().getName().equals(beamDriverClassName)) {
+        continue;
+      }
+      try {
+        DriverManager.deregisterDriver(candidate);
+      } catch (SQLException e) {
+        throw new MetricException(e);
+      }
+      // The Beam driver has been handled: no need to look any further.
+      return;
+    }
+  }
+}
diff --git 
a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java 
b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
index b8d5885..ed2b20e 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
@@ -189,6 +189,8 @@ public final class NemoDriver {
       // send driver notification that user application is done.
       clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
           
.setType(ControlMessage.DriverToClientMessageType.ExecutionDone).build());
+      // flush metrics
+      runtimeMaster.flushMetrics();
     });
   }
 
diff --git 
a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
 
b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
index 1e77052..8c6c9d3 100644
--- 
a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
+++ 
b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
@@ -83,6 +83,7 @@ public final class UserApplicationRunner {
       // Execute!
       final Pair<PlanStateManager, ScheduledExecutorService> executionResult =
           runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
+      runtimeMaster.recordIRDAGMetrics(optimizedDAG, physicalPlan.getPlanId());
 
       // Wait for the job to finish and stop logging
       final PlanStateManager planStateManager = executionResult.left();
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
index 1367e70..2de9fdf 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
@@ -44,6 +44,13 @@ public final class NemoEventDecoderFactory implements 
DecoderFactory {
     return new NemoEventDecoder(valueDecoderFactory.create(inputStream), 
inputStream);
   }
 
+  @Override
+  public String toString() {
+    // Describes this factory through the decoder factory it wraps.
+    final StringBuilder description = new StringBuilder("NemoEventDecoderFactory{");
+    description.append("valueDecoderFactory=").append(valueDecoderFactory);
+    return description.append('}').toString();
+  }
+
   /**
    * This class decodes receive data into two types.
    * - normal data
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
index c49beda..d9b0836 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
@@ -44,6 +44,13 @@ public final class NemoEventEncoderFactory implements 
EncoderFactory {
     return new NemoEventEncoder(valueEncoderFactory.create(outputStream), 
outputStream);
   }
 
+  @Override
+  public String toString() {
+    // Describes this factory through the encoder factory it wraps.
+    final StringBuilder description = new StringBuilder("NemoEventEncoderFactory{");
+    description.append("valueEncoderFactory=").append(valueEncoderFactory);
+    return description.append('}').toString();
+  }
+
   /**
    * This encodes normal data and WatermarkWithIndex.
    * @param <T>
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java
index 221b739..ee4b511 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java
@@ -22,12 +22,10 @@ import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.*;
 import org.apache.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import org.apache.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import org.apache.nemo.runtime.master.MetricManagerMaster;
+import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.scheduler.ExecutorRegistry;
-import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
index f796946..2a3d390 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -50,6 +50,8 @@ import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.master.*;
 import org.apache.commons.io.FileUtils;
+import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
 import org.apache.nemo.runtime.master.scheduler.Scheduler;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
@@ -145,6 +147,7 @@ public final class DataTransferTest {
     injector.bindVolatileInstance(MetricManagerMaster.class, 
mock(MetricManagerMaster.class));
     injector.bindVolatileInstance(MetricMessageHandler.class, 
mock(MetricMessageHandler.class));
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, 
EMPTY_DAG_DIRECTORY);
+    injector.bindVolatileParameter(JobConf.JobId.class, "jobId");
 
     // Necessary for wiring up the message environments
     injector.bindVolatileInstance(Scheduler.class, 
injector.getInstance(BatchScheduler.class));
diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml
index 9110d0b..0a5058a 100644
--- a/runtime/master/pom.xml
+++ b/runtime/master/pom.xml
@@ -77,6 +77,11 @@ under the License.
             <version>${jackson.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+            <version>${sqlite-jdbc.version}</version>
+        </dependency>
+        <dependency>
             <!--
             This is needed to view the logs when running unit tests.
             See https://dzone.com/articles/how-configure-slf4j-different for 
details.
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
index 5da9466..88f3686 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
@@ -44,6 +44,8 @@ import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.common.metric.JobMetric;
 import org.apache.nemo.runtime.common.metric.StageMetric;
 import org.apache.nemo.runtime.common.metric.TaskMetric;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
@@ -343,7 +345,7 @@ public final class PlanStateManager {
     switch (newTaskState) {
       // INCOMPLETE stage
       case SHOULD_RETRY:
-        final boolean isAPeerAttemptCompleted = 
getPeerAttemptsforTheSameTaskIndex(taskId).stream()
+        final boolean isAPeerAttemptCompleted = 
getPeerAttemptsForTheSameTaskIndex(taskId).stream()
           .anyMatch(state -> state.equals(TaskState.State.COMPLETE));
         if (!isAPeerAttemptCompleted) {
           // None of the peers has completed, hence this stage is incomplete
@@ -563,7 +565,7 @@ public final class PlanStateManager {
       || state.equals(TaskState.State.ON_HOLD);
   }
 
-  private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final 
String taskId) {
+  private List<TaskState.State> getPeerAttemptsForTheSameTaskIndex(final 
String taskId) {
     final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
     final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
     final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
index e5d8e30..119f23e 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
@@ -22,6 +22,7 @@ import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.exception.*;
+import org.apache.nemo.common.ir.IRDAG;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -29,8 +30,12 @@ import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageContext;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.common.message.MessageListener;
+import org.apache.nemo.runtime.common.metric.JobMetric;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
 import org.apache.nemo.runtime.master.servlet.*;
 import org.apache.nemo.runtime.master.resource.ContainerManager;
@@ -94,7 +99,11 @@ public final class RuntimeMaster {
   private final PlanStateManager planStateManager;
   // For converting json data. This is a thread safe.
   private final ObjectMapper objectMapper;
+  private final String jobId;
   private final String dagDirectory;
+  private final String dbAddress;
+  private final String dbId;
+  private final String dbPassword;
   private final Set<IRVertex> irVertices;
   private final AtomicInteger resourceRequestCount;
   private CountDownLatch metricCountDownLatch;
@@ -110,6 +119,10 @@ public final class RuntimeMaster {
                         final ClientRPC clientRPC,
                         final MetricManagerMaster metricManagerMaster,
                         final PlanStateManager planStateManager,
+                        @Parameter(JobConf.JobId.class) final String jobId,
+                        @Parameter(JobConf.DBAddress.class) final String 
dbAddress,
+                        @Parameter(JobConf.DBId.class) final String dbId,
+                        @Parameter(JobConf.DBPasswd.class) final String 
dbPassword,
                         @Parameter(JobConf.DAGDirectory.class) final String 
dagDirectory) {
     // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
@@ -135,7 +148,11 @@ public final class RuntimeMaster {
         .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, 
new MasterControlMessageReceiver());
     this.clientRPC = clientRPC;
     this.metricManagerMaster = metricManagerMaster;
+    this.jobId = jobId;
     this.dagDirectory = dagDirectory;
+    this.dbAddress = dbAddress;
+    this.dbId = dbId;
+    this.dbPassword = dbPassword;
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
     this.objectMapper = new ObjectMapper();
@@ -144,6 +161,10 @@ public final class RuntimeMaster {
     this.planStateManager = planStateManager;
   }
 
+  /**
+   * Start Metric Server.
+   * @return the metric server.
+   */
   private Server startRestMetricServer() {
     final Server server = new Server(REST_SERVER_PORT);
 
@@ -159,12 +180,28 @@ public final class RuntimeMaster {
     try {
       server.start();
     } catch (final Exception e) {
-      throw new RuntimeException("Failed to start REST API server.");
+      throw new MetricException("Failed to start REST API server: " + e);
     }
 
     return server;
   }
 
+  /**
+   * Records the given IR DAG on the job metric of the plan.
+   * @param irdag the IR DAG to record.
+   * @param planId the ID of the plan, used as the ID of the job metric.
+   */
+  public void recordIRDAGMetrics(final IRDAG irdag, final String planId) {
+    final JobMetric jobMetric = metricStore.getOrCreateMetric(JobMetric.class, planId);
+    jobMetric.setIRDAG(irdag);
+  }
+
+  /**
+   * Flush metrics.
+   * Sends a metric flush request to all executors, dumps all the metrics to a JSON file in the DAG directory,
+   * and saves the optimization metrics to the DB.
+   */
+  public void flushMetrics() {
+    // send metric flush request to all executors
+    metricManagerMaster.sendMetricFlushRequest();
+
+    // The job ID and the current time make the name of the dump file unique per flush.
+    final String metricFileName = "Metric_" + jobId + "_" + System.currentTimeMillis() + ".json";
+    metricStore.dumpAllMetricToFile(Paths.get(dagDirectory, metricFileName).toString());
+    metricStore.saveOptimizationMetricsToDB(dbAddress, dbId, dbPassword);
+  }
+
   /**
    * Submits the {@link PhysicalPlan} to Runtime.
    * At now, we are assuming that a single job submit multiple plans.
@@ -199,8 +236,6 @@ public final class RuntimeMaster {
     // No need to speculate anymore
     speculativeTaskCloningThread.shutdown();
 
-    // send metric flush request to all executors
-    metricManagerMaster.sendMetricFlushRequest();
     try {
       // wait for metric flush
       if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
@@ -225,10 +260,8 @@ public final class RuntimeMaster {
       try {
         metricServer.stop();
       } catch (final Exception e) {
-        throw new RuntimeException("Failed to stop rest api server.");
+        throw new MetricException("Failed to stop rest api server: " + e);
       }
-
-      metricStore.dumpAllMetricToFile(Paths.get(dagDirectory, 
"metric.json").toString());
     });
 
     // Do not shutdown runtimeMasterThread. We need it to clean things up.
@@ -278,9 +311,8 @@ public final class RuntimeMaster {
   public void onContainerAllocated(final String executorId,
                                    final AllocatedEvaluator allocatedEvaluator,
                                    final Configuration executorConfiguration) {
-    runtimeMasterThread.execute(() -> {
-      containerManager.onContainerAllocated(executorId, allocatedEvaluator, 
executorConfiguration);
-    });
+    runtimeMasterThread.execute(() ->
+      containerManager.onContainerAllocated(executorId, allocatedEvaluator, 
executorConfiguration));
   }
 
   /**
@@ -335,9 +367,8 @@ public final class RuntimeMaster {
   public final class MasterControlMessageReceiver implements 
MessageListener<ControlMessage.Message> {
     @Override
     public void onMessage(final ControlMessage.Message message) {
-      runtimeMasterThread.execute(() -> {
-        handleControlMessage(message);
-      });
+      runtimeMasterThread.execute(() ->
+        handleControlMessage(message));
     }
 
     @Override
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricBroadcaster.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricBroadcaster.java
similarity index 98%
rename from 
runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricBroadcaster.java
rename to 
runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricBroadcaster.java
index 1dbd0e6..f2bca8d 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricBroadcaster.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricBroadcaster.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.master;
+package org.apache.nemo.runtime.master.metric;
 
 import org.eclipse.jetty.websocket.api.Session;
 import org.slf4j.Logger;
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricManagerMaster.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricManagerMaster.java
similarity index 98%
rename from 
runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricManagerMaster.java
rename to 
runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricManagerMaster.java
index d6e18c0..6eabcbe 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricManagerMaster.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricManagerMaster.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.master;
+package org.apache.nemo.runtime.master.metric;
 
 import javax.inject.Inject;
 
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricMessageHandler.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricMessageHandler.java
similarity index 96%
rename from 
runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricMessageHandler.java
rename to 
runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricMessageHandler.java
index 23cd748..b0ab1b5 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricMessageHandler.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricMessageHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.master;
+package org.apache.nemo.runtime.master.metric;
 
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricStore.java 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
similarity index 54%
rename from 
runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricStore.java
rename to 
runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
index 42f22aa..c4ad75f 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricStore.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
@@ -16,17 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.master;
+package org.apache.nemo.runtime.master.metric;
 
 import com.fasterxml.jackson.core.JsonEncoding;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nemo.common.exception.MetricException;
 import org.apache.nemo.common.exception.UnsupportedMetricException;
 import org.apache.nemo.runtime.common.metric.*;
+import org.apache.nemo.runtime.common.state.PlanState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.sql.*;
 import java.util.*;
 
 /**
@@ -34,9 +40,10 @@ import java.util.*;
  * All metric classes should be JSON-serializable by {@link ObjectMapper}.
  */
 public final class MetricStore {
-  private final Map<Class, Map<String, Object>> metricMap = new HashMap<>();
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class.getName());
+  private final Map<Class<? extends Metric>, Map<String, Object>> metricMap = 
new HashMap<>();
   // You can add more metrics by adding item to this metricList list.
-  private final Map<String, Class> metricList = new HashMap<>();
+  private final Map<String, Class<? extends Metric>> metricList = new 
HashMap<>();
   /**
    * Private constructor.
    */
@@ -66,7 +73,7 @@ public final class MetricStore {
       throw new NoSuchElementException();
     }
 
-    return metricList.get(className);
+    return (Class<T>) metricList.get(className);
   }
 
   /**
@@ -76,7 +83,7 @@ public final class MetricStore {
    * @param <T> class of metric
    */
   public <T extends Metric> void putMetric(final T metric) {
-    final Class metricClass = metric.getClass();
+    final Class<? extends Metric> metricClass = metric.getClass();
     if (!metricList.values().contains(metricClass)) {
       throw new UnsupportedMetricException(new Throwable("Unsupported 
metric"));
     }
@@ -106,11 +113,7 @@ public final class MetricStore {
    * @return a metric object.
    */
   public <T extends Metric> Map<String, Object> getMetricMap(final Class<T> 
metricClass) {
-    final Map<String, Object> metric = metricMap.computeIfAbsent(metricClass, 
k -> new HashMap<>());
-    if (metric == null) {
-      throw new NoSuchElementException("No metric found");
-    }
-    return metric;
+    return metricMap.computeIfAbsent(metricClass, k -> new HashMap<>());
   }
 
   /**
@@ -122,13 +125,13 @@ public final class MetricStore {
    * @return a metric object. If there was no such metric, newly create one.
    */
   public <T extends Metric> T getOrCreateMetric(final Class<T> metricClass, 
final String id) {
-    T metric =  (T) metricMap.computeIfAbsent(metricClass, k -> new 
HashMap<>()).get(id);
+    T metric = (T) metricMap.computeIfAbsent(metricClass, k -> new 
HashMap<>()).get(id);
     if (metric == null) {
       try {
         metric = metricClass.getConstructor(new 
Class[]{String.class}).newInstance(id);
         putMetric(metric);
       } catch (final Exception e) {
-        throw new RuntimeException(e);
+        throw new MetricException(e);
       }
     }
     return metric;
@@ -158,19 +161,19 @@ public final class MetricStore {
     final ObjectMapper objectMapper = new ObjectMapper();
     final JsonFactory jsonFactory = new JsonFactory();
     final ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, 
JsonEncoding.UTF8);
-    jsonGenerator.setCodec(objectMapper);
 
-    jsonGenerator.writeStartObject();
-    jsonGenerator.writeFieldName(metricClass.getSimpleName());
-    jsonGenerator.writeStartObject();
-    for (final Map.Entry<String, Object> idToMetricEntry : 
getMetricMap(metricClass).entrySet()) {
-      generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, 
objectMapper);
-    }
-    jsonGenerator.writeEndObject();
-    jsonGenerator.writeEndObject();
+    try (final JsonGenerator jsonGenerator = 
jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
+      jsonGenerator.setCodec(objectMapper);
 
-    jsonGenerator.close();
+      jsonGenerator.writeStartObject();
+      jsonGenerator.writeFieldName(metricClass.getSimpleName());
+      jsonGenerator.writeStartObject();
+      for (final Map.Entry<String, Object> idToMetricEntry : 
getMetricMap(metricClass).entrySet()) {
+        generatePreprocessedJsonFromMetricEntry(idToMetricEntry, 
jsonGenerator, objectMapper);
+      }
+      jsonGenerator.writeEndObject();
+      jsonGenerator.writeEndObject();
+    }
     return stream.toString();
   }
 
@@ -183,21 +186,22 @@ public final class MetricStore {
     final ObjectMapper objectMapper = new ObjectMapper();
     final JsonFactory jsonFactory = new JsonFactory();
     final ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, 
JsonEncoding.UTF8);
-    jsonGenerator.setCodec(objectMapper);
 
-    jsonGenerator.writeStartObject();
-    for (final Map.Entry<Class, Map<String, Object>> metricMapEntry : 
metricMap.entrySet()) {
-      jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName());
+    try (final JsonGenerator jsonGenerator = 
jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
+      jsonGenerator.setCodec(objectMapper);
+
       jsonGenerator.writeStartObject();
-      for (final Map.Entry<String, Object> idToMetricEntry : 
metricMapEntry.getValue().entrySet()) {
-        generatePreprocessedJsonFromMetricEntry(idToMetricEntry, 
jsonGenerator, objectMapper);
+      for (final Map.Entry<Class<? extends Metric>, Map<String, Object>> 
metricMapEntry : metricMap.entrySet()) {
+        jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName());
+        jsonGenerator.writeStartObject();
+        for (final Map.Entry<String, Object> idToMetricEntry : 
metricMapEntry.getValue().entrySet()) {
+          generatePreprocessedJsonFromMetricEntry(idToMetricEntry, 
jsonGenerator, objectMapper);
+        }
+        jsonGenerator.writeEndObject();
       }
       jsonGenerator.writeEndObject();
     }
-    jsonGenerator.writeEndObject();
 
-    jsonGenerator.close();
     return stream.toString();
   }
 
@@ -206,17 +210,104 @@ public final class MetricStore {
    * @param filePath path to dump JSON.
    */
   public void dumpAllMetricToFile(final String filePath) {
-    try {
+    try (final BufferedWriter writer = new BufferedWriter(new 
FileWriter(filePath))) {
       final String jsonDump = dumpAllMetricToJson();
-      final BufferedWriter writer = new BufferedWriter(new 
FileWriter(filePath));
-
       writer.write(jsonDump);
-      writer.close();
     } catch (final IOException e) {
-      throw new RuntimeException(e);
+      throw new MetricException(e);
     }
   }
 
+  /**
+   * Save the job metrics for the optimization, in the form of LibSVM, to a 
local SQLite DB.
+   * The metrics are as follows: the JCT (duration), the input size, the JVM 
and machine memory sizes, and the IR DAG execution properties.
+   */
+  private void saveOptimizationMetricsToLocal() {
+    final String[] syntax = {"INTEGER PRIMARY KEY AUTOINCREMENT"};
+
+    try (final Connection c = 
DriverManager.getConnection(MetricUtils.SQLITE_DB_NAME)) {
+      LOG.info("Opened database successfully at {}", 
MetricUtils.SQLITE_DB_NAME);
+      saveOptimizationMetrics(c, syntax);
+    } catch (SQLException e) {
+      LOG.error("Error while saving optimization metrics to SQLite: {}", e);
+    }
+  }
+
+  /**
+   * Save the job metrics for the optimization, in the form of LibSVM, to a 
remote DB if applicable, falling back to the local SQLite DB otherwise.
+   * The metrics are as follows: the JCT (duration), the input size, the JVM 
and machine memory sizes, and the IR DAG execution properties.
+   */
+  public void saveOptimizationMetricsToDB(final String address, final String 
id, final String passwd) {
+    final String[] syntax = {"SERIAL PRIMARY KEY"};
+
+    if (!MetricUtils.metaDataLoaded()) {
+      saveOptimizationMetricsToLocal();
+      return;
+    }
+
+    try (final Connection c = DriverManager.getConnection(address, id, 
passwd)) {
+      LOG.info("Opened database successfully at {}", 
MetricUtils.POSTGRESQL_METADATA_DB_NAME);
+      MetricUtils.deregisterBeamDriver();
+      saveOptimizationMetrics(c, syntax);
+    } catch (SQLException e) {
+      LOG.error("Error while saving optimization metrics to PostgreSQL: {}", 
e);
+      LOG.info("Saving metrics on the local SQLite DB");
+      saveOptimizationMetricsToLocal();
+    }
+  }
+
+  /**
+   * Save the job metrics for the optimization to the DB, in the form of 
LibSVM.
+   * @param c the connection to the DB.
+   * @param syntax the db-specific syntax.
+   */
+  private void saveOptimizationMetrics(final Connection c, final String[] 
syntax) {
+    try (final Statement statement = c.createStatement()) {
+      statement.setQueryTimeout(30);  // set timeout to 30 sec.
+
+      getMetricMap(JobMetric.class).values().forEach(o -> {
+        final JobMetric jobMetric = (JobMetric) o;
+        final String tableName = jobMetric.getIrDagSummary();
+
+        final long startTime = jobMetric.getStateTransitionEvents().stream()
+          .filter(ste -> ste.getPrevState().equals(PlanState.State.READY)
+            && ste.getNewState().equals(PlanState.State.EXECUTING))
+          .findFirst().orElseThrow(() -> new MetricException("job has never 
started"))
+          .getTimestamp();
+        final long endTime = jobMetric.getStateTransitionEvents().stream()
+          .filter(ste -> ste.getNewState().equals(PlanState.State.COMPLETE))
+          .findFirst().orElseThrow(() -> new MetricException("job has never 
completed"))
+          .getTimestamp();
+        final long duration = endTime - startTime;  // ms
+        final String vertexProperties = jobMetric.getVertexProperties();
+        final String edgeProperties = jobMetric.getEdgeProperties();
+        final Integer inputSize = jobMetric.getInputSize();
+        final long jvmMemSize = Runtime.getRuntime().maxMemory();
+        final long memSize = ((com.sun.management.OperatingSystemMXBean) 
ManagementFactory
+          .getOperatingSystemMXBean()).getTotalPhysicalMemorySize();
+
+        try {
+          statement.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName
+              + " (id " + syntax[0] + ", duration INTEGER NOT NULL, inputsize 
INTEGER NOT NULL, "
+              + "jvmmemsize BIGINT NOT NULL, memsize BIGINT NOT NULL, "
+              + "vertex_properties TEXT NOT NULL, edge_properties TEXT NOT 
NULL, "
+              + "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);");
+          LOG.info("CREATED TABLE For {} IF NOT PRESENT", tableName);
+
+          statement.executeUpdate("INSERT INTO " + tableName
+            + " (duration, inputsize, jvmmemsize, memsize, vertex_properties, 
edge_properties) "
+            + "VALUES (" + duration + ", " + inputSize + ", "
+            + jvmMemSize + ", " + memSize + ", '"
+            + vertexProperties + "', '" + edgeProperties + "');");
+          LOG.info("Recorded metrics on the table for {}", tableName);
+        } catch (SQLException e) {
+          LOG.error("Error while saving optimization metrics: {}", e);
+        }
+      });
+    } catch (SQLException e) {
+      LOG.error("Error while saving optimization metrics: {}", e);
+    }
+  }
 
   /**
    * Send changed metric data to {@link MetricBroadcaster}, which will 
broadcast it to
@@ -232,10 +323,7 @@ public final class MetricStore {
     final T metric = getMetricWithId(metricClass, id);
     final JsonFactory jsonFactory = new JsonFactory();
     final ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    final JsonGenerator jsonGenerator;
-    try {
-      jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
-
+    try (final JsonGenerator jsonGenerator = 
jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
       jsonGenerator.setCodec(objectMapper);
 
       jsonGenerator.writeStartObject();
@@ -246,11 +334,9 @@ public final class MetricStore {
       jsonGenerator.writeObject(metric);
       jsonGenerator.writeEndObject();
 
-      jsonGenerator.close();
-
       metricBroadcaster.broadcast(stream.toString());
     } catch (final IOException e) {
-      throw new RuntimeException(e);
+      throw new MetricException(e);
     }
   }
 }
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java
index 66ce890..e09c579 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.master.servlet;
 
-import org.apache.nemo.runtime.master.MetricStore;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java
index a9666ca..d965489 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.master.servlet;
 
-import org.apache.nemo.runtime.master.MetricStore;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.nemo.runtime.common.metric.JobMetric;
 
 import javax.servlet.http.HttpServlet;
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java
index 52741f8..5d77d46 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.master.servlet;
 
-import org.apache.nemo.runtime.master.MetricStore;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.nemo.runtime.common.metric.StageMetric;
 
 import javax.servlet.http.HttpServlet;
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java
index e225a3e..f80033c 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.master.servlet;
 
-import org.apache.nemo.runtime.master.MetricStore;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.nemo.runtime.common.metric.TaskMetric;
 
 import javax.servlet.http.HttpServlet;
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
index c1b7907..62821c6 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.master.servlet;
 
-import org.apache.nemo.runtime.master.MetricBroadcaster;
+import org.apache.nemo.runtime.master.metric.MetricBroadcaster;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.StatusCode;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
diff --git 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java
 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java
index 932c203..afcaeb8 100644
--- 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java
+++ 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.master;
 import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.junit.Test;
 
 import java.io.IOException;
diff --git 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java
 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java
index f2339cf..4d771da 100644
--- 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java
+++ 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.nemo.runtime.common.state.PlanState;
 import org.apache.nemo.runtime.common.state.StageState;
 import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.reef.tang.Injector;
 import org.junit.Before;
 import org.junit.Test;
diff --git 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 2aeb335..1092954 100644
--- 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -25,7 +25,7 @@ import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageSender;
 import org.apache.nemo.runtime.common.plan.*;
 import org.apache.nemo.runtime.master.PlanStateManager;
-import org.apache.nemo.runtime.master.MetricMessageHandler;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ResourceSpecification;
diff --git 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index 2b8adf2..e2e078e 100644
--- 
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ 
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -31,7 +31,7 @@ import org.apache.nemo.runtime.common.state.BlockState;
 import org.apache.nemo.runtime.common.state.PlanState;
 import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
-import org.apache.nemo.runtime.master.MetricMessageHandler;
+import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ResourceSpecification;

Reply via email to