http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java deleted file mode 100644 index 6dcee0b..0000000 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.runtime.output; - -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - -public class TezOutputEmitter<T> implements TezChannelSelector<T> { - - private final ShipStrategyType strategy; // the shipping strategy used by this output emitter - - private int[] channels; // the reused array defining target channels - - private int nextChannelToSendTo = 0; // counter to go over channels round robin - - private final TypeComparator<T> comparator; // the comparator for hashing / sorting - - // ------------------------------------------------------------------------ - // Constructors - // ------------------------------------------------------------------------ - - /** - * Creates a new channel selector that distributes data round robin. - */ - public TezOutputEmitter() { - this(ShipStrategyType.NONE); - } - - /** - * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...). - * - * @param strategy The distribution strategy to be used. - */ - public TezOutputEmitter(ShipStrategyType strategy) { - this(strategy, null); - } - - /** - * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...) - * and uses the supplied comparator to hash / compare records for partitioning them deterministically. - * - * @param strategy The distribution strategy to be used. - * @param comparator The comparator used to hash / compare the records. - */ - public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) { - this(strategy, comparator, null); - } - - /** - * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...) - * and uses the supplied comparator to hash / compare records for partitioning them deterministically. - * - * @param strategy The distribution strategy to be used. - * @param comparator The comparator used to hash / compare the records. - * @param distr The distribution pattern used in the case of a range partitioning. - */ - public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) { - if (strategy == null) { - throw new NullPointerException(); - } - - this.strategy = strategy; - this.comparator = comparator; - - switch (strategy) { - case FORWARD: - case PARTITION_HASH: - case PARTITION_RANGE: - case PARTITION_RANDOM: - case PARTITION_FORCED_REBALANCE: - case BROADCAST: - break; - default: - throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name()); - } - - if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) { - throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning."); - } - } - - // ------------------------------------------------------------------------ - // Channel Selection - // ------------------------------------------------------------------------ - - @Override - public final int[] selectChannels(T record, int numberOfChannels) { - switch (strategy) { - case FORWARD: - case PARTITION_RANDOM: - case PARTITION_FORCED_REBALANCE: - return robin(numberOfChannels); - case PARTITION_HASH: - return hashPartitionDefault(record, numberOfChannels); - case PARTITION_RANGE: - return rangePartition(record, numberOfChannels); - case BROADCAST: - return broadcast(numberOfChannels); - default: - throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name()); - } - } - - // -------------------------------------------------------------------------------------------- - - private final int[] robin(int numberOfChannels) { - if (this.channels == null || this.channels.length != 1) { - this.channels = new int[1]; - } - - int nextChannel = nextChannelToSendTo + 1; - nextChannel = nextChannel < numberOfChannels ? nextChannel : 0; - - this.nextChannelToSendTo = nextChannel; - this.channels[0] = nextChannel; - return this.channels; - } - - private final int[] broadcast(int numberOfChannels) { - if (channels == null || channels.length != numberOfChannels) { - channels = new int[numberOfChannels]; - for (int i = 0; i < numberOfChannels; i++) { - channels[i] = i; - } - } - - return channels; - } - - private final int[] hashPartitionDefault(T record, int numberOfChannels) { - if (channels == null || channels.length != 1) { - channels = new int[1]; - } - - int hash = this.comparator.hash(record); - - hash = murmurHash(hash); - - if (hash >= 0) { - this.channels[0] = hash % numberOfChannels; - } - else if (hash != Integer.MIN_VALUE) { - this.channels[0] = -hash % numberOfChannels; - } - else { - this.channels[0] = 0; - } - - return this.channels; - } - - private final int murmurHash(int k) { - k *= 0xcc9e2d51; - k = Integer.rotateLeft(k, 15); - k *= 0x1b873593; - - k = Integer.rotateLeft(k, 13); - k *= 0xe6546b64; - - k ^= 4; - k ^= k >>> 16; - k *= 0x85ebca6b; - k ^= k >>> 13; - k *= 0xc2b2ae35; - k ^= k >>> 16; - - return k; - } - - private final int[] rangePartition(T record, int numberOfChannels) { - throw new UnsupportedOperationException(); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java deleted file mode 100644 index 39d247c..0000000 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.util; - - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -public class DummyInvokable extends AbstractInvokable { - - private ExecutionConfig executionConfig; - - public DummyInvokable() { - } - - public DummyInvokable(ExecutionConfig executionConfig) { - this.executionConfig = executionConfig; - } - - public void setExecutionConfig(ExecutionConfig executionConfig) { - this.executionConfig = executionConfig; - } - - @Override - public void registerInputOutput() {} - - - @Override - public void invoke() throws Exception {} - - @Override - public ExecutionConfig getExecutionConfig() { - return executionConfig; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java deleted file mode 100644 index 202cb24..0000000 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.util; - -import org.apache.flink.util.InstantiationUtil; -import org.apache.commons.codec.binary.Base64; - -import java.io.IOException; - -public class EncodingUtils { - - public static Object decodeObjectFromString(String encoded, ClassLoader cl) { - - try { - if (encoded == null) { - return null; - } - byte[] bytes = Base64.decodeBase64(encoded); - - return InstantiationUtil.deserializeObject(bytes, cl); - } - catch (IOException e) { - e.printStackTrace(); - System.exit(-1); - throw new RuntimeException(); - } - catch (ClassNotFoundException e) { - e.printStackTrace(); - System.exit(-1); - throw new RuntimeException(); - } - } - - public static String encodeObjectToString(Object o) { - - try { - byte[] bytes = InstantiationUtil.serializeObject(o); - - String encoded = Base64.encodeBase64String(bytes); - return encoded; - } - catch (IOException e) { - e.printStackTrace(); - System.exit(-1); - throw new RuntimeException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java deleted file mode 100644 index 07c5f97..0000000 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.util; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -public class FlinkSerialization<T> extends Configured implements Serialization<T>{ - - @Override - public boolean accept(Class<?> c) { - TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); - T instance = typeSerializer.createInstance(); - return instance.getClass().isAssignableFrom(c); - } - - @Override - public Serializer<T> getSerializer(Class<T> c) { - TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); - return new FlinkSerializer<T>(typeSerializer); - } - - @Override - public Deserializer<T> getDeserializer(Class<T> c) { - TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); - return new FlinkDeserializer<T>(typeSerializer); - } - - public static class FlinkSerializer<T> implements Serializer<T> { - - private OutputStream dataOut; - private DataOutputViewOutputStreamWrapper dataOutputView; - private TypeSerializer<T> typeSerializer; - - public FlinkSerializer(TypeSerializer<T> typeSerializer) { - this.typeSerializer = typeSerializer; - } - - @Override - public void open(OutputStream out) throws IOException { - this.dataOut = out; - this.dataOutputView = new DataOutputViewOutputStreamWrapper(out); - } - - @Override - public void serialize(T t) throws IOException { - typeSerializer.serialize(t, dataOutputView); - } - - @Override - public void close() throws IOException { - this.dataOut.close(); - } - } - - public static class FlinkDeserializer<T> implements Deserializer<T> { - - private InputStream dataIn; - private TypeSerializer<T> typeSerializer; - private DataInputViewInputStreamWrapper dataInputView; - - public FlinkDeserializer(TypeSerializer<T> typeSerializer) { - this.typeSerializer = typeSerializer; - } - - @Override - public void open(InputStream in) throws IOException { - this.dataIn = in; - this.dataInputView = new DataInputViewInputStreamWrapper(in); - } - - @Override - public T deserialize(T t) throws IOException { - T reuse = t; - if (reuse == null) { - reuse = typeSerializer.createInstance(); - } - return typeSerializer.deserialize(reuse, dataInputView); - } - - @Override - public void close() throws IOException { - this.dataIn.close(); - } - } - - private static final class DataOutputViewOutputStreamWrapper implements DataOutputView { - - private final DataOutputStream dos; - - public DataOutputViewOutputStreamWrapper(OutputStream output) { - this.dos = new DataOutputStream(output); - } - - @Override - public void write(int b) throws IOException { - dos.write(b); - } - - @Override - public void write(byte[] b) throws IOException { - dos.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - dos.write(b, off, len); - } - - @Override - public void writeBoolean(boolean v) throws IOException { - dos.writeBoolean(v); - } - - @Override - public void writeByte(int v) throws IOException { - dos.writeByte(v); - } - - @Override - public void writeShort(int v) throws IOException { - dos.writeShort(v); - } - - @Override - public void writeChar(int v) throws IOException { - dos.writeChar(v); - } - - @Override - public void writeInt(int v) throws IOException { - dos.writeInt(v); - } - - @Override - public void writeLong(long v) throws IOException { - dos.writeLong(v); - } - - @Override - public void writeFloat(float v) throws IOException { - dos.writeFloat(v); - } - - @Override - public void writeDouble(double v) throws IOException { - dos.writeDouble(v); - } - - @Override - public void writeBytes(String s) throws IOException { - dos.writeBytes(s); - } - - @Override - public void writeChars(String s) throws IOException { - dos.writeChars(s); - } - - @Override - public void writeUTF(String s) throws IOException { - dos.writeUTF(s); - } - - @Override - public void skipBytesToWrite(int num) throws IOException { - for (int i = 0; i < num; i++) { - dos.write(0); - } - } - - @Override - public void write(DataInputView inview, int num) throws IOException { - for (int i = 0; i < num; i++) { - dos.write(inview.readByte()); - } - } - } - - private static final class DataInputViewInputStreamWrapper implements DataInputView { - - private final DataInputStream dis; - - - public DataInputViewInputStreamWrapper(InputStream input) { - this.dis = new DataInputStream(input); - } - - @Override - public void readFully(byte[] b) throws IOException { - dis.readFully(b); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - dis.readFully(b, off, len); - } - - @Override - public int skipBytes(int n) throws IOException { - return dis.skipBytes(n); - } - - @Override - public boolean readBoolean() throws IOException { - return dis.readBoolean(); - } - - @Override - public byte readByte() throws IOException { - return dis.readByte(); - } - - @Override - public int readUnsignedByte() throws IOException { - return dis.readUnsignedByte(); - } - - @Override - public short readShort() throws IOException { - return dis.readShort(); - } - - @Override - public int readUnsignedShort() throws IOException { - return dis.readUnsignedShort(); - } - - @Override - public char readChar() throws IOException { - return dis.readChar(); - } - - @Override - public int readInt() throws IOException { - return dis.readInt(); - } - - @Override - public long readLong() throws IOException { - return dis.readLong(); - } - - @Override - public float readFloat() throws IOException { - return dis.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return dis.readDouble(); - } - - @Override - public String readLine() throws IOException { - return dis.readLine(); - } - - @Override - public String readUTF() throws IOException { - return dis.readUTF(); - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - while (numBytes > 0) { - numBytes -= dis.skipBytes(numBytes); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - dis.readFully(b, off, len); - return len; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/resources/log4j.properties b/flink-staging/flink-tez/src/main/resources/log4j.properties deleted file mode 100644 index 0845c81..0000000 --- a/flink-staging/flink-tez/src/main/resources/log4j.properties +++ /dev/null @@ -1,30 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -log4j.rootLogger=INFO, testlogger - -# A1 is set to be a ConsoleAppender. -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java deleted file mode 100644 index 9124faa..0000000 --- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.test.testdata.ConnectedComponentsData; -import org.apache.flink.tez.examples.ConnectedComponentsStep; -import org.junit.Assert; - -import java.io.BufferedReader; -import java.io.IOException; -import java.util.regex.Pattern; - -/* - * Note: This does not test whether the program computes one step of the - * Weakly Connected Components program correctly. It only tests whether - * the program assigns a wrong component to a vertex. - */ - -public class ConnectedComponentsStepITCase extends TezProgramTestBase { - - private static final long SEED = 0xBADC0FFEEBEEFL; - - private static final int NUM_VERTICES = 1000; - - private static final int NUM_EDGES = 10000; - - - private String verticesPath; - private String edgesPath; - private String resultPath; - - - @Override - protected void preSubmit() throws Exception { - verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES)); - edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED)); - resultPath = getTempFilePath("results"); - } - - @Override - protected void testProgram() throws Exception { - ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100"); - } - - @Override - protected void postSubmit() throws Exception { - for (BufferedReader reader : getResultReader(resultPath)) { - checkOddEvenResult(reader); - } - } - - private static void checkOddEvenResult(BufferedReader result) throws IOException { - Pattern split = Pattern.compile(" "); - String line; - while ((line = result.readLine()) != null) { - String[] res = split.split(line); - Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length); - try { - int vertex = Integer.parseInt(res[0]); - int component = Integer.parseInt(res[1]); - Assert.assertTrue(((vertex % 2) == (component % 2))); - } catch (NumberFormatException e) { - Assert.fail("Malformed result."); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java deleted file mode 100644 index 9a203fe..0000000 --- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.test.testdata.PageRankData; -import org.apache.flink.tez.examples.PageRankBasicStep; - -public class PageRankBasicStepITCase extends TezProgramTestBase { - - private String verticesPath; - private String edgesPath; - private String resultPath; - private String expectedResult; - - public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" + - "2 0.25666666666666665\n" + - "3 0.1716666666666667\n" + - "4 0.1716666666666667\n" + - "5 0.2"; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); - edgesPath = createTempFile("edges.txt", PageRankData.EDGES); - } - - @Override - protected void testProgram() throws Exception { - PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"}); - expectedResult = RANKS_AFTER_1_ITERATION; - } - - @Override - protected void postSubmit() throws Exception { - compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java deleted file mode 100644 index eda9d1a..0000000 --- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.tez.client.LocalTezEnvironment; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -public abstract class TezProgramTestBase extends AbstractTestBase { - - private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4; - - private JobExecutionResult latestExecutionResult; - - private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM; - - - public TezProgramTestBase() { - this(new Configuration()); - } - - public TezProgramTestBase(Configuration config) { - super (config); - } - - - public void setParallelism(int degreeOfParallelism) { - this.degreeOfParallelism = degreeOfParallelism; - } - - public JobExecutionResult getLatestExecutionResult() { - return this.latestExecutionResult; - } - - - protected abstract void testProgram() throws Exception; - - protected void preSubmit() throws Exception {} - - protected void postSubmit() throws Exception {} - - // -------------------------------------------------------------------------------------------- - // Test entry point - // -------------------------------------------------------------------------------------------- - - // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt) - // TODO Reactivate with future Tez versions - @Ignore - @Test - public void testJob() throws Exception { - // pre-submit - try { - preSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Pre-submit work caused an error: " + e.getMessage()); - } - - // prepare the test environment - LocalTezEnvironment env = LocalTezEnvironment.create(); - env.setParallelism(degreeOfParallelism); - env.setAsContext(); - - // call the test program - try { - testProgram(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Error while calling the test program: " + e.getMessage()); - } - - // post-submit - try { - postSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Post-submit work caused an error: " + e.getMessage()); - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java deleted file mode 100644 index 35aa54a..0000000 --- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - - -import org.apache.flink.examples.java.relational.WebLogAnalysis; -import org.apache.flink.test.testdata.WebLogAnalysisData; - -public class WebLogAnalysisITCase extends TezProgramTestBase { - - private String docsPath; - private String ranksPath; - private String visitsPath; - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - docsPath = createTempFile("docs", WebLogAnalysisData.DOCS); - ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS); - visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS); - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath); - } - @Override - protected void testProgram() throws Exception { - WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath}); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java deleted file mode 100644 index d73aa8b..0000000 --- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.examples.java.wordcount.WordCount; -import org.apache.flink.test.testdata.WordCountData; - -public class WordCountITCase extends TezProgramTestBase { - - protected String textPath; - protected String resultPath; - - public WordCountITCase(){ - } - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", WordCountData.TEXT); - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); - } - - @Override - protected void testProgram() throws Exception { - WordCount.main(new String[]{textPath, resultPath}); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/resources/log4j-test.properties b/flink-staging/flink-tez/src/test/resources/log4j-test.properties deleted file mode 100644 index 0845c81..0000000 --- a/flink-staging/flink-tez/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,30 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -log4j.rootLogger=INFO, testlogger - -# A1 is set to be a ConsoleAppender. -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/resources/logback-test.xml b/flink-staging/flink-tez/src/test/resources/logback-test.xml deleted file mode 100644 index 48e4374..0000000 --- a/flink-staging/flink-tez/src/test/resources/logback-test.xml +++ /dev/null @@ -1,37 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{60} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - - <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>--> - <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>--> - <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>--> - <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>--> - <!--<logger name="org.apache.flink.runtime.taskmanager.TaskManager" level="OFF"/>--> - <!--<logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>--> - <!--<logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>--> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml deleted file mode 100644 index 535c910..0000000 --- a/flink-staging/pom.xml +++ /dev/null @@ -1,72 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-parent</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - - <artifactId>flink-staging</artifactId> - <name>flink-staging</name> - <packaging>pom</packaging> - - <modules> - <module>flink-avro</module> - <module>flink-jdbc</module> - <module>flink-hadoop-compatibility</module> - <module>flink-hbase</module> - <module>flink-hcatalog</module> - <module>flink-table</module> - <module>flink-ml</module> - <module>flink-scala-shell</module> - </modules> - - <!-- See main pom.xml for explanation of profiles --> - <profiles> - <profile> - <id>hadoop-2</id> - <activation> - <property> - <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> - <!--hadoop2--><name>!hadoop.profile</name> - </property> - </activation> - <modules> - <!-- Include the flink-fs-tests project only for HD2. - The HDFS minicluster interfaces changed between the two versions. - --> - <module>flink-fs-tests</module> - </modules> - </profile> - <profile> - <id>include-tez</id> - <modules> - <module>flink-tez</module> - </modules> - </profile> - </profiles> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6cba956..e503a11 100644 --- a/pom.xml +++ b/pom.xml @@ -62,12 +62,13 @@ under the License. <module>flink-streaming-java</module> <module>flink-streaming-scala</module> <module>flink-streaming-connectors</module> + <module>flink-batch-connectors</module> <module>flink-examples</module> <module>flink-clients</module> <module>flink-tests</module> <module>flink-test-utils</module> - <module>flink-staging</module> <module>flink-libraries</module> + <module>flink-scala-shell</module> <module>flink-quickstart</module> <module>flink-contrib</module> <module>flink-dist</module> @@ -428,6 +429,10 @@ under the License. </properties> <modules> <module>flink-yarn</module> + <!-- Include the flink-fs-tests project only for HD2. + The HDFS minicluster interfaces changed between the two versions. + --> + <module>flink-fs-tests</module> </modules> </profile> @@ -802,13 +807,13 @@ under the License. <!-- Test Data. --> <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> - <exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude> + <exclude>flink-batch-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>out/test/flink-avro/avro/user.avsc</exclude> - <exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude> + <exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude> <!-- TweetInputFormat Test Data--> <exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude> - <exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude> - <exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude> + <exclude>flink-batch-connectors/flink-avro/src/test/resources/testdata.avro</exclude> + <exclude>flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude> <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude> <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude> <!-- Configuration Files. -->