This is an automated email from the ASF dual-hosted git repository. wylee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push: new 3115974 [NEMO-335] DB for storing metrics (#192) 3115974 is described below commit 3115974570bc431d5f00dfaa2be4823c91fbc634 Author: Won Wook SONG <won...@apache.org> AuthorDate: Fri Feb 15 22:05:19 2019 +0900 [NEMO-335] DB for storing metrics (#192) JIRA: [NEMO-335: DB for storing metrics](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-335) **Major changes:** - Brings in the `sqlite-jdbc` and `postgresql` libraries for interacting with SQLite and PostgreSQL databases. - Upon each execution, automatically writes (0) the DAG specifications (number of root vertices, vertices, and edges), (1) the duration, (2) the memory specs (JVM memory and machine memory), and (3) the execution properties of the IR DAG either to the SQLite DB file (located at the project root, next to the `LICENSE` file) or to a PostgreSQL server. **Minor changes to note:** - Overall cleanup and refactoring of metric classes. **Tests for the changes:** - Existing tests work. **Other comments:** - None Closes #192 --- .gitignore | 5 + .../org/apache/nemo/client/ClientEndpointTest.java | 2 +- .../org/apache/nemo/common/PairKeyExtractor.java | 35 ++- .../nemo/common/coder/BytesDecoderFactory.java | 5 + .../nemo/common/coder/BytesEncoderFactory.java | 5 + .../nemo/common/coder/IntDecoderFactory.java | 5 + .../nemo/common/coder/IntEncoderFactory.java | 5 + .../nemo/common/coder/LongDecoderFactory.java | 5 + .../nemo/common/coder/LongEncoderFactory.java | 5 + .../nemo/common/exception/MetricException.java | 32 ++- .../main/java/org/apache/nemo/common/ir/IRDAG.java | 23 +- .../ClonedSchedulingProperty.java | 30 ++ common/src/main/resources/log4j.properties | 4 +- .../compiler/frontend/beam/BeamKeyExtractor.java | 19 ++ .../frontend/beam/coder/BeamDecoderFactory.java | 10 +- .../frontend/beam/coder/BeamEncoderFactory.java | 10 +- .../compiler/frontend/spark/SparkKeyExtractor.java | 20 ++ .../frontend/spark/coder/SparkDecoderFactory.java | 7 + 
.../frontend/spark/coder/SparkEncoderFactory.java | 7 + .../main/java/org/apache/nemo/conf/JobConf.java | 22 ++ log4j.properties | 4 +- pom.xml | 2 + runtime/common/pom.xml | 5 + .../nemo/runtime/common/metric/JobMetric.java | 84 +++++- .../nemo/runtime/common/metric/MetricUtils.java | 316 +++++++++++++++++++++ .../java/org/apache/nemo/driver/NemoDriver.java | 2 + .../apache/nemo/driver/UserApplicationRunner.java | 1 + .../datatransfer/NemoEventDecoderFactory.java | 7 + .../datatransfer/NemoEventEncoderFactory.java | 7 + .../nemo/runtime/executor/MetricFlushTest.java | 4 +- .../executor/datatransfer/DataTransferTest.java | 3 + runtime/master/pom.xml | 5 + .../nemo/runtime/master/PlanStateManager.java | 6 +- .../apache/nemo/runtime/master/RuntimeMaster.java | 55 +++- .../master/{ => metric}/MetricBroadcaster.java | 2 +- .../master/{ => metric}/MetricManagerMaster.java | 2 +- .../master/{ => metric}/MetricMessageHandler.java | 2 +- .../runtime/master/{ => metric}/MetricStore.java | 174 +++++++++--- .../runtime/master/servlet/AllMetricServlet.java | 2 +- .../runtime/master/servlet/JobMetricServlet.java | 2 +- .../runtime/master/servlet/StageMetricServlet.java | 2 +- .../runtime/master/servlet/TaskMetricServlet.java | 2 +- .../master/servlet/WebSocketMetricAdapter.java | 2 +- .../nemo/runtime/master/MetricStoreTest.java | 1 + .../nemo/runtime/master/PlanStateManagerTest.java | 1 + .../master/scheduler/BatchSchedulerTest.java | 2 +- .../runtime/master/scheduler/TaskRetryTest.java | 2 +- 47 files changed, 829 insertions(+), 124 deletions(-) diff --git a/.gitignore b/.gitignore index 82c9630..bd9b5c3 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,11 @@ build docs/ # # ---------------------------------------------------------------------- +# DB Files +# ---------------------------------------------------------------------- +*.sqlite3 +# +# ---------------------------------------------------------------------- # Files generated by OutputService during runtime # 
---------------------------------------------------------------------- .test_output* diff --git a/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java b/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java index 1a12968..7095f39 100644 --- a/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java +++ b/client/src/test/java/org/apache/nemo/client/ClientEndpointTest.java @@ -21,7 +21,7 @@ package org.apache.nemo.client; import org.apache.nemo.runtime.common.plan.PhysicalPlan; import org.apache.nemo.runtime.common.state.PlanState; import org.apache.nemo.runtime.common.state.TaskState; -import org.apache.nemo.runtime.master.MetricMessageHandler; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; import org.apache.nemo.runtime.master.PlanStateManager; import org.apache.nemo.runtime.common.plan.TestPlanGenerator; import org.apache.reef.tang.Injector; diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java b/common/src/main/java/org/apache/nemo/common/PairKeyExtractor.java similarity index 59% copy from compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java copy to common/src/main/java/org/apache/nemo/common/PairKeyExtractor.java index f948022..ea1fd09 100644 --- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java +++ b/common/src/main/java/org/apache/nemo/common/PairKeyExtractor.java @@ -16,22 +16,39 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nemo.compiler.frontend.spark; -import org.apache.nemo.common.KeyExtractor; -import scala.Tuple2; +package org.apache.nemo.common; + +import org.apache.commons.lang3.builder.HashCodeBuilder; /** - * Extracts the key from a KV element. - * For non-KV elements, the elements themselves become the key. + * A KeyExtractor for Pair class. 
*/ -public final class SparkKeyExtractor implements KeyExtractor { +public final class PairKeyExtractor implements KeyExtractor { @Override public Object extractKey(final Object element) { - if (element instanceof Tuple2) { - return ((Tuple2) element)._1; + if (element instanceof Pair) { + return ((Pair) element).left(); } else { - return element; + throw new IllegalStateException(element.toString()); + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(133, 37).toHashCode(); } } diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java index 5bf1e64..14d02b0 100644 --- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java +++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java @@ -54,6 +54,11 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> { return new BytesDecoder(inputStream); } + @Override + public String toString() { + return "BytesDecoderFactory{}"; + } + /** * BytesDecoder. */ diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java index 970f20e..6666a7b 100644 --- a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java +++ b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java @@ -51,6 +51,11 @@ public final class BytesEncoderFactory implements EncoderFactory<byte[]> { return new BytesEncoder(outputStream); } + @Override + public String toString() { + return "BytesEncoderFactory{}"; + } + /** * BytesEncoder. 
*/ diff --git a/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java index 1d25deb..41653f2 100644 --- a/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java +++ b/common/src/main/java/org/apache/nemo/common/coder/IntDecoderFactory.java @@ -47,6 +47,11 @@ public final class IntDecoderFactory implements DecoderFactory<Integer> { return new IntDecoder(inputStream); } + @Override + public String toString() { + return "IntDecoderFactory{}"; + } + /** * IntDecoder. */ diff --git a/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java index 4747ae9..6be865e 100644 --- a/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java +++ b/common/src/main/java/org/apache/nemo/common/coder/IntEncoderFactory.java @@ -47,6 +47,11 @@ public final class IntEncoderFactory implements EncoderFactory<Integer> { return new IntEncoder(outputStream); } + @Override + public String toString() { + return "IntEncoderFactory{}"; + } + /** * IntEncoder. */ diff --git a/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java index f212c1f..622ada2 100644 --- a/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java +++ b/common/src/main/java/org/apache/nemo/common/coder/LongDecoderFactory.java @@ -49,6 +49,11 @@ public final class LongDecoderFactory implements DecoderFactory<Long> { return new LongDecoder(inputStream); } + @Override + public String toString() { + return "LongDecoderFactory{}"; + } + /** * LongDecoder. 
*/ diff --git a/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java index 13ad573..d7c6f08 100644 --- a/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java +++ b/common/src/main/java/org/apache/nemo/common/coder/LongEncoderFactory.java @@ -49,6 +49,11 @@ public final class LongEncoderFactory implements EncoderFactory<Long> { return new LongEncoder(outputStream); } + @Override + public String toString() { + return "LongEncoderFactory{}"; + } + /** * LongEncoder. */ diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java b/common/src/main/java/org/apache/nemo/common/exception/MetricException.java similarity index 63% copy from compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java copy to common/src/main/java/org/apache/nemo/common/exception/MetricException.java index f948022..93d46fc 100644 --- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java +++ b/common/src/main/java/org/apache/nemo/common/exception/MetricException.java @@ -16,22 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nemo.compiler.frontend.spark; -import org.apache.nemo.common.KeyExtractor; -import scala.Tuple2; +package org.apache.nemo.common.exception; /** - * Extracts the key from a KV element. - * For non-KV elements, the elements themselves become the key. + * MetricException. + * Thrown when the cause is related to the metrics. */ -public final class SparkKeyExtractor implements KeyExtractor { - @Override - public Object extractKey(final Object element) { - if (element instanceof Tuple2) { - return ((Tuple2) element)._1; - } else { - return element; - } +public class MetricException extends RuntimeException { + + /** + * MetricException. 
+ * @param cause the cause of the exception. + */ + public MetricException(final Throwable cause) { + super(cause); + } + + /** + * MetricException. + * @param cause the cause of the exception. + */ + public MetricException(final String cause) { + super(cause); } } diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java index 4b3db16..a6cd1c7 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java +++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java @@ -19,8 +19,7 @@ package org.apache.nemo.common.ir; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.nemo.common.KeyExtractor; -import org.apache.nemo.common.Pair; +import org.apache.nemo.common.PairKeyExtractor; import org.apache.nemo.common.dag.DAG; import org.apache.nemo.common.dag.DAGBuilder; import org.apache.nemo.common.dag.DAGInterface; @@ -89,6 +88,12 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> { return canAdvance; } + public String irDAGSummary() { + return "RV" + getRootVertices().size() + "_V" + getVertices().size() + "_E" + getVertices().stream() + .mapToInt(v -> getIncomingEdgesOf(v).size()) + .sum(); + } + ////////////////////////////////////////////////// Methods for reshaping the DAG topology. 
/** @@ -250,14 +255,7 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> { newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep)); newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push)); newEdge.setPropertyPermanently(MessageIdProperty.of(currentMetricCollectionId)); - final KeyExtractor pairKeyExtractor = (element) -> { - if (element instanceof Pair) { - return ((Pair) element).left(); - } else { - throw new IllegalStateException(element.toString()); - } - }; - newEdge.setProperty(KeyExtractorProperty.of(pairKeyExtractor)); + newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor())); newEdge.setPropertyPermanently(encoder); newEdge.setPropertyPermanently(decoder); return newEdge; @@ -383,4 +381,9 @@ public final class IRDAG implements DAGInterface<IRVertex, IREdge> { public List<IRVertex> filterVertices(final Predicate<IRVertex> condition) { return modifiedDAG.filterVertices(condition); } + + @Override + public String toString() { + return asJsonNode().toString(); + } } diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java index f4641b4..edce1d7 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java +++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ClonedSchedulingProperty.java @@ -18,6 +18,8 @@ */ package org.apache.nemo.common.ir.vertex.executionproperty; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; import java.io.Serializable; @@ -119,5 +121,33 @@ public final class ClonedSchedulingProperty extends VertexExecutionProperty<Clon sb.append(medianTimeMultiplier); return sb.toString(); } + + @Override + 
public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CloneConf cloneConf = (CloneConf) o; + + return new EqualsBuilder() + .append(isUpFrontCloning(), cloneConf.isUpFrontCloning()) + .append(getFractionToWaitFor(), cloneConf.getFractionToWaitFor()) + .append(getMedianTimeMultiplier(), cloneConf.getMedianTimeMultiplier()) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(isUpFrontCloning()) + .append(getFractionToWaitFor()) + .append(getMedianTimeMultiplier()) + .toHashCode(); + } } } diff --git a/common/src/main/resources/log4j.properties b/common/src/main/resources/log4j.properties index d6e3198..ba44df6 100644 --- a/common/src/main/resources/log4j.properties +++ b/common/src/main/resources/log4j.properties @@ -18,8 +18,8 @@ # log4j.rootLogger=INFO, STDOUT -log4j.logger.org.apache.nemo.runtime.master.MetricManagerMaster=INFO, METRIC -log4j.additivity.org.apache.nemo.runtime.master.MetricManagerMaster=false +log4j.logger.org.apache.nemo.runtime.master.metric.MetricManagerMaster=INFO, METRIC +log4j.additivity.org.apache.nemo.runtime.master.metric.MetricManagerMaster=false log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java index 8207320..9ea0591 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java @@ -19,6 +19,7 @@ package org.apache.nemo.compiler.frontend.beam; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.commons.lang3.builder.HashCodeBuilder; import 
org.apache.nemo.common.KeyExtractor; import org.apache.beam.sdk.values.KV; @@ -38,4 +39,22 @@ final class BeamKeyExtractor implements KeyExtractor { return element; } } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(2117, 37).toHashCode(); + } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java index c1ff6f0..12a20d3 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java @@ -55,6 +55,11 @@ public final class BeamDecoderFactory<T> implements DecoderFactory<T> { } } + @Override + public String toString() { + return beamCoder.getClass().getName(); + } + /** * Abstract class for Beam Decoder. * @param <T2> the type of element to decode. 
@@ -151,9 +156,4 @@ public final class BeamDecoderFactory<T> implements DecoderFactory<T> { return decodeInternal(); } } - - @Override - public String toString() { - return beamCoder.toString(); - } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java index 090c24b..a000ead 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java @@ -55,6 +55,11 @@ public final class BeamEncoderFactory<T> implements EncoderFactory<T> { } } + @Override + public String toString() { + return beamCoder.getClass().getName(); + } + /** * Beam Encoder for non void objects. * @@ -110,9 +115,4 @@ public final class BeamEncoderFactory<T> implements EncoderFactory<T> { outputStream.write(0); // emit 0 instead of null to enable to count emitted elements. 
} } - - @Override - public String toString() { - return beamCoder.toString(); - } } diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java index f948022..de05149 100644 --- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java +++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/SparkKeyExtractor.java @@ -18,6 +18,7 @@ */ package org.apache.nemo.compiler.frontend.spark; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nemo.common.KeyExtractor; import scala.Tuple2; @@ -34,4 +35,23 @@ public final class SparkKeyExtractor implements KeyExtractor { return element; } } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + + return false; + } + + return true; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(2437, 17).toHashCode(); + } } diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java index ea2b4da..cec71d2 100644 --- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java +++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java @@ -47,6 +47,13 @@ public final class SparkDecoderFactory<T> implements DecoderFactory<T> { return new SparkDecoder<>(inputStream, serializer.newInstance()); } + @Override + public String toString() { + return "SparkDecoderFactory{" + + "serializer=" + serializer + + '}'; + } + /** * SparkDecoder. * @param <T2> type of the object to deserialize. 
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java index b23c772..a9d6d59 100644 --- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java +++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java @@ -47,6 +47,13 @@ public final class SparkEncoderFactory<T> implements EncoderFactory<T> { return new SparkEncoder<>(outputStream, serializer.newInstance()); } + @Override + public String toString() { + return "SparkEncoderFactory{" + + "serializer=" + serializer + + '}'; + } + /** * SparkEncoder. * @param <T2> type of the object to serialize. diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java index 7b31372..c0df1a6 100644 --- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java +++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java @@ -76,6 +76,28 @@ public final class JobConf extends ConfigurationModuleBuilder { public final class GlusterVolumeDirectory implements Name<String> { } + /** + * Address pointing to the DB for saving metrics. + */ + @NamedParameter(doc = "DB address", short_name = "db_dir", default_value = + "jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization") + public final class DBAddress implements Name<String> { + } + + /** + * ID for the pointed DB address for saving metrics. + */ + @NamedParameter(doc = "DB ID", short_name = "db_id", default_value = "postgres") + public final class DBId implements Name<String> { + } + + /** + * Password for the pointed DB address for saving metrics. 
+ */ + @NamedParameter(doc = "DB Password", short_name = "db_password", default_value = "fake_password") + public final class DBPasswd implements Name<String> { + } + //////////////////////////////// Client-Driver RPC /** diff --git a/log4j.properties b/log4j.properties index d6e3198..ba44df6 100644 --- a/log4j.properties +++ b/log4j.properties @@ -18,8 +18,8 @@ # log4j.rootLogger=INFO, STDOUT -log4j.logger.org.apache.nemo.runtime.master.MetricManagerMaster=INFO, METRIC -log4j.additivity.org.apache.nemo.runtime.master.MetricManagerMaster=false +log4j.logger.org.apache.nemo.runtime.master.metric.MetricManagerMaster=INFO, METRIC +log4j.additivity.org.apache.nemo.runtime.master.metric.MetricManagerMaster=false log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout diff --git a/pom.xml b/pom.xml index 3eb7e54..7d07f3d 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,8 @@ under the License. <jetty-server.version>9.4.10.v20180503</jetty-server.version> <jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version> <commons-math.version>3.6.1</commons-math.version> + <sqlite-jdbc.version>3.25.2</sqlite-jdbc.version> + <postgresql.version>42.2.5</postgresql.version> <slf4j.version>1.7.20</slf4j.version> <!-- Tests --> <mockito.version>2.13.0</mockito.version> diff --git a/runtime/common/pom.xml b/runtime/common/pom.xml index 00eed84..5272292 100644 --- a/runtime/common/pom.xml +++ b/runtime/common/pom.xml @@ -41,6 +41,11 @@ under the License. 
<artifactId>nemo-conf</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>${postgresql.version}</version> + </dependency> <!-- GRPC --> <dependency> <groupId>io.grpc</groupId> diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java index 149d456..5baea63 100644 --- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/JobMetric.java @@ -21,8 +21,14 @@ package org.apache.nemo.runtime.common.metric; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nemo.common.Pair; import org.apache.nemo.common.dag.DAG; +import org.apache.nemo.common.exception.MetricException; +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.vertex.SourceVertex; import org.apache.nemo.runtime.common.plan.PhysicalPlan; +import org.apache.nemo.runtime.common.plan.Stage; +import org.apache.nemo.runtime.common.plan.StageEdge; import org.apache.nemo.runtime.common.state.PlanState; import java.io.IOException; @@ -33,30 +39,96 @@ import java.util.List; * Metric class for Job (or {@link PhysicalPlan}). */ public final class JobMetric implements StateMetric<PlanState.State> { - private String id; - private List<StateTransitionEvent<PlanState.State>> stateTransitionEvents = new ArrayList<>(); + private final String id; + private final List<StateTransitionEvent<PlanState.State>> stateTransitionEvents; + private String irDagSummary; + private Integer inputSize; + private String vertexProperties; + private String edgeProperties; + private JsonNode irDagJson; private JsonNode stageDagJson; + /** + * Constructor. 
+ * @param physicalPlan physical plan to derive the id from. + */ public JobMetric(final PhysicalPlan physicalPlan) { - this.id = physicalPlan.getPlanId(); + this(physicalPlan.getPlanId()); } + /** + * Constructor with the designated id. + * @param id the id. + */ public JobMetric(final String id) { this.id = id; + this.stateTransitionEvents = new ArrayList<>(); } - @JsonProperty("dag") + @JsonProperty("ir-dag") + public JsonNode getIRDAG() { + return irDagJson; + } + + public String getIrDagSummary() { + return this.irDagSummary; + } + + public Integer getInputSize() { + return this.inputSize; + } + + public String getVertexProperties() { + return this.vertexProperties; + } + + public String getEdgeProperties() { + return this.edgeProperties; + } + + /** + * Setter for the IR DAG. + * @param irDag the IR DAG. + */ + public void setIRDAG(final IRDAG irDag) { + this.irDagSummary = irDag.irDAGSummary(); + this.inputSize = irDag.getRootVertices().stream() + .filter(irVertex -> irVertex instanceof SourceVertex) + .mapToInt(irVertex -> { + try { + return ((SourceVertex) irVertex).getReadables(1).size(); + } catch (Exception e) { + throw new MetricException(e); + } + }) + .sum(); + final Pair<String, String> stringifiedProperties = MetricUtils.stringifyIRDAGProperties(irDag); + this.vertexProperties = stringifiedProperties.left(); + this.edgeProperties = stringifiedProperties.right(); + final ObjectMapper objectMapper = new ObjectMapper(); + try { + this.irDagJson = objectMapper.readTree(irDag.toString()); + } catch (final IOException e) { + throw new MetricException(e); + } + } + + @JsonProperty("stage-dag") public JsonNode getStageDAG() { return stageDagJson; } - public void setStageDAG(final DAG dag) { + /** + * Setter for the stage DAG. + * @param dag the stage DAG. 
+ */ + public void setStageDAG(final DAG<Stage, StageEdge> dag) { final String dagJson = dag.toString(); final ObjectMapper objectMapper = new ObjectMapper(); try { this.stageDagJson = objectMapper.readTree(dagJson); } catch (final IOException e) { - throw new RuntimeException(e); + throw new MetricException(e); } } diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java new file mode 100644 index 0000000..845d795 --- /dev/null +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.nemo.runtime.common.metric;
+
+import com.google.common.collect.HashBiMap;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.common.exception.MetricException;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.executionproperty.ExecutionProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.*;
+import java.util.Enumeration;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;
+
+/**
+ * Utility class for metrics.
+ *
+ * <p>It keeps two bidirectional maps (the "metadata") that translate execution properties into small integer
+ * indices, and uses them to render the execution properties of an IR DAG as compact text. The maps are loaded
+ * from a PostgreSQL database once, while this class is being initialized, and are written back whenever a new
+ * execution property key or value has been registered.
+ *
+ * <p>The maps are plain static {@link HashBiMap}s and are accessed without synchronization.
+ */
+public final class MetricUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class.getName());
+
+  // The latches are used as one-way flags: a count of 0 means that the event has happened.
+  private static final CountDownLatch METADATA_LOADED = new CountDownLatch(1);
+  private static final CountDownLatch MUST_UPDATE_EP_KEY_METADATA = new CountDownLatch(1);
+  private static final CountDownLatch MUST_UPDATE_EP_METADATA = new CountDownLatch(1);
+
+  // Loading happens here, during static initialization. Every constant that loadMetaData() reads is a
+  // compile-time constant, so it is safe for them to be declared further below.
+  private static final Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
+    HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> METADATA = loadMetaData();
+  // BiMap of (1) INDEX and (2) the Execution Property class
+  private static final HashBiMap<Integer, Class<? extends ExecutionProperty>>
+    EP_KEY_METADATA = METADATA.left();
+  // BiMap of (1) the Execution Property class INDEX and the value INDEX pair and (2) the Execution Property.
+  private static final HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>
+    EP_METADATA = METADATA.right();
+
+  // One-digit prefixes that mark an entry as belonging to a vertex or to an edge.
+  private static final int VERTEX = 1;
+  private static final int EDGE = 2;
+
+  /** JDBC URL of the local SQLite DB, placed in the project root (the directory holding the LICENSE file). */
+  public static final String SQLITE_DB_NAME =
+    "jdbc:sqlite:" + MetricUtils.fetchProjectRootPath() + "/optimization_db.sqlite3";
+  /** JDBC URL of the PostgreSQL DB that stores the metadata. */
+  public static final String POSTGRESQL_METADATA_DB_NAME =
+    "jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization";
+  private static final String METADATA_TABLE_NAME = "nemo_optimization_meta";
+
+  // NOTE(review): credentials are hard-coded in the source; they should come from configuration instead.
+  private static final String METADATA_DB_USER = "postgres";
+  private static final String METADATA_DB_PASSWORD = "fake_password";
+
+  // Values of the 'key' column for the two rows of the metadata table.
+  private static final String EP_KEY_METADATA_KEY = "EP_KEY_METADATA";
+  private static final String EP_METADATA_KEY = "EP_METADATA";
+
+  private static final int QUERY_TIMEOUT_SECONDS = 30;
+
+  /**
+   * Private constructor.
+   */
+  private MetricUtils() {
+  }
+
+  /**
+   * Load the BiMaps (lightweight) Metadata from the DB.
+   * {@link #METADATA_LOADED} is released only if the DB could be queried; on any failure it stays closed,
+   * so that {@link #metaDataLoaded()} keeps returning false.
+   * @return the loaded BiMaps, or initialized ones.
+   */
+  private static Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
+    HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> loadMetaData() {
+    deregisterBeamDriver();
+    try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
+      METADATA_DB_USER, METADATA_DB_PASSWORD)) {
+      try (final Statement statement = c.createStatement()) {
+        statement.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
+        statement.executeUpdate(
+          "CREATE TABLE IF NOT EXISTS " + METADATA_TABLE_NAME
+            + " (key TEXT NOT NULL UNIQUE, data BYTEA NOT NULL);");
+      }
+      LOG.info("Metadata can be loaded.");
+
+      final Optional<HashBiMap<Integer, Class<? extends ExecutionProperty>>> indexEpKeyBiMap =
+        readBiMap(c, EP_KEY_METADATA_KEY);
+      if (!indexEpKeyBiMap.isPresent()) {
+        // Without the key map the value map cannot be interpreted, so it is not even queried.
+        METADATA_LOADED.countDown();
+        LOG.info("No initial metadata.");
+        return Pair.of(HashBiMap.create(), HashBiMap.create());
+      }
+
+      final Optional<HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> indexEpBiMap =
+        readBiMap(c, EP_METADATA_KEY);
+      METADATA_LOADED.countDown();
+      if (indexEpBiMap.isPresent()) {
+        LOG.info("Metadata successfully loaded from DB.");
+        return Pair.of(indexEpKeyBiMap.get(), indexEpBiMap.get());
+      }
+      LOG.info("No initial metadata for EP.");
+      return Pair.of(indexEpKeyBiMap.get(), HashBiMap.create());
+    } catch (final Exception e) {
+      // Deliberately broad: a connection, SQL or deserialization failure must not break class
+      // initialization. Start from empty maps instead.
+      LOG.warn("Loading metadata from DB failed: ", e);
+      return Pair.of(HashBiMap.create(), HashBiMap.create());
+    }
+  }
+
+  /**
+   * Reads and deserializes one BiMap from the metadata table.
+   * NOTE(review): this uses Java-native deserialization on bytes that come from the DB, so the DB contents
+   * have to be trusted.
+   * @param c the open connection to the metadata DB.
+   * @param key value of the 'key' column identifying the row.
+   * @param <K> key type of the BiMap.
+   * @param <V> value type of the BiMap.
+   * @return the deserialized BiMap, or empty if there is no row for the key.
+   * @throws SQLException if the query fails.
+   */
+  private static <K, V> Optional<HashBiMap<K, V>> readBiMap(final Connection c, final String key)
+    throws SQLException {
+    try (final PreparedStatement pstmt = c.prepareStatement(
+      "SELECT data FROM " + METADATA_TABLE_NAME + " WHERE key = ?;")) {
+      pstmt.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
+      pstmt.setString(1, key);
+      try (final ResultSet rs = pstmt.executeQuery()) {
+        if (!rs.next()) {
+          return Optional.empty();
+        }
+        final HashBiMap<K, V> biMap = SerializationUtils.deserialize(rs.getBytes("data"));
+        return Optional.of(biMap);
+      }
+    }
+  }
+
+  /**
+   * @return whether the metadata DB could be queried when this class was initialized.
+   */
+  public static Boolean metaDataLoaded() {
+    return METADATA_LOADED.getCount() == 0;
+  }
+
+  /**
+   * Save the BiMaps to DB if changes are necessary (rarely executed).
+   * Nothing is written if the metadata was never loaded, or if no new key or value has been registered.
+   */
+  private static void updateMetaData() {
+    final boolean epKeyMetadataChanged = MUST_UPDATE_EP_KEY_METADATA.getCount() == 0;
+    final boolean epMetadataChanged = MUST_UPDATE_EP_METADATA.getCount() == 0;
+    if (!metaDataLoaded() || !(epKeyMetadataChanged || epMetadataChanged)) {
+      // no need to update
+      LOG.info("Not saving Metadata: metadata loaded: {}, Index-EP data: {}, Index-EP Key data: {}",
+        metaDataLoaded(), epMetadataChanged, epKeyMetadataChanged);
+      return;
+    }
+    LOG.info("Saving Metadata..");
+
+    deregisterBeamDriver();
+    try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
+      METADATA_DB_USER, METADATA_DB_PASSWORD)) {
+      if (epKeyMetadataChanged) {
+        saveBiMap(c, EP_KEY_METADATA_KEY, EP_KEY_METADATA);
+        LOG.info("EP Key Metadata saved to DB.");
+      }
+      if (epMetadataChanged) {
+        saveBiMap(c, EP_METADATA_KEY, EP_METADATA);
+        LOG.info("EP Metadata saved to DB.");
+      }
+    } catch (SQLException e) {
+      LOG.warn("Saving of Metadata to DB failed: ", e);
+    }
+  }
+
+  /**
+   * Serializes a BiMap and upserts it into the metadata table.
+   * @param c the open connection to the metadata DB.
+   * @param key value of the 'key' column identifying the row.
+   * @param biMap the BiMap to store.
+   * @throws SQLException if the statement fails.
+   */
+  private static void saveBiMap(final Connection c, final String key, final HashBiMap<?, ?> biMap)
+    throws SQLException {
+    try (final PreparedStatement pstmt = c.prepareStatement(
+      "INSERT INTO " + METADATA_TABLE_NAME + " (key, data) "
+        + "VALUES (?, ?) ON CONFLICT (key) DO UPDATE SET data = excluded.data;")) {
+      // The timeout has to be set on the statement that is actually executed.
+      pstmt.setQueryTimeout(QUERY_TIMEOUT_SECONDS);
+      pstmt.setString(1, key);
+      pstmt.setBinaryStream(2, new ByteArrayInputStream(SerializationUtils.serialize(biMap)));
+      pstmt.executeUpdate();
+    }
+  }
+
+  /**
+   * Stringify execution properties of an IR DAG.
+   * @param irdag IR DAG to observe.
+   * @return the pair of stringified execution properties. Left is for vertices, right is for edges.
+   */
+  static Pair<String, String> stringifyIRDAGProperties(final IRDAG irdag) {
+    final StringBuilder vStringBuilder = new StringBuilder();
+    final StringBuilder eStringBuilder = new StringBuilder();
+
+    irdag.getVertices().forEach(v ->
+      v.getExecutionProperties().forEachProperties(ep ->
+        epFormatter(vStringBuilder, VERTEX, v.getNumericId(), ep)));
+
+    // Edges are visited through the incoming edges of each vertex.
+    irdag.getVertices().forEach(v ->
+      irdag.getIncomingEdgesOf(v).forEach(e ->
+        e.getExecutionProperties().forEachProperties(ep ->
+          epFormatter(eStringBuilder, EDGE, e.getNumericId(), ep))));
+
+    // Update the metric metadata if new execution property key / values have been discovered and updates are required.
+    updateMetaData();
+    return Pair.of(vStringBuilder.toString().trim(), eStringBuilder.toString().trim());
+  }
+
+  /**
+   * Formatter for execution properties. It updates the metadata for the metrics if new EP key / values are discovered.
+   * Each property is appended as {@code <idx><numericId>0<epKeyIndex>:<epValueIndex>} followed by a space.
+   * @param builder string builder to append the metrics to.
+   * @param idx index specifying whether it's a vertex or an edge. This should be one digit.
+   * @param numericId numeric ID of the vertex or the edge.
+   * @param ep the execution property.
+   */
+  private static void epFormatter(final StringBuilder builder, final int idx,
+                                  final Integer numericId, final ExecutionProperty<?> ep) {
+    builder.append(idx);
+    builder.append(numericId);
+    builder.append("0");
+    final Integer epKeyIndex = EP_KEY_METADATA.inverse().computeIfAbsent(ep.getClass(), epClass -> {
+      // Update the metadata if new EP key has been discovered.
+      LOG.info("New EP Key Index: {} for {}", EP_KEY_METADATA.size() + 1, epClass.getSimpleName());
+      MUST_UPDATE_EP_KEY_METADATA.countDown();
+      return EP_KEY_METADATA.size() + 1;
+    });
+    builder.append(epKeyIndex);
+
+    builder.append(":");
+    final Integer epIndex = valueToIndex(epKeyIndex, ep);
+    builder.append(epIndex);
+    builder.append(" ");
+  }
+
+  /**
+   * Helper method to convert Execution Property value objects to an integer index.
+   * It updates the metadata for the metrics if new EP values are discovered.
+   * Enum, Integer and Boolean values are mapped directly and are never registered in the metadata.
+   * @param epKeyIndex the index of the execution property key.
+   * @param ep the execution property containing the value.
+   * @return the converted value index.
+   */
+  private static Integer valueToIndex(final Integer epKeyIndex, final ExecutionProperty<?> ep) {
+    final Object o = ep.getValue();
+
+    if (o instanceof Enum) {
+      return ((Enum<?>) o).ordinal();
+    } else if (o instanceof Integer) {
+      return (int) o;
+    } else if (o instanceof Boolean) {
+      return ((Boolean) o) ? 1 : 0;
+    }
+
+    // Encoder / decoder factories are matched by their string form, everything else by equals().
+    // NOTE(review): this lookup goes over the values of ALL keys, so a value first registered under another
+    // EP key is found here and its index is returned for this key as well. Confirm whether the lookup should
+    // be restricted to entries whose key index equals epKeyIndex.
+    final boolean matchByString = o instanceof EncoderFactory || o instanceof DecoderFactory;
+    final ExecutionProperty<?> registered = EP_METADATA.values().stream()
+      .filter(candidate -> matchByString
+        ? candidate.getValue().toString().equals(o.toString())
+        : candidate.getValue().equals(o))
+      .findFirst().orElse(null);
+    if (registered != null) {
+      return EP_METADATA.inverse().get(registered).right();
+    }
+
+    // New value: its index is one more than the number of values registered for this key so far.
+    final Integer valueIndex = Math.toIntExact(EP_METADATA.keySet().stream()
+      .filter(pair -> pair.left().equals(epKeyIndex))
+      .count()) + 1;
+    // Update the metadata if new EP value has been discovered.
+    EP_METADATA.put(Pair.of(epKeyIndex, valueIndex), ep);
+    LOG.info("New EP Index: ({}, {}) for {}", epKeyIndex, valueIndex, ep);
+    MUST_UPDATE_EP_METADATA.countDown();
+    return valueIndex;
+  }
+
+  /**
+   * Finds the project root path, i.e. the closest directory at or above the working directory that contains
+   * the LICENSE file. If there is no such directory, the working directory itself is used.
+   * @return the project root path.
+   */
+  private static String fetchProjectRootPath() {
+    final Path workingDirectory = Paths.get(System.getProperty("user.dir")).toAbsolutePath();
+    return recursivelyFindLicense(workingDirectory).orElseGet(() -> {
+      LOG.warn("No LICENSE file found at or above {}; using it as the project root.", workingDirectory);
+      return workingDirectory.toString();
+    });
+  }
+
+  /**
+   * Helper method to recursively find the LICENSE file.
+   * @param path the path to search for, or null once the search has gone past the file system root.
+   * @return the path containing the LICENSE file, or empty if no directory up to the root contains it.
+   */
+  private static Optional<String> recursivelyFindLicense(final Path path) {
+    if (path == null) {
+      // Path#getParent returns null for a root directory: there is nothing left to search.
+      return Optional.empty();
+    }
+
+    final boolean licenseFound;
+    // The directory stream is closed before recursing, so only one is open at a time.
+    try (final Stream<Path> stream = Files.find(path, 1, (p, attributes) -> p.endsWith("LICENSE"))) {
+      licenseFound = stream.findAny().isPresent();
+    } catch (IOException e) {
+      throw new MetricException(e);
+    }
+
+    return licenseFound
+      ? Optional.of(path.toAbsolutePath().toString())
+      : recursivelyFindLicense(path.getParent());
+  }
+
+  /**
+   * De-register Beam JDBC driver, which produces inconsistent results.
+   * Only the first registered driver whose class name matches is removed.
+   */
+  public static void deregisterBeamDriver() {
+    final String beamDriver = "org.apache.beam.sdk.extensions.sql.impl.JdbcDriver";
+    final Enumeration<Driver> drivers = DriverManager.getDrivers();
+    while (drivers.hasMoreElements()) {
+      final Driver d = drivers.nextElement();
+      if (d.getClass().getName().equals(beamDriver)) {
+        try {
+          DriverManager.deregisterDriver(d);
+        } catch (SQLException e) {
+          throw new MetricException(e);
+        }
+        break;
+      }
+    }
+  }
+}
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java index b8d5885..ed2b20e 100644 --- a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java +++ b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java @@ -189,6 +189,8 @@ public final class NemoDriver { // send driver notification that user application is done. clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder() .setType(ControlMessage.DriverToClientMessageType.ExecutionDone).build()); + // flush metrics + runtimeMaster.flushMetrics(); }); } diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java index 1e77052..8c6c9d3 100644 --- a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java +++ b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java @@ -83,6 +83,7 @@ public final class UserApplicationRunner { // Execute!
final Pair<PlanStateManager, ScheduledExecutorService> executionResult = runtimeMaster.execute(physicalPlan, maxScheduleAttempt); + runtimeMaster.recordIRDAGMetrics(optimizedDAG, physicalPlan.getPlanId()); // Wait for the job to finish and stop logging final PlanStateManager planStateManager = executionResult.left(); diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java index 1367e70..2de9fdf 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java @@ -44,6 +44,13 @@ public final class NemoEventDecoderFactory implements DecoderFactory { return new NemoEventDecoder(valueDecoderFactory.create(inputStream), inputStream); } + @Override + public String toString() { + return "NemoEventDecoderFactory{" + + "valueDecoderFactory=" + valueDecoderFactory + + '}'; + } + /** * This class decodes receive data into two types. 
* - normal data diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java index c49beda..d9b0836 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java @@ -44,6 +44,13 @@ public final class NemoEventEncoderFactory implements EncoderFactory { return new NemoEventEncoder(valueEncoderFactory.create(outputStream), outputStream); } + @Override + public String toString() { + return "NemoEventEncoderFactory{" + + "valueEncoderFactory=" + valueEncoderFactory + + '}'; + } + /** * This encodes normal data and WatermarkWithIndex. * @param <T> diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java index 221b739..ee4b511 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/MetricFlushTest.java @@ -22,12 +22,10 @@ import org.apache.nemo.runtime.common.comm.ControlMessage; import org.apache.nemo.runtime.common.message.*; import org.apache.nemo.runtime.common.message.local.LocalMessageDispatcher; import org.apache.nemo.runtime.common.message.local.LocalMessageEnvironment; -import org.apache.nemo.runtime.master.MetricManagerMaster; +import org.apache.nemo.runtime.master.metric.MetricManagerMaster; import org.apache.nemo.runtime.master.resource.ExecutorRepresenter; import org.apache.nemo.runtime.master.scheduler.ExecutorRegistry; -import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; 
import org.junit.Test; import org.junit.runner.RunWith; diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java index f796946..2a3d390 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java @@ -50,6 +50,8 @@ import org.apache.nemo.runtime.executor.data.DataUtil; import org.apache.nemo.runtime.executor.data.SerializerManager; import org.apache.nemo.runtime.master.*; import org.apache.commons.io.FileUtils; +import org.apache.nemo.runtime.master.metric.MetricManagerMaster; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; import org.apache.nemo.runtime.master.scheduler.BatchScheduler; import org.apache.nemo.runtime.master.scheduler.Scheduler; import org.apache.reef.driver.evaluator.EvaluatorRequestor; @@ -145,6 +147,7 @@ public final class DataTransferTest { injector.bindVolatileInstance(MetricManagerMaster.class, mock(MetricManagerMaster.class)); injector.bindVolatileInstance(MetricMessageHandler.class, mock(MetricMessageHandler.class)); injector.bindVolatileParameter(JobConf.DAGDirectory.class, EMPTY_DAG_DIRECTORY); + injector.bindVolatileParameter(JobConf.JobId.class, "jobId"); // Necessary for wiring up the message environments injector.bindVolatileInstance(Scheduler.class, injector.getInstance(BatchScheduler.class)); diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml index 9110d0b..0a5058a 100644 --- a/runtime/master/pom.xml +++ b/runtime/master/pom.xml @@ -77,6 +77,11 @@ under the License. 
<version>${jackson.version}</version> </dependency> <dependency> + <groupId>org.xerial</groupId> + <artifactId>sqlite-jdbc</artifactId> + <version>${sqlite-jdbc.version}</version> + </dependency> + <dependency> <!-- This is needed to view the logs when running unit tests. See https://dzone.com/articles/how-configure-slf4j-different for details. diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java index 5da9466..88f3686 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java @@ -44,6 +44,8 @@ import org.apache.nemo.runtime.common.state.TaskState; import org.apache.nemo.runtime.common.metric.JobMetric; import org.apache.nemo.runtime.common.metric.StageMetric; import org.apache.nemo.runtime.common.metric.TaskMetric; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; +import org.apache.nemo.runtime.master.metric.MetricStore; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.tang.annotations.Parameter; import org.slf4j.Logger; @@ -343,7 +345,7 @@ public final class PlanStateManager { switch (newTaskState) { // INCOMPLETE stage case SHOULD_RETRY: - final boolean isAPeerAttemptCompleted = getPeerAttemptsforTheSameTaskIndex(taskId).stream() + final boolean isAPeerAttemptCompleted = getPeerAttemptsForTheSameTaskIndex(taskId).stream() .anyMatch(state -> state.equals(TaskState.State.COMPLETE)); if (!isAPeerAttemptCompleted) { // None of the peers has completed, hence this stage is incomplete @@ -563,7 +565,7 @@ public final class PlanStateManager { || state.equals(TaskState.State.ON_HOLD); } - private List<TaskState.State> getPeerAttemptsforTheSameTaskIndex(final String taskId) { + private List<TaskState.State> getPeerAttemptsForTheSameTaskIndex(final String taskId) { final String 
stageId = RuntimeIdManager.getStageIdFromTaskId(taskId); final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId); final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId); diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java index e5d8e30..119f23e 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java @@ -22,6 +22,7 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang3.SerializationUtils; import org.apache.nemo.common.Pair; import org.apache.nemo.common.exception.*; +import org.apache.nemo.common.ir.IRDAG; import org.apache.nemo.common.ir.vertex.IRVertex; import org.apache.nemo.conf.JobConf; import org.apache.nemo.runtime.common.RuntimeIdManager; @@ -29,8 +30,12 @@ import org.apache.nemo.runtime.common.comm.ControlMessage; import org.apache.nemo.runtime.common.message.MessageContext; import org.apache.nemo.runtime.common.message.MessageEnvironment; import org.apache.nemo.runtime.common.message.MessageListener; +import org.apache.nemo.runtime.common.metric.JobMetric; import org.apache.nemo.runtime.common.plan.PhysicalPlan; import org.apache.nemo.runtime.common.state.TaskState; +import org.apache.nemo.runtime.master.metric.MetricManagerMaster; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; +import org.apache.nemo.runtime.master.metric.MetricStore; import org.apache.nemo.runtime.master.scheduler.BatchScheduler; import org.apache.nemo.runtime.master.servlet.*; import org.apache.nemo.runtime.master.resource.ContainerManager; @@ -94,7 +99,11 @@ public final class RuntimeMaster { private final PlanStateManager planStateManager; // For converting json data. This is a thread safe. 
private final ObjectMapper objectMapper; + private final String jobId; private final String dagDirectory; + private final String dbAddress; + private final String dbId; + private final String dbPassword; private final Set<IRVertex> irVertices; private final AtomicInteger resourceRequestCount; private CountDownLatch metricCountDownLatch; @@ -110,6 +119,10 @@ public final class RuntimeMaster { final ClientRPC clientRPC, final MetricManagerMaster metricManagerMaster, final PlanStateManager planStateManager, + @Parameter(JobConf.JobId.class) final String jobId, + @Parameter(JobConf.DBAddress.class) final String dbAddress, + @Parameter(JobConf.DBId.class) final String dbId, + @Parameter(JobConf.DBPasswd.class) final String dbPassword, @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) { // We would like to use a single thread for runtime master operations // since the processing logic in master takes a very short amount of time @@ -135,7 +148,11 @@ public final class RuntimeMaster { .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver()); this.clientRPC = clientRPC; this.metricManagerMaster = metricManagerMaster; + this.jobId = jobId; this.dagDirectory = dagDirectory; + this.dbAddress = dbAddress; + this.dbId = dbId; + this.dbPassword = dbPassword; this.irVertices = new HashSet<>(); this.resourceRequestCount = new AtomicInteger(0); this.objectMapper = new ObjectMapper(); @@ -144,6 +161,10 @@ public final class RuntimeMaster { this.planStateManager = planStateManager; } + /** + * Start Metric Server. + * @return the metric server. 
+ */ private Server startRestMetricServer() { final Server server = new Server(REST_SERVER_PORT); @@ -159,12 +180,28 @@ public final class RuntimeMaster { try { server.start(); } catch (final Exception e) { - throw new RuntimeException("Failed to start REST API server."); + throw new MetricException("Failed to start REST API server: " + e); } return server; } + public void recordIRDAGMetrics(final IRDAG irdag, final String planId) { + metricStore.getOrCreateMetric(JobMetric.class, planId).setIRDAG(irdag); + } + + /** + * Flush metrics. + */ + public void flushMetrics() { + // send metric flush request to all executors + metricManagerMaster.sendMetricFlushRequest(); + + metricStore.dumpAllMetricToFile(Paths.get(dagDirectory, + "Metric_" + jobId + "_" + System.currentTimeMillis() + ".json").toString()); + metricStore.saveOptimizationMetricsToDB(dbAddress, dbId, dbPassword); + } + /** * Submits the {@link PhysicalPlan} to Runtime. * At now, we are assuming that a single job submit multiple plans. @@ -199,8 +236,6 @@ public final class RuntimeMaster { // No need to speculate anymore speculativeTaskCloningThread.shutdown(); - // send metric flush request to all executors - metricManagerMaster.sendMetricFlushRequest(); try { // wait for metric flush if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) { @@ -225,10 +260,8 @@ public final class RuntimeMaster { try { metricServer.stop(); } catch (final Exception e) { - throw new RuntimeException("Failed to stop rest api server."); + throw new MetricException("Failed to stop rest api server: " + e); } - - metricStore.dumpAllMetricToFile(Paths.get(dagDirectory, "metric.json").toString()); }); // Do not shutdown runtimeMasterThread. We need it to clean things up. 
@@ -278,9 +311,8 @@ public final class RuntimeMaster { public void onContainerAllocated(final String executorId, final AllocatedEvaluator allocatedEvaluator, final Configuration executorConfiguration) { - runtimeMasterThread.execute(() -> { - containerManager.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration); - }); + runtimeMasterThread.execute(() -> + containerManager.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration)); } /** @@ -335,9 +367,8 @@ public final class RuntimeMaster { public final class MasterControlMessageReceiver implements MessageListener<ControlMessage.Message> { @Override public void onMessage(final ControlMessage.Message message) { - runtimeMasterThread.execute(() -> { - handleControlMessage(message); - }); + runtimeMasterThread.execute(() -> + handleControlMessage(message)); } @Override diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricBroadcaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricBroadcaster.java similarity index 98% rename from runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricBroadcaster.java rename to runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricBroadcaster.java index 1dbd0e6..f2bca8d 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricBroadcaster.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricBroadcaster.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.nemo.runtime.master; +package org.apache.nemo.runtime.master.metric; import org.eclipse.jetty.websocket.api.Session; import org.slf4j.Logger; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricManagerMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricManagerMaster.java similarity index 98% rename from runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricManagerMaster.java rename to runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricManagerMaster.java index d6e18c0..6eabcbe 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricManagerMaster.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricManagerMaster.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nemo.runtime.master; +package org.apache.nemo.runtime.master.metric; import javax.inject.Inject; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricMessageHandler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricMessageHandler.java similarity index 96% rename from runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricMessageHandler.java rename to runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricMessageHandler.java index 23cd748..b0ab1b5 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricMessageHandler.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricMessageHandler.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.nemo.runtime.master; +package org.apache.nemo.runtime.master.metric; import org.apache.reef.tang.annotations.DefaultImplementation; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricStore.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java similarity index 54% rename from runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricStore.java rename to runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java index 42f22aa..c4ad75f 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/MetricStore.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java @@ -16,17 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nemo.runtime.master; +package org.apache.nemo.runtime.master.metric; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nemo.common.exception.MetricException; import org.apache.nemo.common.exception.UnsupportedMetricException; import org.apache.nemo.runtime.common.metric.*; +import org.apache.nemo.runtime.common.state.PlanState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.*; +import java.lang.management.ManagementFactory; +import java.sql.*; import java.util.*; /** @@ -34,9 +40,10 @@ import java.util.*; * All metric classes should be JSON-serializable by {@link ObjectMapper}. */ public final class MetricStore { - private final Map<Class, Map<String, Object>> metricMap = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class.getName()); + private final Map<Class<? 
extends Metric>, Map<String, Object>> metricMap = new HashMap<>(); // You can add more metrics by adding item to this metricList list. - private final Map<String, Class> metricList = new HashMap<>(); + private final Map<String, Class<? extends Metric>> metricList = new HashMap<>(); /** * Private constructor. */ @@ -66,7 +73,7 @@ public final class MetricStore { throw new NoSuchElementException(); } - return metricList.get(className); + return (Class<T>) metricList.get(className); } /** @@ -76,7 +83,7 @@ public final class MetricStore { * @param <T> class of metric */ public <T extends Metric> void putMetric(final T metric) { - final Class metricClass = metric.getClass(); + final Class<? extends Metric> metricClass = metric.getClass(); if (!metricList.values().contains(metricClass)) { throw new UnsupportedMetricException(new Throwable("Unsupported metric")); } @@ -106,11 +113,7 @@ public final class MetricStore { * @return a metric object. */ public <T extends Metric> Map<String, Object> getMetricMap(final Class<T> metricClass) { - final Map<String, Object> metric = metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()); - if (metric == null) { - throw new NoSuchElementException("No metric found"); - } - return metric; + return metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()); } /** @@ -122,13 +125,13 @@ public final class MetricStore { * @return a metric object. If there was no such metric, newly create one. 
*/ public <T extends Metric> T getOrCreateMetric(final Class<T> metricClass, final String id) { - T metric = (T) metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).get(id); + T metric = (T) metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).get(id); if (metric == null) { try { metric = metricClass.getConstructor(new Class[]{String.class}).newInstance(id); putMetric(metric); } catch (final Exception e) { - throw new RuntimeException(e); + throw new MetricException(e); } } return metric; @@ -158,19 +161,19 @@ public final class MetricStore { final ObjectMapper objectMapper = new ObjectMapper(); final JsonFactory jsonFactory = new JsonFactory(); final ByteArrayOutputStream stream = new ByteArrayOutputStream(); - final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8); - jsonGenerator.setCodec(objectMapper); - jsonGenerator.writeStartObject(); - jsonGenerator.writeFieldName(metricClass.getSimpleName()); - jsonGenerator.writeStartObject(); - for (final Map.Entry<String, Object> idToMetricEntry : getMetricMap(metricClass).entrySet()) { - generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper); - } - jsonGenerator.writeEndObject(); - jsonGenerator.writeEndObject(); + try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) { + jsonGenerator.setCodec(objectMapper); - jsonGenerator.close(); + jsonGenerator.writeStartObject(); + jsonGenerator.writeFieldName(metricClass.getSimpleName()); + jsonGenerator.writeStartObject(); + for (final Map.Entry<String, Object> idToMetricEntry : getMetricMap(metricClass).entrySet()) { + generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper); + } + jsonGenerator.writeEndObject(); + jsonGenerator.writeEndObject(); + } return stream.toString(); } @@ -183,21 +186,22 @@ public final class MetricStore { final ObjectMapper objectMapper = new ObjectMapper(); final JsonFactory jsonFactory = new 
JsonFactory(); final ByteArrayOutputStream stream = new ByteArrayOutputStream(); - final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8); - jsonGenerator.setCodec(objectMapper); - jsonGenerator.writeStartObject(); - for (final Map.Entry<Class, Map<String, Object>> metricMapEntry : metricMap.entrySet()) { - jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName()); + try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) { + jsonGenerator.setCodec(objectMapper); + jsonGenerator.writeStartObject(); - for (final Map.Entry<String, Object> idToMetricEntry : metricMapEntry.getValue().entrySet()) { - generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper); + for (final Map.Entry<Class<? extends Metric>, Map<String, Object>> metricMapEntry : metricMap.entrySet()) { + jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName()); + jsonGenerator.writeStartObject(); + for (final Map.Entry<String, Object> idToMetricEntry : metricMapEntry.getValue().entrySet()) { + generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper); + } + jsonGenerator.writeEndObject(); } jsonGenerator.writeEndObject(); } - jsonGenerator.writeEndObject(); - jsonGenerator.close(); return stream.toString(); } @@ -206,17 +210,104 @@ public final class MetricStore { * @param filePath path to dump JSON. */ public void dumpAllMetricToFile(final String filePath) { - try { + try (final BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) { final String jsonDump = dumpAllMetricToJson(); - final BufferedWriter writer = new BufferedWriter(new FileWriter(filePath)); - writer.write(jsonDump); - writer.close(); } catch (final IOException e) { - throw new RuntimeException(e); + throw new MetricException(e); } } + /** + * Save the job metrics for the optimization to the DB, in the form of LibSVM, to a local SQLite DB. 
+ * The metrics are as follows: the JCT (duration), and the IR DAG execution properties. + */ + private void saveOptimizationMetricsToLocal() { + final String[] syntax = {"INTEGER PRIMARY KEY AUTOINCREMENT"}; + + try (final Connection c = DriverManager.getConnection(MetricUtils.SQLITE_DB_NAME)) { + LOG.info("Opened database successfully at {}", MetricUtils.SQLITE_DB_NAME); + saveOptimizationMetrics(c, syntax); + } catch (SQLException e) { + LOG.error("Error while saving optimization metrics to SQLite: {}", e); + } + } + + /** + * Save the job metrics for the optimization to the DB, in the form of LibSVM, to a remote DB, if applicable. + * The metrics are as follows: the JCT (duration), and the IR DAG execution properties. + */ + public void saveOptimizationMetricsToDB(final String address, final String id, final String passwd) { + final String[] syntax = {"SERIAL PRIMARY KEY"}; + + if (!MetricUtils.metaDataLoaded()) { + saveOptimizationMetricsToLocal(); + return; + } + + try (final Connection c = DriverManager.getConnection(address, id, passwd)) { + LOG.info("Opened database successfully at {}", MetricUtils.POSTGRESQL_METADATA_DB_NAME); + MetricUtils.deregisterBeamDriver(); + saveOptimizationMetrics(c, syntax); + } catch (SQLException e) { + LOG.error("Error while saving optimization metrics to PostgreSQL: {}", e); + LOG.info("Saving metrics on the local SQLite DB"); + saveOptimizationMetricsToLocal(); + } + } + + /** + * Save the job metrics for the optimization to the DB, in the form of LibSVM. + * @param c the connection to the DB. + * @param syntax the db-specific syntax. + */ + private void saveOptimizationMetrics(final Connection c, final String[] syntax) { + try (final Statement statement = c.createStatement()) { + statement.setQueryTimeout(30); // set timeout to 30 sec. 
+ + getMetricMap(JobMetric.class).values().forEach(o -> { + final JobMetric jobMetric = (JobMetric) o; + final String tableName = jobMetric.getIrDagSummary(); + + final long startTime = jobMetric.getStateTransitionEvents().stream() + .filter(ste -> ste.getPrevState().equals(PlanState.State.READY) + && ste.getNewState().equals(PlanState.State.EXECUTING)) + .findFirst().orElseThrow(() -> new MetricException("job has never started")) + .getTimestamp(); + final long endTime = jobMetric.getStateTransitionEvents().stream() + .filter(ste -> ste.getNewState().equals(PlanState.State.COMPLETE)) + .findFirst().orElseThrow(() -> new MetricException("job has never completed")) + .getTimestamp(); + final long duration = endTime - startTime; // ms + final String vertexProperties = jobMetric.getVertexProperties(); + final String edgeProperties = jobMetric.getEdgeProperties(); + final Integer inputSize = jobMetric.getInputSize(); + final long jvmMemSize = Runtime.getRuntime().maxMemory(); + final long memSize = ((com.sun.management.OperatingSystemMXBean) ManagementFactory + .getOperatingSystemMXBean()).getTotalPhysicalMemorySize(); + + try { + statement.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName + + " (id " + syntax[0] + ", duration INTEGER NOT NULL, inputsize INTEGER NOT NULL, " + + "jvmmemsize BIGINT NOT NULL, memsize BIGINT NOT NULL, " + + "vertex_properties TEXT NOT NULL, edge_properties TEXT NOT NULL, " + + "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);"); + LOG.info("CREATED TABLE For {} IF NOT PRESENT", tableName); + + statement.executeUpdate("INSERT INTO " + tableName + + " (duration, inputsize, jvmmemsize, memsize, vertex_properties, edge_properties) " + + "VALUES (" + duration + ", " + inputSize + ", " + + jvmMemSize + ", " + memSize + ", '" + + vertexProperties + "', '" + edgeProperties + "');"); + LOG.info("Recorded metrics on the table for {}", tableName); + } catch (SQLException e) { + LOG.error("Error while saving optimization metrics: {}", e); + } + 
}); + } catch (SQLException e) { + LOG.error("Error while saving optimization metrics: {}", e); + } + } /** * Send changed metric data to {@link MetricBroadcaster}, which will broadcast it to @@ -232,10 +323,7 @@ public final class MetricStore { final T metric = getMetricWithId(metricClass, id); final JsonFactory jsonFactory = new JsonFactory(); final ByteArrayOutputStream stream = new ByteArrayOutputStream(); - final JsonGenerator jsonGenerator; - try { - jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8); - + try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) { jsonGenerator.setCodec(objectMapper); jsonGenerator.writeStartObject(); @@ -246,11 +334,9 @@ public final class MetricStore { jsonGenerator.writeObject(metric); jsonGenerator.writeEndObject(); - jsonGenerator.close(); - metricBroadcaster.broadcast(stream.toString()); } catch (final IOException e) { - throw new RuntimeException(e); + throw new MetricException(e); } } } diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java index 66ce890..e09c579 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/AllMetricServlet.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.master.servlet; -import org.apache.nemo.runtime.master.MetricStore; +import org.apache.nemo.runtime.master.metric.MetricStore; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java index a9666ca..d965489 100644 --- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/JobMetricServlet.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.master.servlet; -import org.apache.nemo.runtime.master.MetricStore; +import org.apache.nemo.runtime.master.metric.MetricStore; import org.apache.nemo.runtime.common.metric.JobMetric; import javax.servlet.http.HttpServlet; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java index 52741f8..5d77d46 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/StageMetricServlet.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.master.servlet; -import org.apache.nemo.runtime.master.MetricStore; +import org.apache.nemo.runtime.master.metric.MetricStore; import org.apache.nemo.runtime.common.metric.StageMetric; import javax.servlet.http.HttpServlet; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java index e225a3e..f80033c 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/TaskMetricServlet.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.master.servlet; -import org.apache.nemo.runtime.master.MetricStore; +import org.apache.nemo.runtime.master.metric.MetricStore; import org.apache.nemo.runtime.common.metric.TaskMetric; import javax.servlet.http.HttpServlet; diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java index c1b7907..62821c6 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/servlet/WebSocketMetricAdapter.java @@ -18,7 +18,7 @@ */ package org.apache.nemo.runtime.master.servlet; -import org.apache.nemo.runtime.master.MetricBroadcaster; +import org.apache.nemo.runtime.master.metric.MetricBroadcaster; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketAdapter; diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java index 932c203..afcaeb8 100644 --- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java +++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/MetricStoreTest.java @@ -21,6 +21,7 @@ package org.apache.nemo.runtime.master; import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nemo.runtime.common.metric.JobMetric; +import org.apache.nemo.runtime.master.metric.MetricStore; import org.junit.Test; import java.io.IOException; diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java index f2339cf..4d771da 100644 --- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java +++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/PlanStateManagerTest.java @@ -29,6 +29,7 @@ import org.apache.nemo.runtime.common.state.PlanState; import org.apache.nemo.runtime.common.state.StageState; import org.apache.nemo.runtime.common.state.TaskState; import 
org.apache.nemo.runtime.common.plan.TestPlanGenerator; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; import org.apache.reef.tang.Injector; import org.junit.Before; import org.junit.Test; diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java index 2aeb335..1092954 100644 --- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java +++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java @@ -25,7 +25,7 @@ import org.apache.nemo.runtime.common.comm.ControlMessage; import org.apache.nemo.runtime.common.message.MessageSender; import org.apache.nemo.runtime.common.plan.*; import org.apache.nemo.runtime.master.PlanStateManager; -import org.apache.nemo.runtime.master.MetricMessageHandler; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; import org.apache.nemo.runtime.master.BlockManagerMaster; import org.apache.nemo.runtime.master.resource.ExecutorRepresenter; import org.apache.nemo.runtime.master.resource.ResourceSpecification; diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java index 2b8adf2..e2e078e 100644 --- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java +++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java @@ -31,7 +31,7 @@ import org.apache.nemo.runtime.common.state.BlockState; import org.apache.nemo.runtime.common.state.PlanState; import org.apache.nemo.runtime.common.state.TaskState; import org.apache.nemo.runtime.master.BlockManagerMaster; -import org.apache.nemo.runtime.master.MetricMessageHandler; +import org.apache.nemo.runtime.master.metric.MetricMessageHandler; import 
org.apache.nemo.runtime.master.PlanStateManager; import org.apache.nemo.runtime.master.resource.ExecutorRepresenter; import org.apache.nemo.runtime.master.resource.ResourceSpecification;