http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
deleted file mode 100644
index 6dcee0b..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-public class TezOutputEmitter<T> implements TezChannelSelector<T> {
-
-       private final ShipStrategyType strategy;                // the shipping strategy used by this output emitter
-
-       private int[] channels;                                         // the reused array defining target channels
-
-       private int nextChannelToSendTo = 0;            // counter to go over channels round robin
-
-       private final TypeComparator<T> comparator;     // the comparator for hashing / sorting
-
-       // ------------------------------------------------------------------------
-       // Constructors
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a new channel selector that distributes data round robin.
-        */
-       public TezOutputEmitter() {
-               this(ShipStrategyType.NONE);
-       }
-
-       /**
-        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
-        *
-        * @param strategy The distribution strategy to be used.
-        */
-       public TezOutputEmitter(ShipStrategyType strategy) {
-               this(strategy, null);
-       }
-
-       /**
-        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-        * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-        *
-        * @param strategy The distribution strategy to be used.
-        * @param comparator The comparator used to hash / compare the records.
-        */
-       public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
-               this(strategy, comparator, null);
-       }
-
-       /**
-        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-        * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-        *
-        * @param strategy The distribution strategy to be used.
-        * @param comparator The comparator used to hash / compare the records.
-        * @param distr The distribution pattern used in the case of a range partitioning.
-        */
-       public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
-               if (strategy == null) {
-                       throw new NullPointerException();
-               }
-
-               this.strategy = strategy;
-               this.comparator = comparator;
-
-               switch (strategy) {
-                       case FORWARD:
-                       case PARTITION_HASH:
-                       case PARTITION_RANGE:
-                       case PARTITION_RANDOM:
-                       case PARTITION_FORCED_REBALANCE:
-                       case BROADCAST:
-                               break;
-                       default:
-                               throw new IllegalArgumentException("Invalid 
shipping strategy for OutputEmitter: " + strategy.name());
-               }
-
-               if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
-                       throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       // Channel Selection
-       // ------------------------------------------------------------------------
-
-       @Override
-       public final int[] selectChannels(T record, int numberOfChannels) {
-               switch (strategy) {
-                       case FORWARD:
-                       case PARTITION_RANDOM:
-                       case PARTITION_FORCED_REBALANCE:
-                               return robin(numberOfChannels);
-                       case PARTITION_HASH:
-                               return hashPartitionDefault(record, numberOfChannels);
-                       case PARTITION_RANGE:
-                               return rangePartition(record, numberOfChannels);
-                       case BROADCAST:
-                               return broadcast(numberOfChannels);
-                       default:
-                               throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       private final int[] robin(int numberOfChannels) {
-               if (this.channels == null || this.channels.length != 1) {
-                       this.channels = new int[1];
-               }
-
-               int nextChannel = nextChannelToSendTo + 1;
-               nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
-
-               this.nextChannelToSendTo = nextChannel;
-               this.channels[0] = nextChannel;
-               return this.channels;
-       }
-
-       private final int[] broadcast(int numberOfChannels) {
-               if (channels == null || channels.length != numberOfChannels) {
-                       channels = new int[numberOfChannels];
-                       for (int i = 0; i < numberOfChannels; i++) {
-                               channels[i] = i;
-                       }
-               }
-
-               return channels;
-       }
-
-       private final int[] hashPartitionDefault(T record, int numberOfChannels) {
-               if (channels == null || channels.length != 1) {
-                       channels = new int[1];
-               }
-
-               int hash = this.comparator.hash(record);
-
-               hash = murmurHash(hash);
-
-               if (hash >= 0) {
-                       this.channels[0] = hash % numberOfChannels;
-               }
-               else if (hash != Integer.MIN_VALUE) {
-                       this.channels[0] = -hash % numberOfChannels;
-               }
-               else {
-                       this.channels[0] = 0;
-               }
-
-               return this.channels;
-       }
-
-       private final int murmurHash(int k) {
-               k *= 0xcc9e2d51;
-               k = Integer.rotateLeft(k, 15);
-               k *= 0x1b873593;
-
-               k = Integer.rotateLeft(k, 13);
-               k *= 0xe6546b64;
-
-               k ^= 4;
-               k ^= k >>> 16;
-               k *= 0x85ebca6b;
-               k ^= k >>> 13;
-               k *= 0xc2b2ae35;
-               k ^= k >>> 16;
-
-               return k;
-       }
-
-       private final int[] rangePartition(T record, int numberOfChannels) {
-               throw new UnsupportedOperationException();
-       }
-}
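
For reference, a minimal standalone sketch of the hash-based channel selection performed by the deleted TezOutputEmitter above; the class and method names below are illustrative only and not part of the Flink code base.

public class HashChannelSketch {

    // Murmur-style finalization, mirroring the emitter's murmurHash(int) above.
    static int murmurHash(int k) {
        k *= 0xcc9e2d51;
        k = Integer.rotateLeft(k, 15);
        k *= 0x1b873593;
        k = Integer.rotateLeft(k, 13);
        k *= 0xe6546b64;
        k ^= 4;
        k ^= k >>> 16;
        k *= 0x85ebca6b;
        k ^= k >>> 13;
        k *= 0xc2b2ae35;
        k ^= k >>> 16;
        return k;
    }

    // Maps a record hash to a channel index in [0, numberOfChannels),
    // handling the Integer.MIN_VALUE corner case the same way as the emitter.
    static int channelFor(int recordHash, int numberOfChannels) {
        int hash = murmurHash(recordHash);
        if (hash >= 0) {
            return hash % numberOfChannels;
        } else if (hash != Integer.MIN_VALUE) {
            return -hash % numberOfChannels;
        } else {
            return 0;
        }
    }

    public static void main(String[] args) {
        // Example: route the hash of a key to one of four channels.
        System.out.println(channelFor("some-key".hashCode(), 4));
    }
}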

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
deleted file mode 100644
index 39d247c..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class DummyInvokable extends AbstractInvokable {
-
-       private ExecutionConfig executionConfig;
-
-       public DummyInvokable() {
-       }
-
-       public DummyInvokable(ExecutionConfig executionConfig) {
-               this.executionConfig = executionConfig;
-       }
-
-       public void setExecutionConfig(ExecutionConfig executionConfig) {
-               this.executionConfig = executionConfig;
-       }
-
-       @Override
-       public void registerInputOutput() {}
-
-
-       @Override
-       public void invoke() throws Exception {}
-
-       @Override
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
deleted file mode 100644
index 202cb24..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.commons.codec.binary.Base64;
-
-import java.io.IOException;
-
-public class EncodingUtils {
-
-       public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
-
-               try {
-                       if (encoded == null) {
-                               return null;
-                       }
-                       byte[] bytes = Base64.decodeBase64(encoded);
-
-                       return InstantiationUtil.deserializeObject(bytes, cl);
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-                       throw new RuntimeException();
-               }
-               catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-                       throw new RuntimeException();
-               }
-       }
-
-       public static String encodeObjectToString(Object o) {
-
-               try {
-                       byte[] bytes = InstantiationUtil.serializeObject(o);
-
-                       String encoded = Base64.encodeBase64String(bytes);
-                       return encoded;
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-                       throw new RuntimeException();
-               }
-       }
-}
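
For reference, a self-contained sketch of the encode/decode round trip that the deleted EncodingUtils performs, with plain Java serialization standing in for Flink's InstantiationUtil; the class name below is illustrative only.

import org.apache.commons.codec.binary.Base64;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EncodingRoundTripSketch {

    // Serialize an object and Base64-encode it so it can travel as a plain
    // String, e.g. inside a Hadoop/Tez Configuration value.
    static String encodeObjectToString(Serializable o) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        }
        return Base64.encodeBase64String(baos.toByteArray());
    }

    // Reverse the round trip: Base64-decode and deserialize.
    static Object decodeObjectFromString(String encoded) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.decodeBase64(encoded);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String encoded = encodeObjectToString("hello tez");
        System.out.println(decodeObjectFromString(encoded)); // prints: hello tez
    }
}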

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
deleted file mode 100644
index 07c5f97..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class FlinkSerialization<T> extends Configured implements Serialization<T>{
-
-       @Override
-       public boolean accept(Class<?> c) {
-               TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-               T instance = typeSerializer.createInstance();
-               return instance.getClass().isAssignableFrom(c);
-       }
-
-       @Override
-       public Serializer<T> getSerializer(Class<T> c) {
-               TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-               return new FlinkSerializer<T>(typeSerializer);
-       }
-
-       @Override
-       public Deserializer<T> getDeserializer(Class<T> c) {
-               TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-               return new FlinkDeserializer<T>(typeSerializer);
-       }
-
-       public static class FlinkSerializer<T> implements Serializer<T> {
-
-               private OutputStream dataOut;
-               private DataOutputViewOutputStreamWrapper dataOutputView;
-               private TypeSerializer<T> typeSerializer;
-
-               public FlinkSerializer(TypeSerializer<T> typeSerializer) {
-                       this.typeSerializer = typeSerializer;
-               }
-
-               @Override
-               public void open(OutputStream out) throws IOException {
-                       this.dataOut = out;
-                       this.dataOutputView = new DataOutputViewOutputStreamWrapper(out);
-               }
-
-               @Override
-               public void serialize(T t) throws IOException {
-                       typeSerializer.serialize(t, dataOutputView);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       this.dataOut.close();
-               }
-       }
-
-       public static class FlinkDeserializer<T> implements Deserializer<T> {
-
-               private InputStream dataIn;
-               private TypeSerializer<T> typeSerializer;
-               private DataInputViewInputStreamWrapper dataInputView;
-
-               public FlinkDeserializer(TypeSerializer<T> typeSerializer) {
-                       this.typeSerializer = typeSerializer;
-               }
-
-               @Override
-               public void open(InputStream in) throws IOException {
-                       this.dataIn = in;
-                       this.dataInputView = new DataInputViewInputStreamWrapper(in);
-               }
-
-               @Override
-               public T deserialize(T t) throws IOException {
-                       T reuse = t;
-                       if (reuse == null) {
-                               reuse = typeSerializer.createInstance();
-                       }
-                       return typeSerializer.deserialize(reuse, dataInputView);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       this.dataIn.close();
-               }
-       }
-
-       private static final class DataOutputViewOutputStreamWrapper implements DataOutputView {
-
-               private final DataOutputStream dos;
-
-               public DataOutputViewOutputStreamWrapper(OutputStream output) {
-                       this.dos = new DataOutputStream(output);
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       dos.write(b);
-               }
-
-               @Override
-               public void write(byte[] b) throws IOException {
-                       dos.write(b);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws IOException {
-                       dos.write(b, off, len);
-               }
-
-               @Override
-               public void writeBoolean(boolean v) throws IOException {
-                       dos.writeBoolean(v);
-               }
-
-               @Override
-               public void writeByte(int v) throws IOException {
-                       dos.writeByte(v);
-               }
-
-               @Override
-               public void writeShort(int v) throws IOException {
-                       dos.writeShort(v);
-               }
-
-               @Override
-               public void writeChar(int v) throws IOException {
-                       dos.writeChar(v);
-               }
-
-               @Override
-               public void writeInt(int v) throws IOException {
-                       dos.writeInt(v);
-               }
-
-               @Override
-               public void writeLong(long v) throws IOException {
-                       dos.writeLong(v);
-               }
-
-               @Override
-               public void writeFloat(float v) throws IOException {
-                       dos.writeFloat(v);
-               }
-
-               @Override
-               public void writeDouble(double v) throws IOException {
-                       dos.writeDouble(v);
-               }
-
-               @Override
-               public void writeBytes(String s) throws IOException {
-                       dos.writeBytes(s);
-               }
-
-               @Override
-               public void writeChars(String s) throws IOException {
-                       dos.writeChars(s);
-               }
-
-               @Override
-               public void writeUTF(String s) throws IOException {
-                       dos.writeUTF(s);
-               }
-
-               @Override
-               public void skipBytesToWrite(int num) throws IOException {
-                       for (int i = 0; i < num; i++) {
-                               dos.write(0);
-                       }
-               }
-
-               @Override
-               public void write(DataInputView inview, int num) throws IOException {
-                       for (int i = 0; i < num; i++) {
-                               dos.write(inview.readByte());
-                       }
-               }
-       }
-
-       private static final class DataInputViewInputStreamWrapper implements DataInputView {
-
-               private final DataInputStream dis;
-
-
-               public DataInputViewInputStreamWrapper(InputStream input) {
-                       this.dis = new DataInputStream(input);
-               }
-
-               @Override
-               public void readFully(byte[] b) throws IOException {
-                       dis.readFully(b);
-               }
-
-               @Override
-               public void readFully(byte[] b, int off, int len) throws IOException {
-                       dis.readFully(b, off, len);
-               }
-
-               @Override
-               public int skipBytes(int n) throws IOException {
-                       return dis.skipBytes(n);
-               }
-
-               @Override
-               public boolean readBoolean() throws IOException {
-                       return dis.readBoolean();
-               }
-
-               @Override
-               public byte readByte() throws IOException {
-                       return dis.readByte();
-               }
-
-               @Override
-               public int readUnsignedByte() throws IOException {
-                       return dis.readUnsignedByte();
-               }
-
-               @Override
-               public short readShort() throws IOException {
-                       return dis.readShort();
-               }
-
-               @Override
-               public int readUnsignedShort() throws IOException {
-                       return dis.readUnsignedShort();
-               }
-
-               @Override
-               public char readChar() throws IOException {
-                       return dis.readChar();
-               }
-
-               @Override
-               public int readInt() throws IOException {
-                       return dis.readInt();
-               }
-
-               @Override
-               public long readLong() throws IOException {
-                       return dis.readLong();
-               }
-
-               @Override
-               public float readFloat() throws IOException {
-                       return dis.readFloat();
-               }
-
-               @Override
-               public double readDouble() throws IOException {
-                       return dis.readDouble();
-               }
-
-               @Override
-               public String readLine() throws IOException {
-                       return dis.readLine();
-               }
-
-               @Override
-               public String readUTF() throws IOException {
-                       return dis.readUTF();
-               }
-
-               @Override
-               public void skipBytesToRead(int numBytes) throws IOException {
-                       while (numBytes > 0) {
-                               numBytes -= dis.skipBytes(numBytes);
-                       }
-               }
-
-               @Override
-               public int read(byte[] b, int off, int len) throws IOException {
-                       dis.readFully(b, off, len);
-                       return len;
-               }
-
-               @Override
-               public int read(byte[] b) throws IOException {
-                       return read(b, 0, b.length);
-               }
-       }
-
-}
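
For reference, a sketch of how a custom Hadoop Serialization such as the deleted FlinkSerialization would typically be registered; "io.serializations" is Hadoop's standard registration key, while the "io.flink.typeserializer" key is taken from the code above, and the class name and placeholder value below are illustrative only.

import org.apache.hadoop.conf.Configuration;

public class SerializationRegistrationSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Prepend the custom serialization to Hadoop's default list.
        conf.set("io.serializations",
                "org.apache.flink.tez.util.FlinkSerialization,"
                        + "org.apache.hadoop.io.serializer.WritableSerialization");
        // The Base64-encoded Flink TypeSerializer that FlinkSerialization reads back
        // (placeholder value; in the deleted code it is produced by EncodingUtils).
        conf.set("io.flink.typeserializer", "<base64-encoded TypeSerializer>");
        System.out.println(conf.get("io.serializations"));
    }
}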

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/resources/log4j.properties b/flink-staging/flink-tez/src/main/resources/log4j.properties
deleted file mode 100644
index 0845c81..0000000
--- a/flink-staging/flink-tez/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
deleted file mode 100644
index 9124faa..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.tez.examples.ConnectedComponentsStep;
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/*
- * Note: This does not test whether the program computes one step of the
- * Weakly Connected Components program correctly. It only tests whether
- * the program assigns a wrong component to a vertex.
- */
-
-public class ConnectedComponentsStepITCase extends TezProgramTestBase {
-
-    private static final long SEED = 0xBADC0FFEEBEEFL;
-
-    private static final int NUM_VERTICES = 1000;
-
-    private static final int NUM_EDGES = 10000;
-
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-
-
-    @Override
-    protected void preSubmit() throws Exception {
-        verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
-        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
-        resultPath = getTempFilePath("results");
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        for (BufferedReader reader : getResultReader(resultPath)) {
-            checkOddEvenResult(reader);
-        }
-    }
-
-    private static void checkOddEvenResult(BufferedReader result) throws IOException {
-        Pattern split = Pattern.compile(" ");
-        String line;
-        while ((line = result.readLine()) != null) {
-            String[] res = split.split(line);
-            Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
-            try {
-                int vertex = Integer.parseInt(res[0]);
-                int component = Integer.parseInt(res[1]);
-                Assert.assertTrue(((vertex % 2) == (component % 2)));
-            } catch (NumberFormatException e) {
-                Assert.fail("Malformed result.");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
deleted file mode 100644
index 9a203fe..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.tez.examples.PageRankBasicStep;
-
-public class PageRankBasicStepITCase extends TezProgramTestBase {
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-    private String expectedResult;
-
-    public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" +
-            "2 0.25666666666666665\n" +
-            "3 0.1716666666666667\n" +
-            "4 0.1716666666666667\n" +
-            "5 0.2";
-
-    @Override
-    protected void preSubmit() throws Exception {
-        resultPath = getTempDirPath("result");
-        verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
-        edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"});
-        expectedResult = RANKS_AFTER_1_ITERATION;
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
deleted file mode 100644
index eda9d1a..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.tez.client.LocalTezEnvironment;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public abstract class TezProgramTestBase extends AbstractTestBase {
-
-    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
-
-    private JobExecutionResult latestExecutionResult;
-
-    private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
-
-
-    public TezProgramTestBase() {
-        this(new Configuration());
-    }
-
-    public TezProgramTestBase(Configuration config) {
-        super (config);
-    }
-
-
-    public void setParallelism(int degreeOfParallelism) {
-        this.degreeOfParallelism = degreeOfParallelism;
-    }
-
-    public JobExecutionResult getLatestExecutionResult() {
-        return this.latestExecutionResult;
-    }
-
-
-    protected abstract void testProgram() throws Exception;
-
-    protected void preSubmit() throws Exception {}
-
-    protected void postSubmit() throws Exception {}
-
-    // --------------------------------------------------------------------------------------------
-    //  Test entry point
-    // --------------------------------------------------------------------------------------------
-
-    // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt)
-    // TODO Reactivate with future Tez versions
-    @Ignore
-    @Test
-    public void testJob() throws Exception {
-        // pre-submit
-        try {
-            preSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-        }
-
-        // prepare the test environment
-        LocalTezEnvironment env = LocalTezEnvironment.create();
-        env.setParallelism(degreeOfParallelism);
-        env.setAsContext();
-
-        // call the test program
-        try {
-            testProgram();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Error while calling the test program: " + e.getMessage());
-        }
-
-        // post-submit
-        try {
-            postSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Post-submit work caused an error: " + e.getMessage());
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
deleted file mode 100644
index 35aa54a..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-
-import org.apache.flink.examples.java.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-
-public class WebLogAnalysisITCase extends TezProgramTestBase {
-
-    private String docsPath;
-    private String ranksPath;
-    private String visitsPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
-        ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
-        visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
-    }
-    @Override
-    protected void testProgram() throws Exception {
-        WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
deleted file mode 100644
index d73aa8b..0000000
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class WordCountITCase extends TezProgramTestBase {
-
-    protected String textPath;
-    protected String resultPath;
-
-    public WordCountITCase(){
-    }
-
-    @Override
-    protected void preSubmit() throws Exception {
-        textPath = createTempFile("text.txt", WordCountData.TEXT);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        WordCount.main(new String[]{textPath, resultPath});
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/resources/log4j-test.properties b/flink-staging/flink-tez/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0845c81..0000000
--- a/flink-staging/flink-tez/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/resources/logback-test.xml b/flink-staging/flink-tez/src/test/resources/logback-test.xml
deleted file mode 100644
index 48e4374..0000000
--- a/flink-staging/flink-tez/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{60} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-    <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.taskmanager.TaskManager" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>-->
-    <!--<logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>-->
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
deleted file mode 100644
index 535c910..0000000
--- a/flink-staging/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-parent</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-
-       <artifactId>flink-staging</artifactId>
-       <name>flink-staging</name>
-       <packaging>pom</packaging>
-
-       <modules>
-               <module>flink-avro</module>
-               <module>flink-jdbc</module>
-               <module>flink-hadoop-compatibility</module>
-               <module>flink-hbase</module>
-               <module>flink-hcatalog</module>
-               <module>flink-table</module>
-               <module>flink-ml</module>
-               <module>flink-scala-shell</module>
-       </modules>
-       
-       <!-- See main pom.xml for explanation of profiles -->
-       <profiles>
-               <profile>
-                       <id>hadoop-2</id>
-                       <activation>
-                               <property>
-                                       <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-                                       <!--hadoop2--><name>!hadoop.profile</name>
-                               </property>
-                       </activation>
-                       <modules>
-                               <!-- Include the flink-fs-tests project only for HD2.
-                                       The HDFS minicluster interfaces changed between the two versions.
-                                -->
-                               <module>flink-fs-tests</module>
-                       </modules>
-               </profile>
-               <profile>
-                       <id>include-tez</id>
-                       <modules>
-                               <module>flink-tez</module>
-                       </modules>
-               </profile>
-       </profiles>
-</project>
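
For reference, a non-default profile like the include-tez profile in the removed flink-staging/pom.xml above would typically have been activated explicitly on the command line, e.g. with "mvn clean install -Pinclude-tez".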

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6cba956..e503a11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,12 +62,13 @@ under the License.
                <module>flink-streaming-java</module>
                <module>flink-streaming-scala</module>
                <module>flink-streaming-connectors</module>
+               <module>flink-batch-connectors</module>
                <module>flink-examples</module>
                <module>flink-clients</module>
                <module>flink-tests</module>
                <module>flink-test-utils</module>
-               <module>flink-staging</module>
                <module>flink-libraries</module>
+               <module>flink-scala-shell</module>
                <module>flink-quickstart</module>
                <module>flink-contrib</module>
                <module>flink-dist</module>
@@ -428,6 +429,10 @@ under the License.
                        </properties>
                        <modules>
                                <module>flink-yarn</module>
+                               <!-- Include the flink-fs-tests project only for HD2.
+                                       The HDFS minicluster interfaces changed between the two versions.
+                                -->
+                               <module>flink-fs-tests</module>
                        </modules>
                </profile>
 
@@ -802,13 +807,13 @@ under the License.
 
                                                <!-- Test Data. -->
                                                
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
-                                               
<exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
+                                               
<exclude>flink-batch-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude>
                                                
<exclude>out/test/flink-avro/avro/user.avsc</exclude>
-                                               
<exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude>
+                                               
<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
                                                <!-- TweetInputFormat Test 
Data-->
                                                
<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>
-                                               
<exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>
-                                               
<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
+                                               
<exclude>flink-batch-connectors/flink-avro/src/test/resources/testdata.avro</exclude>
+                                               
<exclude>flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
                                                
<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude>
                                                
<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude>
                                                <!-- Configuration Files. -->
