johnyangk closed pull request #85: [NEMO-172] Implement one partition per one element partitioner
URL: https://github.com/apache/incubator-nemo/pull/85
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
index 441949e33..cb50ef35c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
@@ -46,5 +46,6 @@ public static CompressionProperty of(final Value value) {
   public enum Value {
     Gzip,
     LZ4,
+    None
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
index 249433e77..04f5c072e 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
@@ -44,6 +44,7 @@ public static PartitionerProperty of(final Value value) {
   public enum Value {
     DataSkewHashPartitioner,
     HashPartitioner,
-    IntactPartitioner
+    IntactPartitioner,
+    DedicatedKeyPerElementPartitioner
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
new file mode 100644
index 000000000..e1b31f2cf
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the decoder property toward {@link 
edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to read data as byte arrays.
+ */
+public final class LargeShuffleDecompressionPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeShuffleDecompressionPass() {
+    super(DecompressionProperty.class, 
Collections.singleton(CommunicationPatternProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+      inEdges.forEach(edge -> {
+        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+            .equals(CommunicationPatternProperty.Value.Shuffle)) {
+          
edge.setProperty(DecompressionProperty.of(CompressionProperty.Value.None));
+
+          dag.getOutgoingEdgesOf(edge.getDst())
+              .forEach(edgeFromRelay -> {
+                
edgeFromRelay.setProperty(DecompressionProperty.of(CompressionProperty.Value.LZ4));
+              });
+        }
+      });
+    });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
new file mode 100644
index 000000000..eb3b2bfdb
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the partitioner property from {@link 
edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to write an element as a partition.
+ * This enables that every byte[] element, which was a partition for the 
reduce task, becomes one partition again
+ * and flushed to disk write after it is relayed.
+ */
+public final class LargeShufflePartitionerPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeShufflePartitionerPass() {
+    super(PartitionerProperty.class, 
Collections.singleton(CommunicationPatternProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+      inEdges.forEach(edge -> {
+        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+            .equals(CommunicationPatternProperty.Value.Shuffle)) {
+          dag.getOutgoingEdgesOf(edge.getDst())
+              .forEach(edgeFromRelay -> {
+                edgeFromRelay.setProperty(PartitionerProperty.of(
+                    
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
+              });
+        }
+      });
+    });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
new file mode 100644
index 000000000..abc9300da
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A pass to support Sailfish-like shuffle by tagging edges.
+ * This pass modifies the encoder property toward {@link 
edu.snu.nemo.common.ir.vertex.transform.RelayTransform}
+ * to write data as byte arrays.
+ */
+public final class LargeSuffleCompressionPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public LargeSuffleCompressionPass() {
+    super(CompressionProperty.class, 
Collections.singleton(CommunicationPatternProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
+      inEdges.forEach(edge -> {
+        if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+            .equals(CommunicationPatternProperty.Value.Shuffle)) {
+          
edge.setProperty(CompressionProperty.of(CompressionProperty.Value.LZ4));
+
+          dag.getOutgoingEdgesOf(edge.getDst())
+              .forEach(edgeFromRelay -> {
+                
edgeFromRelay.setProperty(CompressionProperty.of(CompressionProperty.Value.None));
+              });
+        }
+      });
+    });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
index c6108e2c6..e69e56c8e 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePass.java
@@ -35,6 +35,9 @@ public LargeShuffleCompositePass() {
         new LargeShuffleDataStorePass(),
         new LargeShuffleDecoderPass(),
         new LargeShuffleEncoderPass(),
+        new LargeShufflePartitionerPass(),
+        new LargeSuffleCompressionPass(),
+        new LargeShuffleDecompressionPass(),
         new LargeShuffleDataPersistencePass(),
         new LargeShuffleResourceSlotPass()
     ));
diff --git a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
index d97816ca2..f85927077 100644
--- a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
+++ b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/LargeShuffleCompositePassTest.java
@@ -64,6 +64,10 @@ public void testLargeShuffle() {
                 edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
             assertEquals(BytesDecoderFactory.of(),
                 edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+            assertEquals(CompressionProperty.Value.LZ4,
+                
edgeToMerger.getPropertyValue(CompressionProperty.class).get());
+            assertEquals(CompressionProperty.Value.None,
+                
edgeToMerger.getPropertyValue(DecompressionProperty.class).get());
           } else {
             assertEquals(DataFlowProperty.Value.Pull,
                 edgeToMerger.getPropertyValue(DataFlowProperty.class).get());
@@ -78,6 +82,12 @@ public void testLargeShuffle() {
               edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
           assertEquals(BytesEncoderFactory.of(),
               edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+          
assertEquals(PartitionerProperty.Value.DedicatedKeyPerElementPartitioner,
+              
edgeFromMerger.getPropertyValue(PartitionerProperty.class).get());
+          assertEquals(CompressionProperty.Value.None,
+              
edgeFromMerger.getPropertyValue(CompressionProperty.class).get());
+          assertEquals(CompressionProperty.Value.LZ4,
+              
edgeFromMerger.getPropertyValue(DecompressionProperty.class).get());
         });
       } else {
         // Non merger vertex.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index 10602b942..e0de210ae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -103,13 +103,27 @@
    * Commits this block to prevent further write.
    *
    * @return the size of each partition if the data in the block is serialized.
-   * @throws BlockWriteException for any error occurred while trying to write 
a block.
+   * @throws BlockWriteException for any error occurred while trying to commit 
a block.
    *                             (This exception will be thrown to the 
scheduler
    *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
    *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
    */
   Optional<Map<K, Long>> commit() throws BlockWriteException;
 
+  /**
+   * Commits all un-committed partitions.
+   * This method can be useful if partitions in a block should be committed 
before the block is committed totally.
+   * For example, non-committed partitions in a file block can be flushed to 
storage from memory.
+   * If another element is written after this method is called, a new 
non-committed partition should be created
+   * for the element even if a partition with the same key is committed 
already.
+   *
+   * @throws BlockWriteException for any error occurred while trying to commit 
partitions.
+   *                             (This exception will be thrown to the 
scheduler
+   *                             through {@link 
edu.snu.nemo.runtime.executor.Executor} and
+   *                             have to be handled by the scheduler with 
fault tolerance mechanism.)
+   */
+  void commitPartitions() throws BlockWriteException;
+
   /**
    * @return the ID of this block.
    */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 41f125b65..b9a04e437 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -20,6 +20,7 @@
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.partition.Partition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.metadata.PartitionMetadata;
@@ -314,13 +315,7 @@ public void deleteFile() throws IOException {
   public synchronized Optional<Map<K, Long>> commit() throws 
BlockWriteException {
     try {
       if (!metadata.isCommitted()) {
-        final List<SerializedPartition<K>> partitions = new ArrayList<>();
-        for (final SerializedPartition<K> partition : 
nonCommittedPartitionsMap.values()) {
-          partition.commit();
-          partitions.add(partition);
-        }
-        writeToFile(partitions);
-        nonCommittedPartitionsMap.clear();
+        commitPartitions();
         metadata.commitBlock();
       }
       final List<PartitionMetadata<K>> partitionMetadataList = 
metadata.getPartitionMetadataList();
@@ -341,6 +336,25 @@ public void deleteFile() throws IOException {
     }
   }
 
+  /**
+   * Commits all un-committed partitions.
+   * The committed partitions will be flushed to the storage.
+   */
+  @Override
+  public synchronized void commitPartitions() throws BlockWriteException {
+    final List<SerializedPartition<K>> partitions = new ArrayList<>();
+    try {
+      for (final Partition<?, K> partition : 
nonCommittedPartitionsMap.values()) {
+        partition.commit();
+        partitions.add((SerializedPartition<K>) partition);
+      }
+      writeToFile(partitions);
+      nonCommittedPartitionsMap.clear();
+    } catch (final IOException e) {
+      throw new BlockWriteException(e);
+    }
+  }
+
   /**
    * @return the ID of this block.
    */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 6a8323a9b..5bf1e0195 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -187,6 +187,18 @@ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> par
     return Optional.empty();
   }
 
+  /**
+   * Commits all un-committed partitions.
+   */
+  @Override
+  public synchronized void commitPartitions() throws BlockWriteException {
+    nonCommittedPartitionsMap.forEach((key, partition) -> {
+      partition.commit();
+      nonSerializedPartitions.add(partition);
+    });
+    nonCommittedPartitionsMap.clear();
+  }
+
   /**
    * @return the ID of this block.
    */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 4282b69c7..847558fc8 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -180,11 +180,7 @@ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> par
   public synchronized Optional<Map<K, Long>> commit() throws 
BlockWriteException {
     try {
       if (!committed) {
-        for (final SerializedPartition<K> partition : 
nonCommittedPartitionsMap.values()) {
-          partition.commit();
-          serializedPartitions.add(partition);
-        }
-        nonCommittedPartitionsMap.clear();
+        commitPartitions();
         committed = true;
       }
       final Map<K, Long> partitionSizes = new 
HashMap<>(serializedPartitions.size());
@@ -204,6 +200,22 @@ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> par
     }
   }
 
+  /**
+   * Commits all un-committed partitions.
+   */
+  @Override
+  public synchronized void commitPartitions() throws BlockWriteException {
+    try {
+      for (final SerializedPartition<K> partition : 
nonCommittedPartitionsMap.values()) {
+        partition.commit();
+        serializedPartitions.add(partition);
+      }
+      nonCommittedPartitionsMap.clear();
+    } catch (final IOException e) {
+      throw new BlockWriteException(e);
+    }
+  }
+
   /**
    * @return the ID of this block.
    */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
new file mode 100644
index 000000000..02c9bedfb
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partitioner;
+
+import java.lang.annotation.*;
+
+/**
+ * Declares that all of the designated keys for each element in a {@link 
Partitioner} is dedicated for the element.
+ */
+@Target({ElementType.TYPE})
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DedicatedKeyPerElement {
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
new file mode 100644
index 000000000..f01f08b60
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.partitioner;
+
+/**
+ * An implementation of {@link Partitioner} which assigns a dedicated key per 
an output data from a task.
+ * WARNING: Because this partitioner assigns a dedicated key per element, it 
should be used under specific circumstances
+ * that the number of output element is not that many. For example, every 
output element of
+ * {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform} inserted by 
large shuffle optimization is always
+ * a partition. In this case, assigning a key for each element can be useful.
+ */
+@DedicatedKeyPerElement
+public final class DedicatedKeyPerElementPartitioner implements 
Partitioner<Integer> {
+  private int key;
+
+  /**
+   * Constructor.
+   */
+  public DedicatedKeyPerElementPartitioner() {
+    key = 0;
+  }
+
+  @Override
+  public Integer partition(final Object element) {
+    return key++;
+  }
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
index 467ba181f..00f4ca216 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
@@ -45,6 +45,8 @@ public final OutputStream chainOutput(final OutputStream out) throws IOException
         return new GZIPOutputStream(out);
       case LZ4:
         return new LZ4BlockOutputStream(out);
+      case None:
+        return out;
       default:
         throw new UnsupportedCompressionException("Not supported compression 
method");
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
index 558bd357f..b36546c71 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
@@ -45,6 +45,8 @@ public final InputStream chainInput(final InputStream in) throws IOException {
         return new GZIPInputStream(in);
       case LZ4:
         return new LZ4BlockInputStream(in);
+      case None:
+        return in;
       default:
         throw new UnsupportedCompressionException("Not supported compression 
method");
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 930a4b4a4..6e4164a13 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -82,6 +82,9 @@
       case DataSkewHashPartitioner:
         this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, 
dstParallelism, keyExtractor.get());
         break;
+      case DedicatedKeyPerElementPartitioner:
+        this.partitioner = new DedicatedKeyPerElementPartitioner();
+        break;
       default:
         throw new UnsupportedPartitionerException(
             new Throwable("Partitioner " + partitionerPropertyValue + " is not 
supported."));
@@ -103,6 +106,12 @@
   public void write(final Object element) {
     if (nonDummyBlock) {
       blockToWrite.write(partitioner.partition(element), element);
+
+      final DedicatedKeyPerElement dedicatedKeyPerElement =
+          partitioner.getClass().getAnnotation(DedicatedKeyPerElement.class);
+      if (dedicatedKeyPerElement != null) {
+        blockToWrite.commitPartitions();
+      }
     } // If else, does not need to write because the data is duplicated.
   }
 
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
new file mode 100644
index 000000000..c05745d28
--- /dev/null
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data;
+
+import edu.snu.nemo.common.coder.IntDecoderFactory;
+import edu.snu.nemo.common.coder.IntEncoderFactory;
+import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.runtime.executor.data.block.Block;
+import edu.snu.nemo.runtime.executor.data.block.FileBlock;
+import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
+import edu.snu.nemo.runtime.executor.data.block.SerializedMemoryBlock;
+import edu.snu.nemo.runtime.executor.data.metadata.LocalFileMetadata;
+import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * Tests write and read for {@link Block}s.
+ */
+public final class BlockTest {
+  private Serializer serializer;
+  private Map<Integer, List<Integer>> testData;
+
+  /**
+   * Generates the test data and serializer.
+   */
+  @Before
+  public void setUp() throws Exception {
+    serializer = new Serializer<>(IntEncoderFactory.of(), 
IntDecoderFactory.of(), new ArrayList<>(), new ArrayList<>());
+    testData = new HashMap<>();
+
+    final List<Integer> list1 = Collections.singletonList(1);
+    final List<Integer> list2 = Arrays.asList(1, 2);
+    final List<Integer> list3 = Arrays.asList(1, 2, 3);
+
+    testData.put(1, list1);
+    testData.put(2, list2);
+    testData.put(3, list3);
+  }
+
+  /**
+   * Test {@link NonSerializedMemoryBlock}.
+   */
+  @Test(timeout = 10000)
+  public void testNonSerializedMemoryBlock() throws Exception {
+    final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", 
serializer);
+    testBlock(block);
+  }
+
+  /**
+   * Test {@link 
edu.snu.nemo.runtime.executor.data.block.SerializedMemoryBlock}.
+   */
+  @Test(timeout = 10000)
+  public void testSerializedMemoryBlock() throws Exception {
+    final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", 
serializer);
+    testBlock(block);
+  }
+
+  /**
+   * Test {@link FileBlock}.
+   */
+  @Test(timeout = 10000)
+  public void testFileBlock() throws Exception {
+    final String tmpDir = "./tmpFiles";
+    final String filePath = tmpDir + "/BlockTestFile";
+    try {
+      new File(tmpDir).mkdirs();
+      final LocalFileMetadata<Integer> metadata = new LocalFileMetadata<>();
+      final Block<Integer> block = new FileBlock<>("testBlock", serializer, 
filePath, metadata);
+      testBlock(block);
+    } finally {
+      FileUtils.deleteDirectory(new File(tmpDir));
+    }
+  }
+
+
+  /**
+   * Tests write to & read from a block.
+   */
+  private void testBlock(final Block<Integer> block) throws Exception {
+    // Write elements to partitions in the block
+    testData.forEach((key, partitionData) -> partitionData.forEach(element -> 
block.write(key, element)));
+
+    // Commit all partitions
+    block.commitPartitions();
+
+    // Write elements again. Because all partitions are committed, new 
partitions for each key will be created.
+    testData.forEach((key, partitionData) -> partitionData.forEach(element -> 
block.write(key, element)));
+
+    // Commit the block
+    block.commit();
+
+    int count = 0;
+    final Iterable<NonSerializedPartition<Integer>> partitions = 
block.readPartitions(HashRange.all());
+    for (final NonSerializedPartition<Integer> readPartition : partitions) {
+      count++;
+      final List<Integer> expectedData = testData.get(readPartition.getKey());
+      final Iterable<Integer> readData = readPartition.getData();
+      compare(expectedData, readData);
+    }
+    Assert.assertEquals(count, testData.size() * 2);
+  }
+
+  /**
+   * Compare the contents of a list and an iterable.
+   * @param list     the list to test.
+   * @param iterable the iterable to test.
+   * @throws RuntimeException if the contents are not matched.
+   */
+  private void compare(final List<Integer> list,
+                       final Iterable<Integer> iterable) throws 
RuntimeException {
+    final List<Integer> copiedList = new ArrayList<>(list);
+    for (final Integer element : iterable) {
+      if (!copiedList.remove(element)) {
+        throw new RuntimeException("Contents mismatch! \nlist: " + list + 
"\niterable: " + iterable);
+      }
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to