johnyangk closed pull request #47: [NEMO-83] Move /tests/runtime into /runtime/tests URL: https://github.com/apache/incubator-nemo/pull/47
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one opened from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java new file mode 100644 index 00000000..face9942 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import java.io.*; + +/** + * A {@link Coder} which is used for an integer. + */ +public final class IntCoder implements Coder<Integer> { + + /** + * A private constructor. + */ + private IntCoder() { + } + + /** + * Static initializer of the coder. 
+ */ + public static IntCoder of() { + return new IntCoder(); + } + + @Override + public void encode(final Integer value, final OutputStream outStream) throws IOException { + final DataOutputStream dataOutputStream = new DataOutputStream(outStream); + dataOutputStream.writeInt(value); + } + + @Override + public Integer decode(final InputStream inStream) throws IOException { + // If the inStream is closed well in upper level, it is okay to not close this stream + // because the DataInputStream itself will not contain any extra information. + // (when we close this stream, the inStream will be closed together.) + final DataInputStream dataInputStream = new DataInputStream(inStream); + return dataInputStream.readInt(); + } +} diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java similarity index 50% rename from compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java rename to common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java index 0fe6e881..3ad3552a 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java +++ b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java @@ -13,26 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.compiler.frontend.beam.coder; +package edu.snu.nemo.common.coder; import edu.snu.nemo.common.Pair; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; /** - * BEAM Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM. 
+ * A Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM. * @param <A> type for the left coder. * @param <B> type for the right coder. */ -public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> { +public final class PairCoder<A, B> implements Coder<Pair<A, B>> { private final Coder<A> leftCoder; private final Coder<B> rightCoder; @@ -64,6 +57,7 @@ private PairCoder(final Coder<A> leftCoder, final Coder<B> rightCoder) { Coder<A> getLeftCoder() { return leftCoder; } + /** * @return the right coder. */ @@ -71,12 +65,10 @@ private PairCoder(final Coder<A> leftCoder, final Coder<B> rightCoder) { return rightCoder; } - //===================================================================================================== - @Override public void encode(final Pair<A, B> pair, final OutputStream outStream) throws IOException { if (pair == null) { - throw new CoderException("cannot encode a null KV"); + throw new IOException("cannot encode a null pair"); } leftCoder.encode(pair.left(), outStream); rightCoder.encode(pair.right(), outStream); @@ -88,52 +80,4 @@ public void encode(final Pair<A, B> pair, final OutputStream outStream) throws I final B value = rightCoder.decode(inStream); return Pair.of(key, value); } - - @Override - public List<? 
extends Coder<?>> getCoderArguments() { - return Arrays.asList(leftCoder, rightCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic(this, "Key coder must be deterministic", getLeftCoder()); - verifyDeterministic(this, "Value coder must be deterministic", getRightCoder()); - } - - @Override - public boolean consistentWithEquals() { - return leftCoder.consistentWithEquals() && rightCoder.consistentWithEquals(); - } - - @Override - public Object structuralValue(final Pair<A, B> pair) { - if (consistentWithEquals()) { - return pair; - } else { - return Pair.of(getLeftCoder().structuralValue(pair.left()), getRightCoder().structuralValue(pair.right())); - } - } - - /** - * Returns whether both leftCoder and rightCoder are considered not expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(final Pair<A, B> pair) { - return leftCoder.isRegisterByteSizeObserverCheap(pair.left()) - && rightCoder.isRegisterByteSizeObserverCheap(pair.right()); - } - - /** - * Notifies ElementByteSizeObserver about the byte size of the - * encoded value using this coder. 
- */ - @Override - public void registerByteSizeObserver(final Pair<A, B> pair, - final ElementByteSizeObserver observer) throws Exception { - if (pair == null) { - throw new CoderException("cannot encode a null Pair"); - } - leftCoder.registerByteSizeObserver(pair.left(), observer); - rightCoder.registerByteSizeObserver(pair.right(), observer); - } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java similarity index 85% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java index 26b4a540..0a75b6fc 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime; +package edu.snu.nemo.runtime.common; -import org.apache.beam.sdk.values.KV; +import edu.snu.nemo.common.Pair; import java.util.*; import java.util.stream.Collectors; @@ -25,6 +25,13 @@ * Utility class for runtime unit tests. */ public final class RuntimeTestUtil { + + /** + * Private constructor for utility class. + */ + private RuntimeTestUtil() { + } + /** * Gets a list of integer pair elements in range. * @param start value of the range (inclusive). 
@@ -34,7 +41,7 @@ public static List getRangedNumList(final int start, final int end) { final List numList = new ArrayList<>(end - start); - IntStream.range(start, end).forEach(number -> numList.add(KV.of(number, number))); + IntStream.range(start, end).forEach(number -> numList.add(Pair.of(number, number))); return numList; } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java similarity index 99% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java rename to runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java index 24d86d9d..a05b7d2a 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java +++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.common.message.local; +package edu.snu.nemo.runtime.common.message.local; import edu.snu.nemo.runtime.common.message.MessageContext; import edu.snu.nemo.runtime.common.message.MessageEnvironment; diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java similarity index 93% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java rename to runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java index d7afe451..cd2fc169 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java +++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java @@ -13,11 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime.common.optimizer.pass.runtime; +package edu.snu.nemo.runtime.common.optimizer.pass.runtime; -import edu.snu.nemo.common.Pair; import edu.snu.nemo.runtime.common.data.KeyRange; -import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass; import org.junit.Before; import org.junit.Test; diff --git a/runtime/executor/pom.xml b/runtime/executor/pom.xml index 578b080d..0997f8f5 100644 --- a/runtime/executor/pom.xml +++ b/runtime/executor/pom.xml @@ -57,5 +57,17 @@ limitations under the License. 
<artifactId>lz4-java</artifactId> <version>1.4.1</version> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-runtime-master</artifactId> + <version>0.1-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java similarity index 98% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java index d864819a..6bac9352 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java +++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java @@ -13,11 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.executor.data; +package edu.snu.nemo.runtime.executor.data; +import edu.snu.nemo.common.Pair; +import edu.snu.nemo.common.coder.IntCoder; +import edu.snu.nemo.common.coder.PairCoder; import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty; import edu.snu.nemo.conf.JobConf; -import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder; import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.data.HashRange; @@ -26,7 +28,6 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher; import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment; import edu.snu.nemo.runtime.common.state.BlockState; -import edu.snu.nemo.runtime.executor.data.*; import edu.snu.nemo.runtime.executor.data.block.Block; import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition; import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer; @@ -34,9 +35,6 @@ import edu.snu.nemo.runtime.executor.data.stores.*; import edu.snu.nemo.runtime.master.BlockManagerMaster; import edu.snu.nemo.runtime.master.RuntimeMaster; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.values.KV; import org.apache.commons.io.FileUtils; import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; @@ -59,7 +57,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.getRangedNumList; +import static edu.snu.nemo.runtime.common.RuntimeTestUtil.getRangedNumList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -73,7 +71,7 @@ @PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class}) public final class BlockStoreTest { private static final String TMP_FILE_DIRECTORY = 
"./tmpFiles"; - private static final Coder CODER = new BeamCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of()); private static final Serializer SERIALIZER = new Serializer(CODER, Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Compression.LZ4))); private static final SerializerManager serializerManager = mock(SerializerManager.class); @@ -575,7 +573,7 @@ private List getFixedKeyRangedNumList(final int key, final int start, final int end) { final List numList = new ArrayList<>(end - start); - IntStream.range(start, end).forEach(number -> numList.add(KV.of(key, number))); + IntStream.range(start, end).forEach(number -> numList.add(Pair.of(key, number))); return numList; } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java similarity index 98% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java index bc1ba30c..eeb00315 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java +++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.executor.data; +package edu.snu.nemo.runtime.executor.data; import edu.snu.nemo.conf.JobConf; import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue; diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java similarity index 98% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java index 6a9dbfb5..a0252cac 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java +++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java @@ -13,8 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime.executor.datatransfer; +package edu.snu.nemo.runtime.executor.datatransfer; +import edu.snu.nemo.common.coder.IntCoder; +import edu.snu.nemo.common.coder.PairCoder; import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; import edu.snu.nemo.common.ir.edge.IREdge; import edu.snu.nemo.common.ir.edge.executionproperty.*; @@ -28,7 +30,6 @@ import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; -import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder; import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.message.MessageEnvironment; @@ -53,8 +54,6 @@ import edu.snu.nemo.runtime.master.resource.ContainerManager; import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; import edu.snu.nemo.runtime.master.scheduler.*; -import org.apache.beam.sdk.coders.KvCoder; -import 
org.apache.beam.sdk.coders.VarIntCoder; import org.apache.commons.io.FileUtils; import org.apache.reef.driver.evaluator.EvaluatorRequestor; import org.apache.reef.io.network.naming.NameResolverConfiguration; @@ -81,8 +80,8 @@ import java.util.stream.IntStream; import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY; -import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.flatten; -import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.getRangedNumList; +import static edu.snu.nemo.runtime.common.RuntimeTestUtil.getRangedNumList; +import static edu.snu.nemo.runtime.common.RuntimeTestUtil.flatten; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -107,7 +106,7 @@ private static final int PARALLELISM_TEN = 10; private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)"; private static final AtomicInteger TEST_INDEX = new AtomicInteger(0); - private static final Coder CODER = new BeamCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of()); private static final Tang TANG = Tang.Factory.getTang(); private static final int HASH_RANGE_MULTIPLIER = 10; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services