[GitHub] sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use 
MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/json2dot.py b/bin/json2dot.py
index f41146b64..f3caf7dd4 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -157,9 +157,9 @@ def dot(self):
 label += '{}:{}'.format(transform_name, class_name)
 except:
 pass
-if ('class' in self.properties and self.properties['class'] == 
'MetricCollectionBarrierVertex'):
+if ('class' in self.properties and self.properties['class'] == 
'AggregationBarrierVertex'):
 shape = ', shape=box'
-label += 'MetricCollectionBarrier'
+label += 'AggregationBarrier'
 else:
 shape = ''
 try:
diff --git a/common/pom.xml b/common/pom.xml
index da5a48c80..18ef10b02 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -52,5 +52,11 @@ limitations under the License.
 ${hadoop.version}
 provided
 
+
+  org.apache.beam
+  beam-sdks-java-core
+  ${beam.version}
+
+
 
 
diff --git a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java 
b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
index be7cf592c..23bc6abd3 100644
--- a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
@@ -24,6 +24,7 @@
 public interface KeyExtractor extends Serializable {
   /**
* Extracts key.
+   *
* @param element Element to get the key from.
* @return The extracted key of the element.
*/
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java 
b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index dc67ff355..16fa877c5 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -20,8 +20,8 @@
 import java.io.Serializable;
 
 /**
- * A decoder factory object which generates decoders that decode values of 
type {@code T} into byte streams.
- * To avoid to generate instance-based coder such as Spark serializer for 
every decoding,
+ * A decoder factory object which generates decoders that decode byte streams 
into values of type {@code T}.
+ * To avoid generating instance-based coder such as Spark serializer for every 
decoding,
  * user need to instantiate a decoder instance and use it.
  *
  * @param  element type.
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java 
b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
index d63fafb9c..82c3730c0 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
@@ -46,7 +46,7 @@
 
 /**
  * Encodes the given value onto the specified output stream.
- * It have to be able to encode the given stream consequently by calling 
this method repeatedly.
+ * It has to be able to encode the given stream consequently by calling 
this method repeatedly.
  * Because the user can want to keep a single output stream and 
continuously concatenate elements,
  * the output stream should not be closed.
  *
diff --git 
a/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java 
b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
new file mode 100644
index 0..4335413a4
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DecoderFactory} which is used for long.
+ */
+public final class LongDecoderFactory implements DecoderFactory {
+
+  private static final LongDecoderFactory LONG_DECODER_FACTORY = new 
LongDecoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private LongDecoderFactory() {
+ 

[GitHub] sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use 
MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/json2dot.py b/bin/json2dot.py
index f41146b64..f3caf7dd4 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -157,9 +157,9 @@ def dot(self):
 label += '{}:{}'.format(transform_name, class_name)
 except:
 pass
-if ('class' in self.properties and self.properties['class'] == 
'MetricCollectionBarrierVertex'):
+if ('class' in self.properties and self.properties['class'] == 
'AggregationBarrierVertex'):
 shape = ', shape=box'
-label += 'MetricCollectionBarrier'
+label += 'AggregationBarrier'
 else:
 shape = ''
 try:
diff --git a/common/pom.xml b/common/pom.xml
index da5a48c80..18ef10b02 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -52,5 +52,11 @@ limitations under the License.
 ${hadoop.version}
 provided
 
+
+  org.apache.beam
+  beam-sdks-java-core
+  ${beam.version}
+
+
 
 
diff --git a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java 
b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
index be7cf592c..23bc6abd3 100644
--- a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
@@ -24,6 +24,7 @@
 public interface KeyExtractor extends Serializable {
   /**
* Extracts key.
+   *
* @param element Element to get the key from.
* @return The extracted key of the element.
*/
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java 
b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index dc67ff355..16fa877c5 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -20,8 +20,8 @@
 import java.io.Serializable;
 
 /**
- * A decoder factory object which generates decoders that decode values of 
type {@code T} into byte streams.
- * To avoid to generate instance-based coder such as Spark serializer for 
every decoding,
+ * A decoder factory object which generates decoders that decode byte streams 
into values of type {@code T}.
+ * To avoid generating instance-based coder such as Spark serializer for every 
decoding,
  * user need to instantiate a decoder instance and use it.
  *
  * @param  element type.
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java 
b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
index d63fafb9c..82c3730c0 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
@@ -46,7 +46,7 @@
 
 /**
  * Encodes the given value onto the specified output stream.
- * It have to be able to encode the given stream consequently by calling 
this method repeatedly.
+ * It has to be able to encode the given stream consequently by calling 
this method repeatedly.
  * Because the user can want to keep a single output stream and 
continuously concatenate elements,
  * the output stream should not be closed.
  *
diff --git 
a/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java 
b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
new file mode 100644
index 0..4335413a4
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DecoderFactory} which is used for long.
+ */
+public final class LongDecoderFactory implements DecoderFactory {
+
+  private static final LongDecoderFactory LONG_DECODER_FACTORY = new 
LongDecoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private LongDecoderFactory() {
+