http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
new file mode 100644
index 0000000..89e4642
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.CloseableInputProvider;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.tez.runtime.input.TezReaderIterator;
+import org.apache.flink.tez.runtime.output.TezChannelSelector;
+import org.apache.flink.tez.runtime.output.TezOutputEmitter;
+import org.apache.flink.tez.runtime.output.TezOutputCollector;
+import org.apache.flink.tez.util.DummyInvokable;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+public class TezTask<S extends Function,OT>  implements TaskContext<S, OT> {
+
+       protected static final Log LOG = LogFactory.getLog(TezTask.class);
+
+       DummyInvokable invokable = new DummyInvokable();
+
+       /**
+        * The driver that invokes the user code (the stub implementation). This is the
+        * central driver in this task (further drivers may be chained behind this driver).
+        */
+       protected volatile Driver<S, OT> driver;
+
+       /**
+        * The instantiated user code of this task's main operator (driver).
+        * May be null if the operator has no udf.
+        */
+       protected S stub;
+
+       /**
+        * The udf's runtime context.
+        */
+       protected RuntimeUDFContext runtimeUdfContext;
+
+       /**
+        * The collector that forwards the user code's results. May forward to
+        * a channel or to chained drivers within this task.
+        */
+       protected Collector<OT> output;
+
+       /**
+        * The input readers, each wrapped in an iterator, prior to the application
+        * of the local strategies.
+        */
+       protected MutableObjectIterator<?>[] inputIterators;
+
+       /**
+        * The local strategies that are applied on the inputs.
+        */
+       protected volatile CloseableInputProvider<?>[] localStrategies;
+
+       /**
+        * The inputs to the operator. Return the readers' data after the
+        * application of the local strategy and the temp-table barrier.
+        */
+       protected MutableObjectIterator<?>[] inputs;
+
+       /**
+        * The serializers for the input data type.
+        */
+       protected TypeSerializerFactory<?>[] inputSerializers;
+
+       /**
+        * The comparators for the central driver.
+        */
+       protected TypeComparator<?>[] inputComparators;
+
+       /**
+        * The task configuration with the setup parameters.
+        */
+       protected TezTaskConfig config;
+
+       /**
+        * The class loader used to instantiate user code and user data types.
+        */
+       protected ClassLoader userCodeClassLoader = 
ClassLoader.getSystemClassLoader();
+
+       /**
+        * The execution config, taken from the runtime UDF context in the constructor.
+        */
+       protected ExecutionConfig executionConfig;
+
+       /*
+        * Tez-specific variables given by the Processor
+        */
+       protected TypeSerializer<OT> outSerializer;
+
+       protected List<Integer> numberOfSubTasksInOutputs;
+
+       protected String taskName;
+
+       protected int numberOfSubtasks;
+
+       protected int indexInSubtaskGroup;
+
+       TezRuntimeEnvironment runtimeEnvironment;
+
+       /**
+        * Creates a task from the given configuration: instantiates the driver and the user
+        * code object, reads the output serializer, task name and output parallelism from the
+        * config, and creates the runtime environment.
+        *
+        * @param config the task configuration with the setup parameters
+        * @param runtimeUdfContext the udf's runtime context; also supplies the parallelism,
+        *        the subtask index and the execution config
+        * @param availableMemory the available memory; 70% of it is passed to the
+        *        {@link TezRuntimeEnvironment}
+        */
+       public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
+               this.config = config;
+               final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
+               this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);
+
+               // The system class loader is not guaranteed to be a URLClassLoader (it is not on
+               // Java 9+), so the URLs are only logged when they are actually available.
+               if (this.userCodeClassLoader instanceof URLClassLoader) {
+                       LOG.info("ClassLoader URLs: " +
+                                       Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
+               } else {
+                       LOG.info("ClassLoader: " + this.userCodeClassLoader);
+               }
+
+               this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader)
+                               .getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly
+               this.runtimeUdfContext = runtimeUdfContext;
+               // NOTE(review): the output serializer is loaded with this class's loader rather than
+               // userCodeClassLoader - confirm that this is intended.
+               this.outSerializer = (TypeSerializer<OT>)
+                               this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
+               this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput();
+               this.taskName = this.config.getTaskName();
+               this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks();
+               this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask();
+               this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory));
+               this.executionConfig = runtimeUdfContext.getExecutionConfig();
+               this.invokable.setExecutionConfig(this.executionConfig);
+       }
+
+
+       //-------------------------------------------------------------
+       // Interface to FlinkProcessor
+       //-------------------------------------------------------------
+
+       public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> 
writers) throws Exception {
+
+               // whatever happens in this scope, make sure that the local 
strategies are cleaned up!
+               // note that the initialization of the local strategies is in 
the try-finally block as well,
+               // so that the thread that creates them catches its own errors 
that may happen in that process.
+               // this is especially important, since there may be 
asynchronous closes (such as through canceling).
+               try {
+                       // initialize the inputs and outputs
+                       initInputsOutputs(readers, writers);
+
+                       // pre main-function initialization
+                       initialize();
+
+                       // the work goes here
+                       run();
+               }
+               finally {
+                       // clean up in any case!
+                       closeLocalStrategies();
+               }
+       }
+
+
+       /*
+        * Sets up the input iterators, input serializers, input comparators, local strategies,
+        * and the output collector.
+        * Assumes that the config and userCodeClassLoader have been set.
+        */
+       private void initInputsOutputs (List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
+
+               final int inputCount = readers.size();
+               Preconditions.checkArgument(inputCount == driver.getNumberOfInputs());
+
+               // one slot per input: raw iterator, local strategy, and the resulting operator input
+               this.inputIterators = new MutableObjectIterator[inputCount];
+               this.localStrategies = new CloseableInputProvider[inputCount];
+               this.inputs = new MutableObjectIterator[inputCount];
+
+               final int comparatorCount = driver.getNumberOfDriverComparators();
+               initInputsSerializersAndComparators(inputCount, comparatorCount);
+
+               // wrap every reader and apply the configured local strategy to it
+               int position = 0;
+               for (KeyValueReader reader : readers) {
+                       this.inputIterators[position] = new TezReaderIterator<Object>(reader);
+                       initInputLocalStrategy(position);
+                       position++;
+               }
+
+               // one channel selector per output, built from the configured ship strategy
+               final int outputCount = writers.size();
+               final ArrayList<TezChannelSelector<OT>> selectors = new ArrayList<TezChannelSelector<OT>>(outputCount);
+               for (int out = 0; out < outputCount; out++) {
+                       final ShipStrategyType shipStrategy = config.getOutputShipStrategy(out);
+                       final TypeComparatorFactory<OT> comparatorFactory = config.getOutputComparator(out, this.userCodeClassLoader);
+                       final DataDistribution distribution = config.getOutputDataDistribution(out, this.userCodeClassLoader);
+
+                       final TezChannelSelector<OT> selector;
+                       if (comparatorFactory == null) {
+                               selector = new TezOutputEmitter<OT>(shipStrategy);
+                       } else if (distribution == null) {
+                               selector = new TezOutputEmitter<OT>(shipStrategy, comparatorFactory.createComparator());
+                       } else {
+                               selector = new TezOutputEmitter<OT>(shipStrategy, comparatorFactory.createComparator(), distribution);
+                       }
+                       selectors.add(selector);
+               }
+               this.output = new TezOutputCollector<OT>(writers, selectors, outSerializer, numberOfSubTasksInOutputs);
+       }
+
+
+
+       // --------------------------------------------------------------------
+       // TaskContext interface
+       // --------------------------------------------------------------------
+
+       /**
+        * Returns the task configuration of this task.
+        */
+       @Override
+       public TaskConfig getTaskConfig() {
+               // TezTaskConfig extends TaskConfig, so no cast is needed
+               return this.config;
+       }
+
+       /**
+        * Returns the execution config that was obtained from the runtime UDF context.
+        */
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
+       }
+
+       /**
+        * Returns the class loader used to instantiate user code and user data types.
+        */
+       @Override
+       public ClassLoader getUserCodeClassLoader() {
+               return userCodeClassLoader;
+       }
+
+       /**
+        * Returns the memory manager of this task's runtime environment.
+        */
+       @Override
+       public MemoryManager getMemoryManager() {
+               return this.runtimeEnvironment.getMemoryManager();
+       }
+
+       /**
+        * Returns the I/O manager of this task's runtime environment.
+        */
+       @Override
+       public IOManager getIOManager() {
+               return this.runtimeEnvironment.getIOManager();
+       }
+
+       /**
+        * Returns a freshly created runtime info object with the host name "localhost"
+        * and an empty configuration.
+        */
+       @Override
+       public TaskManagerRuntimeInfo getTaskManagerInfo() {
+               final Configuration emptyConfiguration = new Configuration();
+               return new TaskManagerRuntimeInfo("localhost", emptyConfiguration);
+       }
+
+       /**
+        * Returns the input with the given index, after the application of its local strategy.
+        * If the input has not been assigned yet, it is fetched from the local strategy of
+        * that input and cached for subsequent calls.
+        *
+        * @param index the index of the input, in the range {@code [0, numberOfInputs)}
+        * @return the iterator over the input's data
+        * @throws IndexOutOfBoundsException if the index is not a valid input index
+        * @throws RuntimeException if neither an input nor a local strategy exists for the index,
+        *         or if obtaining the iterator from the local strategy is interrupted or fails
+        */
+       @Override
+       public <X> MutableObjectIterator<X> getInput(int index) {
+               // valid indexes are 0 .. numberOfInputs - 1
+               if (index < 0 || index >= this.driver.getNumberOfInputs()) {
+                       throw new IndexOutOfBoundsException();
+               }
+
+               // check for lazy assignment from input strategies
+               if (this.inputs[index] == null) {
+                       if (this.localStrategies[index] == null) {
+                               throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
+                       }
+                       try {
+                               this.inputs[index] = this.localStrategies[index].getIterator();
+                       } catch (InterruptedException iex) {
+                               // restore the interrupt flag so that callers can still observe the interruption
+                               Thread.currentThread().interrupt();
+                               throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.", iex);
+                       } catch (IOException ioex) {
+                               throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".", ioex);
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               final MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
+               return in;
+       }
+
+       /**
+        * Returns the serializer factory of the input with the given index.
+        *
+        * @throws IndexOutOfBoundsException if the index is negative or not smaller than the
+        *         driver's number of inputs
+        */
+       @Override
+       public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+               if (index >= 0 && index < this.driver.getNumberOfInputs()) {
+                       @SuppressWarnings("unchecked")
+                       final TypeSerializerFactory<X> factory = (TypeSerializerFactory<X>) this.inputSerializers[index];
+                       return factory;
+               }
+               throw new IndexOutOfBoundsException();
+       }
+
+       @Override
+       public <X> TypeComparator<X> getDriverComparator(int index) {
+               if (this.inputComparators == null) {
+                       throw new IllegalStateException("Comparators have not 
been created!");
+               }
+               else if (index < 0 || index >= 
this.driver.getNumberOfDriverComparators()) {
+                       throw new IndexOutOfBoundsException();
+               }
+
+               @SuppressWarnings("unchecked")
+               final TypeComparator<X> comparator = (TypeComparator<X>) 
this.inputComparators[index];
+               return comparator;
+       }
+
+
+
+       /**
+        * Returns the user code object of this task's driver; may be null.
+        */
+       @Override
+       public S getStub() {
+               return stub;
+       }
+
+       /**
+        * Returns the collector that forwards the user code's results.
+        */
+       @Override
+       public Collector<OT> getOutputCollector() {
+               return output;
+       }
+
+       /**
+        * Returns the {@link DummyInvokable} held by this task.
+        */
+       @Override
+       public AbstractInvokable getOwningNepheleTask() {
+               return invokable;
+       }
+
+       /**
+        * Formats a log message by appending the task name and the position of this subtask
+        * (one-based index and total number of subtasks).
+        *
+        * @param message the message to format
+        * @return the message followed by the task name and the subtask position
+        */
+       @Override
+       public String formatLogString(String message) {
+               // Never return null: callers pass the result straight to the logger.
+               return message + ":  " + this.taskName +
+                               " (" + (this.indexInSubtaskGroup + 1) + '/' + this.numberOfSubtasks + ')';
+       }
+
+
+       // --------------------------------------------------------------------
+       // Adapted from BatchTask
+       // --------------------------------------------------------------------
+
+       /**
+        * Sets up the local strategy for the given input. Without a strategy (or with
+        * {@code NONE}) the raw input iterator is used directly. For {@code SORT} and
+        * {@code COMBININGSORT} a sorter is created and registered as local strategy, and
+        * the input is left null so that it is fetched lazily from the sorter.
+        *
+        * @param inputNum the index of the input
+        * @throws IllegalStateException if a strategy already exists for the input, or if a
+        *         combining sort is requested for an unsuitable input or user function
+        * @throws Exception if the strategy is not recognized or its comparator is missing
+        */
+       private void initInputLocalStrategy(int inputNum) throws Exception {
+               // a strategy must not be set up twice for the same input
+               if (this.localStrategies[inputNum] != null) {
+                       throw new IllegalStateException();
+               }
+
+               final LocalStrategy strategy = this.config.getInputLocalStrategy(inputNum);
+               if (strategy == null) {
+                       // no local strategy in the config
+                       this.inputs[inputNum] = this.inputIterators[inputNum];
+                       return;
+               }
+
+               switch (strategy) {
+                       case NONE: {
+                               // the input is as it is
+                               this.inputs[inputNum] = this.inputIterators[inputNum];
+                               break;
+                       }
+                       case SORT: {
+                               @SuppressWarnings({ "rawtypes", "unchecked" })
+                               UnilateralSortMerger<?> sortMerger = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
+                                               this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+                                               this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+                                               this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
+                               // leave the input null so that it is lazily fetched from the input strategy
+                               this.inputs[inputNum] = null;
+                               this.localStrategies[inputNum] = sortMerger;
+                               break;
+                       }
+                       case COMBININGSORT: {
+                               // sanity check this special case!
+                               // this still breaks a bit of the abstraction!
+                               // we should have nested configurations for the local strategies to solve that
+                               if (inputNum != 0) {
+                                       throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
+                               }
+
+                               // a separate combiner instance is created here. the stub must not be used, because
+                               // the sort and the subsequent (group)reduce would otherwise share it multi-threaded
+                               final Class<S> stubType = this.driver.getStubType();
+                               if (stubType == null) {
+                                       throw new IllegalStateException("Performing combining sort outside a reduce task!");
+                               }
+
+                               final S combinerStub;
+                               try {
+                                       combinerStub = initStub(stubType);
+                               } catch (Exception e) {
+                                       throw new RuntimeException("Initializing the user code and the configuration failed" +
+                                                       (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+                               }
+
+                               if (!(combinerStub instanceof GroupCombineFunction)) {
+                                       throw new IllegalStateException("Performing combining sort outside a reduce task!");
+                               }
+
+                               @SuppressWarnings({ "rawtypes", "unchecked" })
+                               CombiningUnilateralSortMerger<?> combiningSorter = new CombiningUnilateralSortMerger(
+                                               (GroupCombineFunction) combinerStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+                                               this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+                                               this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+                                               this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
+                               combiningSorter.setUdfConfiguration(this.config.getStubParameters());
+
+                               // leave the input null so that it is lazily fetched from the input strategy
+                               this.inputs[inputNum] = null;
+                               this.localStrategies[inputNum] = combiningSorter;
+                               break;
+                       }
+                       default:
+                               throw new Exception("Unrecognized local strategy provided: " + strategy.name());
+               }
+       }
+
+       private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) 
throws Exception {
+               TypeComparatorFactory<T> compFact = 
this.config.getInputComparator(inputNum, this.userCodeClassLoader);
+               if (compFact == null) {
+                       throw new Exception("Missing comparator factory for 
local strategy on input " + inputNum);
+               }
+               return compFact.createComparator();
+       }
+
+       protected S initStub(Class<? super S> stubSuperClass) throws Exception {
+               try {
+                       S stub = 
config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass,
 this.userCodeClassLoader);
+                       // check if the class is a subclass, if the check is 
required
+                       if (stubSuperClass != null && 
!stubSuperClass.isAssignableFrom(stub.getClass())) {
+                               throw new RuntimeException("The class '" + 
stub.getClass().getName() + "' is not a subclass of '" +
+                                               stubSuperClass.getName() + "' 
as is required.");
+                       }
+                       FunctionUtils.setFunctionRuntimeContext(stub, 
this.runtimeUdfContext);
+                       return stub;
+               }
+               catch (ClassCastException ccex) {
+                       throw new Exception("The stub class is not a proper 
subclass of " + stubSuperClass.getName(), ccex);
+               }
+       }
+
+       /**
+        * Creates the serializer factories for all inputs and the comparators of the driver.
+        * The comparator array is only allocated if the driver uses at least one comparator;
+        * otherwise it is set to null.
+        *
+        * @param numInputs the number of inputs
+        * @param numComparators the number of driver comparators
+        */
+       protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
+               this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
+               if (numComparators > 0) {
+                       this.inputComparators = new TypeComparator[numComparators];
+               } else {
+                       this.inputComparators = null;
+               }
+               this.inputIterators = new MutableObjectIterator[numInputs];
+
+               // the serializer factories of the inputs
+               for (int input = 0; input < numInputs; input++) {
+                       this.inputSerializers[input] = this.config.getInputSerializer(input, this.userCodeClassLoader);
+               }
+
+               // the driver's comparators
+               if (this.inputComparators != null) {
+                       for (int c = 0; c < numComparators; c++) {
+                               final TypeComparatorFactory<?> factory = this.config.getDriverComparator(c, this.userCodeClassLoader);
+                               this.inputComparators[c] = factory.createComparator();
+                       }
+               }
+       }
+
+       /**
+        * Sets up the driver with this task as its context and instantiates the user function
+        * if the driver declares a stub type.
+        *
+        * @throws Exception if the driver setup fails
+        * @throws RuntimeException if the user function cannot be initialized
+        */
+       protected void initialize() throws Exception {
+               // create the operator
+               try {
+                       this.driver.setup(this);
+               }
+               catch (Throwable t) {
+                       throw new Exception("The driver setup for '" + this.taskName +
+                                       "' , caused an error: " + t.getMessage(), t);
+               }
+
+               // instantiate the UDF
+               try {
+                       final Class<? super S> userCodeFunctionType = this.driver.getStubType();
+                       // if the class is null, the driver has no user code
+                       if (userCodeFunctionType != null) {
+                               this.stub = initStub(userCodeFunctionType);
+                       }
+               } catch (Exception e) {
+                       // the parentheses are required: '+' binds tighter than '==' and '?:', so without
+                       // them the whole concatenation would be compared to null and the prefix dropped
+                       throw new RuntimeException("Initializing the UDF" +
+                                       (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+               }
+       }
+
+       /*
+       public RuntimeUDFContext createRuntimeContext() {
+               return new RuntimeUDFContext(this.taskName, 
this.numberOfSubtasks, this.indexInSubtaskGroup, null);
+       }
+       */
+
+       /**
+        * Closes all local strategies that have been created. Errors during closing are
+        * logged and do not prevent the remaining strategies from being closed.
+        */
+       protected void closeLocalStrategies() {
+               if (this.localStrategies == null) {
+                       return;
+               }
+               for (int input = 0; input < this.localStrategies.length; input++) {
+                       if (this.localStrategies[input] == null) {
+                               continue;
+                       }
+                       try {
+                               this.localStrategies[input].close();
+                       } catch (Throwable t) {
+                               LOG.error("Error closing local strategy for input " + input, t);
+                       }
+               }
+       }
+
+       /**
+        * Executes the task: prepares the driver, opens the user function (if any), runs the
+        * driver, closes the user function and closes the output. The driver's cleanup is
+        * invoked in every case.
+        *
+        * @throws RuntimeException if any step fails; the original exception is attached as cause
+        */
+       protected void run() throws Exception {
+               // tracks whether the user function has been opened and still has to be closed
+               boolean stubOpen = false;
+
+               try {
+                       // run the data preparation
+                       try {
+                               this.driver.prepare();
+                       }
+                       catch (Throwable t) {
+                               throw new Exception("The data preparation for task '" + this.taskName +
+                                               "' , caused an error: " + t.getMessage(), t);
+                       }
+
+                       // open stub implementation
+                       if (this.stub != null) {
+                               try {
+                                       Configuration stubConfig = this.config.getStubParameters();
+                                       FunctionUtils.openFunction(this.stub, stubConfig);
+                                       stubOpen = true;
+                               }
+                               catch (Throwable t) {
+                                       throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+                               }
+                       }
+
+                       // run the user code
+                       this.driver.run();
+
+                       // close. We close here such that a regular close throwing an exception marks a task as failed.
+                       if (this.stub != null) {
+                               // reset the flag first so that a failing close is not attempted a second time below
+                               stubOpen = false;
+                               FunctionUtils.closeFunction(this.stub);
+                       }
+
+                       this.output.close();
+               }
+               catch (Exception ex) {
+                       // close the opened user function, but do not report exceptions from that,
+                       // since we already have another root cause
+                       if (stubOpen) {
+                               try {
+                                       FunctionUtils.closeFunction(this.stub);
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error closing the user function after a failure", t);
+                               }
+                       }
+                       // attach the cause instead of printing it or concatenating the stack trace array
+                       throw new RuntimeException("Exception in TaskContext: " + ex.getMessage(), ex);
+               }
+               finally {
+                       this.driver.cleanup();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
new file mode 100644
index 0000000..94a8315
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class TezTaskConfig extends TaskConfig {
+
+       public static final String TEZ_TASK_CONFIG = 
"tez.task.flink.processor.taskconfig";
+
+       private static final String NUMBER_SUBTASKS_IN_OUTPUTS = 
"tez.num_subtasks_in_output";
+
+       private static final String INPUT_SPLIT_PROVIDER = 
"tez.input_split_provider";
+
+       private static final String INPUT_POSITIONS = "tez.input_positions";
+
+       private static final String INPUT_FORMAT = "tez.input_format";
+
+       private static final String DATASOURCE_PROCESSOR_NAME = 
"tez.datasource_processor_name";
+
+       public TezTaskConfig(Configuration config) {
+               super(config);
+       }
+
+
+       public void setDatasourceProcessorName(String name) {
+               if (name != null) {
+                       this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
+               }
+       }
+
+       public String getDatasourceProcessorName() {
+               return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
+       }
+
+       public void setNumberSubtasksInOutput(ArrayList<Integer> 
numberSubtasksInOutputs) {
+               try {
+                       
InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, 
NUMBER_SUBTASKS_IN_OUTPUTS);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while writing the 
input split provider object to the task configuration.");
+               }
+       }
+
+       public ArrayList<Integer> getNumberSubtasksInOutput() {
+               ArrayList<Integer> numberOfSubTasksInOutputs = null;
+               try {
+                       numberOfSubTasksInOutputs = (ArrayList) 
InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, 
getClass().getClassLoader());
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error while reading the 
number of subtasks in outputs object from the task configuration.");
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Error while reading the 
number of subtasks in outpurs object from the task configuration. " +
+                                       "class not found.");
+               }
+               if (numberOfSubTasksInOutputs == null) {
+                       throw new NullPointerException();
+               }
+               return numberOfSubTasksInOutputs;
+
+       }
+
+
+       public void setInputSplitProvider (InputSplitProvider 
inputSplitProvider) {
+               try {
+                       
InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, 
INPUT_SPLIT_PROVIDER);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while writing the 
input split provider object to the task configuration.");
+               }
+       }
+
+       public InputSplitProvider getInputSplitProvider () {
+               InputSplitProvider inputSplitProvider = null;
+               try {
+                       inputSplitProvider = (InputSplitProvider) 
InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, 
getClass().getClassLoader());
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration.");
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration. " +
+                                       "ChannelSelector class not found.");
+               }
+               if (inputSplitProvider == null) {
+                       throw new NullPointerException();
+               }
+               return inputSplitProvider;
+       }
+
+
+       public void setInputPositions(HashMap<String,ArrayList<Integer>> 
inputPositions) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(inputPositions, 
this.config, INPUT_POSITIONS);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while writing the 
input positions object to the task configuration.");
+               }
+       }
+
+       public HashMap<String,ArrayList<Integer>> getInputPositions () {
+               HashMap<String,ArrayList<Integer>> inputPositions = null;
+               try {
+                       inputPositions = (HashMap) 
InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, 
getClass().getClassLoader());
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error while reading the 
input positions object from the task configuration.");
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Error while reading the 
input positions object from the task configuration. " +
+                                       "ChannelSelector class not found.");
+               }
+               if (inputPositions == null) {
+                       throw new NullPointerException();
+               }
+               return inputPositions;
+       }
+
+       public void setInputFormat (InputFormat inputFormat) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(inputFormat, 
this.config, INPUT_FORMAT);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error while writing the 
input format object to the task configuration.");
+               }
+       }
+
+       public InputFormat getInputFormat () {
+               InputFormat inputFormat = null;
+               try {
+                       inputFormat = (InputFormat) 
InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, 
getClass().getClassLoader());
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration.");
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration. " +
+                                       "ChannelSelector class not found.");
+               }
+               if (inputFormat == null) {
+                       throw new NullPointerException();
+               }
+               return inputFormat;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
new file mode 100644
index 0000000..7ceeac8
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tez processor that merges all of its inputs into a single output.
+ *
+ * <p>Every key/value pair read from each input is written unchanged to the one output
+ * writer. Inputs are drained one after another, in the iteration order of the input map's
+ * values.
+ */
+public class UnionProcessor extends AbstractLogicalIOProcessor {
+
+       private TezTaskConfig config;
+       protected Map<String, LogicalInput> inputs;
+       protected Map<String, LogicalOutput> outputs;
+       private List<KeyValueReader> readers;
+       private List<KeyValueWriter> writers;
+       private int numInputs;
+       private int numOutputs;
+
+       public UnionProcessor(ProcessorContext context) {
+               super(context);
+       }
+
+       /**
+        * Decodes the {@link TezTaskConfig} from the user payload and sets its task name to
+        * the name of the vertex this processor runs in.
+        */
+       @Override
+       public void initialize() throws Exception {
+               UserPayload payload = getContext().getUserPayload();
+               Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+               this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(
+                               conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+               config.setTaskName(getContext().getTaskVertexName());
+       }
+
+       /** Events are ignored by this processor. */
+       @Override
+       public void handleEvents(List<Event> processorEvents) {
+
+       }
+
+       /** Nothing to release. */
+       @Override
+       public void close() throws Exception {
+
+       }
+
+       /**
+        * Starts all inputs and outputs and copies every key/value pair from the inputs to
+        * the single output.
+        *
+        * @param inputs the logical inputs, keyed by name
+        * @param outputs the logical outputs, keyed by name; must contain exactly one entry
+        * @throws IllegalArgumentException if there is not exactly one output
+        */
+       @Override
+       public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+               this.inputs = inputs;
+               this.outputs = outputs;
+               // Take the sizes null-safely: the null checks below would otherwise be dead code,
+               // because a null map would already have failed on size().
+               this.numInputs = (inputs == null) ? 0 : inputs.size();
+               this.numOutputs = (outputs == null) ? 0 : outputs.size();
+
+               this.readers = new ArrayList<KeyValueReader>(numInputs);
+               if (this.inputs != null) {
+                       for (LogicalInput input: this.inputs.values()) {
+                               input.start();
+                               readers.add((KeyValueReader) input.getReader());
+                       }
+               }
+
+               this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+               if (this.outputs != null) {
+                       for (LogicalOutput output : this.outputs.values()) {
+                               output.start();
+                               writers.add((KeyValueWriter) output.getWriter());
+                       }
+               }
+
+               Preconditions.checkArgument(writers.size() == 1,
+                               "UnionProcessor requires exactly one output, but found " + writers.size());
+               KeyValueWriter writer = writers.get(0);
+
+               for (KeyValueReader reader: this.readers) {
+                       while (reader.next()) {
+                               Object key = reader.getCurrentKey();
+                               Object value = reader.getCurrentValue();
+                               writer.write(key, value);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
new file mode 100644
index 0000000..ef59fd0
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Tez logical input that carries a single Flink {@link InputSplit} instead of a record
+ * reader.
+ *
+ * <p>The split arrives serialized in the payload of an {@link InputDataInformationEvent}
+ * (see {@link #handleEvents(List)}). Consumers obtain it through {@link #getSplit()}, which
+ * blocks until the event has been processed. {@link #getReader()} is not supported.
+ */
+public class FlinkInput extends AbstractLogicalInput {
+
+       private static final Log LOG = LogFactory.getLog(FlinkInput.class);
+
+       /** The received split; written only while holding {@link #rrLock}. */
+       private InputSplit split;
+       /** Wait predicate for {@link #rrInited}; guarded by {@link #rrLock}. */
+       private boolean splitIsCreated;
+       private final ReentrantLock rrLock = new ReentrantLock();
+       private final Condition rrInited = rrLock.newCondition();
+
+       public FlinkInput(InputContext inputContext, int numPhysicalInputs) {
+               super(inputContext, numPhysicalInputs);
+               getContext().requestInitialMemory(0L, null); // mandatory call
+               split = null;
+       }
+
+       /**
+        * Takes the first event of the list and initializes the split from it.
+        *
+        * @param inputEvents the received events; the first one must be an
+        *        {@link InputDataInformationEvent}
+        * @throws IllegalArgumentException if the first event has a different type
+        * @throws IOException if the split cannot be initialized from the event
+        */
+       @Override
+       public void handleEvents(List<Event> inputEvents) throws Exception {
+
+               LOG.info("Received " + inputEvents.size() + " events (should be = 1)");
+
+               Event event = inputEvents.iterator().next();
+
+               Preconditions.checkArgument(event instanceof InputDataInformationEvent,
+                               getClass().getSimpleName()
+                                               + " can only handle a single event of type: "
+                                               + InputDataInformationEvent.class.getSimpleName());
+
+               initSplitFromEvent ((InputDataInformationEvent)event);
+       }
+
+       /**
+        * Deserializes the split from the event payload, publishes it and wakes up all
+        * threads blocked in {@link #getSplit()}.
+        */
+       private void initSplitFromEvent (InputDataInformationEvent e) throws Exception {
+               rrLock.lock();
+
+               try {
+                       ByteString byteString = ByteString.copyFrom(e.getUserPayload());
+                       this.split =  (InputSplit) InstantiationUtil.deserializeObject(
+                                       byteString.toByteArray(), getClass().getClassLoader());
+                       this.splitIsCreated = true;
+
+                       // Wake up waiters before logging, so a logging failure cannot leave them blocked.
+                       rrInited.signalAll();
+
+                       LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString()
+                                       + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString());
+               }
+               catch (Exception ex) {
+                       // Nothing here waits or gets interrupted; report the real failure and keep its cause.
+                       throw new IOException("Could not initialize the InputSplit from the received event", ex);
+               }
+               finally {
+                       rrLock.unlock();
+               }
+       }
+
+       /** Produces no events. */
+       @Override
+       public List<Event> close() throws Exception {
+               return null;
+       }
+
+       /** Nothing to start. */
+       @Override
+       public void start() throws Exception {
+       }
+
+       /**
+        * Not supported; use {@link #getSplit()} instead.
+        *
+        * @throws RuntimeException always
+        */
+       @Override
+       public Reader getReader() throws Exception {
+               throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead");
+       }
+
+       /** Produces no events. */
+       @Override
+       public List<Event> initialize() throws Exception {
+               return null;
+       }
+
+       /**
+        * Returns the input split, blocking until it has been received.
+        *
+        * @return the split, never {@code null}
+        * @throws IOException if the thread is interrupted while waiting
+        * @throws RuntimeException if the split is still {@code null} after initialization
+        */
+       public InputSplit getSplit () throws Exception {
+
+               rrLock.lock();
+               try {
+                       if (!splitIsCreated) {
+                               checkAndAwaitSplitInitialization();
+                       }
+               }
+               finally {
+                       rrLock.unlock();
+               }
+               if (split == null) {
+                       LOG.info("Input split has not been created. This should not happen");
+                       throw new RuntimeException("Input split has not been created. This should not happen");
+               }
+               return split;
+       }
+
+       /**
+        * Blocks until the split has been initialized. Must be called while holding
+        * {@link #rrLock} exactly once.
+        *
+        * @throws IOException if the thread is interrupted while waiting; the interrupt flag
+        *         is restored before throwing
+        */
+       void checkAndAwaitSplitInitialization() throws IOException {
+               assert rrLock.getHoldCount() == 1;
+               rrLock.lock();
+               try {
+                       // Re-check the predicate in a loop: await() may wake up spuriously, and the
+                       // signal may already have been sent before this method was entered.
+                       while (!splitIsCreated) {
+                               rrInited.await();
+                       }
+               } catch (InterruptedException e) {
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Interrupted waiting for InputSplit initialization", e);
+               } finally {
+                       rrLock.unlock();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
new file mode 100644
index 0000000..db1261c
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Tez input initializer that creates the input splits of a Flink {@link InputFormat} and
+ * distributes them to the tasks as {@link InputDataInformationEvent}s.
+ *
+ * <p>The input format is taken from the {@link TezTaskConfig} found in the user payload.
+ * Split {@code i} is addressed to task {@code i % parallelism}.
+ */
+public class FlinkInputSplitGenerator extends InputInitializer {
+
+       private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class);
+
+       InputFormat format;
+
+       public FlinkInputSplitGenerator(InputInitializerContext initializerContext) {
+               super(initializerContext);
+       }
+
+       /**
+        * Creates the splits and wraps each one, serialized, into an event for its target task.
+        *
+        * @return one {@link InputDataInformationEvent} per created split
+        * @throws Exception if the task config cannot be decoded, or the splits cannot be
+        *         created or serialized
+        */
+       @Override
+       public List<Event> initialize() throws Exception {
+
+               Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
+
+               TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(
+                               tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+
+               this.format = taskConfig.getInputFormat();
+
+               int numTasks = this.getContext().getNumTasks();
+               // Use the same guarded value for split creation and for the target index below;
+               // "i % numTasks" would throw an ArithmeticException for numTasks == 0.
+               int parallelism = (numTasks > 0) ? numTasks : 1;
+
+               LOG.info ("Creating splits for " + numTasks + " tasks for input format " + format);
+
+               InputSplit[] splits = format.createInputSplits(parallelism);
+
+               LOG.info ("Created " + splits.length + " input splits for input format " + format);
+
+               LOG.info ("Sending input split events");
+               LinkedList<Event> events = new LinkedList<Event>();
+               for (int i = 0; i < splits.length; i++) {
+                       int targetIndex = i % parallelism;
+                       byte [] bytes = InstantiationUtil.serializeObject(splits[i]);
+                       ByteBuffer buf = ByteBuffer.wrap(bytes);
+                       InputDataInformationEvent event =
+                                       InputDataInformationEvent.createWithSerializedPayload(targetIndex, buf);
+                       event.setTargetIndex(targetIndex);
+                       events.add(event);
+                       LOG.info ("Added event of index " + i + ": " + event);
+               }
+               return events;
+       }
+
+       /** Initializer events are ignored. */
+       @Override
+       public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+
+       }
+
+       /** Vertex state updates are ignored. */
+       @Override
+       public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
new file mode 100644
index 0000000..722f0a1
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.io.IOException;
+
+/**
+ * Adapts a Tez {@link KeyValueReader} to Flink's {@link MutableObjectIterator}.
+ *
+ * <p>Only the values are handed out; each key is merely checked to be an
+ * {@link IntWritable}.
+ */
+public class TezReaderIterator<T> implements MutableObjectIterator<T>{
+
+       private KeyValueReader kvReader;
+
+       public TezReaderIterator(KeyValueReader kvReader) {
+               this.kvReader = kvReader;
+       }
+
+       /**
+        * Returns the next value. The {@code reuse} object is not filled; the value object
+        * delivered by the reader is returned instead.
+        *
+        * @return the next value, or {@code null} when the reader is exhausted
+        */
+       @Override
+       public T next(T reuse) throws IOException {
+               return advance();
+       }
+
+       /**
+        * @return the next value, or {@code null} when the reader is exhausted
+        */
+       @Override
+       public T next() throws IOException {
+               return advance();
+       }
+
+       /**
+        * Moves the reader forward by one pair and returns its value.
+        *
+        * @throws IllegalStateException if the current key is not an {@link IntWritable}
+        */
+       @SuppressWarnings("unchecked")
+       private T advance() throws IOException {
+               if (!kvReader.next()) {
+                       return null;
+               }
+               final Object currentKey = kvReader.getCurrentKey();
+               final Object currentValue = kvReader.getCurrentValue();
+               if (currentKey instanceof IntWritable) {
+                       return (T) currentValue;
+               }
+               throw new IllegalStateException("Wrong key type");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
new file mode 100644
index 0000000..2358f29
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.Partitioner;
+
+/**
+ * Partitioner that uses the {@link IntWritable} key itself as the partition number.
+ */
+public class SimplePartitioner implements Partitioner {
+
+       /**
+        * Returns the integer wrapped by {@code key}; {@code value} and
+        * {@code numPartitions} are not consulted.
+        *
+        * @throws IllegalStateException if {@code key} is not an {@link IntWritable}
+        */
+       @Override
+       public int getPartition(Object key, Object value, int numPartitions) {
+               if (key instanceof IntWritable) {
+                       return ((IntWritable) key).get();
+               }
+               throw new IllegalStateException("Partitioning key should be int");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
new file mode 100644
index 0000000..7e5cd55
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy that decides to which output channels a record is sent.
+ *
+ * @param <T> the type of the records to route
+ */
+public interface TezChannelSelector<T> extends Serializable {
+
+       /**
+        * Called to determine to which attached
+        * {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given
+        * record shall be forwarded.
+        *
+        * @param record
+        *        the record to determine the output channels for
+        * @param numberOfOutputChannels
+        *        the total number of output channels which are attached to the respective output gate
+        * @return a (possibly empty) array of integer numbers which indicate the indices of the
+        *         output channels through which the record shall be forwarded
+        */
+       int[] selectChannels(T record, int numberOfOutputChannels);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
new file mode 100644
index 0000000..b68e6c8
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link Collector} that hands every collected record to each of its
+ * {@link KeyValueWriter}s, once per channel chosen by the
+ * {@link TezChannelSelector} stored at the same list position. The channel index
+ * is written as the key (an {@link IntWritable}) and the record as the value.
+ */
+public class TezOutputCollector<T> implements Collector<T> {
+
+       // Position i in each of the three lists below describes the same output.
+       private List<KeyValueWriter> writers;
+
+       private List<TezChannelSelector<T>> outputEmitters;
+
+       private List<Integer> numberOfStreamsInOutputs;
+
+       // Number of writers at construction time; bounds the loop in collect().
+       private int numOutputs;
+
+       // Stored by the constructor; not read anywhere else in this class.
+       private TypeSerializer<T> serializer;
+
+       /**
+        * @param writers one writer per output
+        * @param outputEmitters the channel selector for each output, in writer order
+        * @param serializer the serializer of the record type
+        * @param numberOfStreamsInOutputs the channel count handed to the selector of
+        *        each output, in writer order
+        */
+       public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) {
+               this.serializer = serializer;
+               this.numberOfStreamsInOutputs = numberOfStreamsInOutputs;
+               this.outputEmitters = outputEmitters;
+               this.writers = writers;
+               this.numOutputs = this.writers.size();
+       }
+
+       /**
+        * Emits the record to every output.
+        *
+        * @throws RuntimeException if a writer fails with an {@link IOException}
+        */
+       @Override
+       public void collect(T record) {
+               int output = 0;
+               while (output < numOutputs) {
+                       forward(record, writers.get(output), outputEmitters.get(output), numberOfStreamsInOutputs.get(output));
+                       output++;
+               }
+       }
+
+       /** Nothing to release; the writers are not closed here. */
+       @Override
+       public void close() {
+
+       }
+
+       // Writes the record to one output, once for each channel its selector picks.
+       private void forward(T record, KeyValueWriter target, TezChannelSelector<T> selector, int streamCount) {
+               try {
+                       int[] selected = selector.selectChannels(record, streamCount);
+                       for (int idx = 0; idx < selected.length; idx++) {
+                               target.write(new IntWritable(selected[idx]), record);
+                       }
+               }
+               catch (IOException ioe) {
+                       throw new RuntimeException("Emitting the record caused an I/O exception: " + ioe.getMessage(), ioe);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
new file mode 100644
index 0000000..6dcee0b
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * Channel selector that maps a record to output channel indices according to a
+ * {@link ShipStrategyType}: round robin (FORWARD, PARTITION_RANDOM,
+ * PARTITION_FORCED_REBALANCE), hash partitioning, or broadcast. Range
+ * partitioning passes the constructor check but is not implemented and throws
+ * {@link UnsupportedOperationException} when a channel is selected.
+ *
+ * <p>The array returned by {@code selectChannels} is an internal buffer that is
+ * reused across calls.
+ */
+public class TezOutputEmitter<T> implements TezChannelSelector<T> {
+
+       private final ShipStrategyType strategy;                // the shipping strategy used by this output emitter
+
+       private int[] channels;                                         // the reused array defining target channels
+
+       private int nextChannelToSendTo = 0;            // counter to go over channels round robin
+
+       private final TypeComparator<T> comparator;     // the comparator for hashing / sorting
+
+       // ------------------------------------------------------------------------
+       // Constructors
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a new channel selector that distributes data round robin.
+        */
+       public TezOutputEmitter() {
+               // ShipStrategyType.NONE is rejected by the strategy check in the full
+               // constructor, so the round-robin default has to name a strategy that
+               // selectChannels() routes to robin().
+               this(ShipStrategyType.PARTITION_FORCED_REBALANCE);
+       }
+
+       /**
+        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
+        *
+        * @param strategy The distribution strategy to be used.
+        */
+       public TezOutputEmitter(ShipStrategyType strategy) {
+               this(strategy, null);
+       }
+
+       /**
+        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
+        * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
+        *
+        * @param strategy The distribution strategy to be used.
+        * @param comparator The comparator used to hash / compare the records.
+        */
+       public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
+               this(strategy, comparator, null);
+       }
+
+       /**
+        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
+        * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
+        *
+        * @param strategy The distribution strategy to be used.
+        * @param comparator The comparator used to hash / compare the records.
+        * @param distr The distribution pattern used in the case of a range partitioning.
+        *        It is only checked for presence here and is not stored.
+        * @throws NullPointerException if the strategy is null, or if the strategy is
+        *         range partitioning and no distribution is given
+        * @throws IllegalArgumentException if the strategy is not one this emitter supports
+        */
+       public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
+               if (strategy == null) {
+                       throw new NullPointerException();
+               }
+
+               this.strategy = strategy;
+               this.comparator = comparator;
+
+               switch (strategy) {
+                       case FORWARD:
+                       case PARTITION_HASH:
+                       case PARTITION_RANGE:
+                       case PARTITION_RANDOM:
+                       case PARTITION_FORCED_REBALANCE:
+                       case BROADCAST:
+                               break;
+                       default:
+                               throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
+               }
+
+               if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
+                       throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       // Channel Selection
+       // ------------------------------------------------------------------------
+
+       /**
+        * Selects the target channels for the record according to the configured strategy.
+        *
+        * @param record the record to route
+        * @param numberOfChannels the number of channels to choose from
+        * @return the reused internal array holding the selected channel indices
+        * @throws UnsupportedOperationException for range partitioning and for any
+        *         strategy without a selection rule
+        */
+       @Override
+       public final int[] selectChannels(T record, int numberOfChannels) {
+               switch (strategy) {
+                       case FORWARD:
+                       case PARTITION_RANDOM:
+                       case PARTITION_FORCED_REBALANCE:
+                               return robin(numberOfChannels);
+                       case PARTITION_HASH:
+                               return hashPartitionDefault(record, numberOfChannels);
+                       case PARTITION_RANGE:
+                               return rangePartition(record, numberOfChannels);
+                       case BROADCAST:
+                               return broadcast(numberOfChannels);
+                       default:
+                               throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       // Advances the round-robin counter and returns it as the single target.
+       // The counter is incremented before use, so with more than one channel the
+       // first record goes to channel 1; it wraps to 0 once it reaches numberOfChannels.
+       private int[] robin(int numberOfChannels) {
+               if (this.channels == null || this.channels.length != 1) {
+                       this.channels = new int[1];
+               }
+
+               int nextChannel = nextChannelToSendTo + 1;
+               nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
+
+               this.nextChannelToSendTo = nextChannel;
+               this.channels[0] = nextChannel;
+               return this.channels;
+       }
+
+       // Returns all indices 0..numberOfChannels-1; the array is rebuilt only when
+       // the channel count differs from the cached array's length.
+       private int[] broadcast(int numberOfChannels) {
+               if (channels == null || channels.length != numberOfChannels) {
+                       channels = new int[numberOfChannels];
+                       for (int i = 0; i < numberOfChannels; i++) {
+                               channels[i] = i;
+                       }
+               }
+
+               return channels;
+       }
+
+       // Picks one channel from the comparator's hash of the record, scrambled by
+       // murmurHash() and reduced modulo numberOfChannels.
+       // NOTE(review): the comparator is null when the emitter was built through a
+       // constructor without one; this method then fails with a NullPointerException.
+       private int[] hashPartitionDefault(T record, int numberOfChannels) {
+               if (channels == null || channels.length != 1) {
+                       channels = new int[1];
+               }
+
+               int hash = this.comparator.hash(record);
+
+               hash = murmurHash(hash);
+
+               if (hash >= 0) {
+                       this.channels[0] = hash % numberOfChannels;
+               }
+               else if (hash != Integer.MIN_VALUE) {
+                       this.channels[0] = -hash % numberOfChannels;
+               }
+               else {
+                       // -Integer.MIN_VALUE overflows back to itself, so it is mapped to channel 0.
+                       this.channels[0] = 0;
+               }
+
+               return this.channels;
+       }
+
+       // Integer bit-mixing step applied to the comparator hash before the modulo.
+       private int murmurHash(int k) {
+               k *= 0xcc9e2d51;
+               k = Integer.rotateLeft(k, 15);
+               k *= 0x1b873593;
+
+               k = Integer.rotateLeft(k, 13);
+               k *= 0xe6546b64;
+
+               k ^= 4;
+               k ^= k >>> 16;
+               k *= 0x85ebca6b;
+               k ^= k >>> 13;
+               k *= 0xc2b2ae35;
+               k ^= k >>> 16;
+
+               return k;
+       }
+
+       // Range partitioning is not implemented.
+       private int[] rangePartition(T record, int numberOfChannels) {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
new file mode 100644
index 0000000..39d247c
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An {@link AbstractInvokable} whose {@code registerInputOutput()} and
+ * {@code invoke()} do nothing. It only carries an {@link ExecutionConfig} that
+ * can be supplied at construction time or set afterwards.
+ */
+public class DummyInvokable extends AbstractInvokable {
+
+       /** The configuration returned by {@link #getExecutionConfig()}; null until one is supplied. */
+       private ExecutionConfig executionConfig;
+
+       /** Creates an instance without an execution config. */
+       public DummyInvokable() {
+               this(null);
+       }
+
+       /**
+        * Creates an instance that returns the given execution config.
+        *
+        * @param executionConfig the config to hand out
+        */
+       public DummyInvokable(ExecutionConfig executionConfig) {
+               this.executionConfig = executionConfig;
+       }
+
+       // ---- execution config accessors ----
+
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
+       }
+
+       /**
+        * Replaces the execution config returned by {@link #getExecutionConfig()}.
+        *
+        * @param executionConfig the new config
+        */
+       public void setExecutionConfig(ExecutionConfig executionConfig) {
+               this.executionConfig = executionConfig;
+       }
+
+       // ---- intentionally empty task hooks ----
+
+       @Override
+       public void registerInputOutput() {
+       }
+
+       @Override
+       public void invoke() throws Exception {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
new file mode 100644
index 0000000..202cb24
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.IOException;
+
+/**
+ * Helpers that turn objects into strings and back: objects are serialized with
+ * {@link InstantiationUtil} and the resulting bytes are Base64-encoded.
+ *
+ * <p>Failures are reported as {@link RuntimeException}s that carry the original
+ * exception as their cause; the methods never terminate the JVM.
+ */
+public class EncodingUtils {
+
+       /**
+        * Decodes a Base64 string and deserializes the object it contains.
+        *
+        * <p>NOTE(review): this deserializes whatever the string contains; callers
+        * should only pass strings from a trusted source.
+        *
+        * @param encoded the Base64 string, or null
+        * @param cl the class loader used to resolve the classes of the object
+        * @return the deserialized object, or null if {@code encoded} is null
+        * @throws RuntimeException if deserialization fails with an I/O error or a
+        *         required class cannot be found
+        */
+       public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
+
+               if (encoded == null) {
+                       return null;
+               }
+
+               try {
+                       byte[] bytes = Base64.decodeBase64(encoded);
+
+                       return InstantiationUtil.deserializeObject(bytes, cl);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Could not deserialize the Base64-encoded object: " + e.getMessage(), e);
+               }
+               catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not find a class needed to deserialize the Base64-encoded object: " + e.getMessage(), e);
+               }
+       }
+
+       /**
+        * Serializes the object and encodes the bytes as a Base64 string.
+        *
+        * @param o the object to encode
+        * @return the Base64 representation of the serialized object
+        * @throws RuntimeException if serialization fails with an I/O error
+        */
+       public static String encodeObjectToString(Object o) {
+
+               try {
+                       byte[] bytes = InstantiationUtil.serializeObject(o);
+
+                       return Base64.encodeBase64String(bytes);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Could not serialize the object for Base64 encoding: " + e.getMessage(), e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
new file mode 100644
index 0000000..07c5f97
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Hadoop {@link Serialization} that delegates to a Flink {@link TypeSerializer}.
+ * The serializer is read from the configuration entry
+ * {@code io.flink.typeserializer} and decoded with
+ * {@link EncodingUtils#decodeObjectFromString(String, ClassLoader)}.
+ */
+public class FlinkSerialization<T> extends Configured implements Serialization<T> {
+
+       // Configuration key under which the encoded TypeSerializer is looked up.
+       private static final String TYPE_SERIALIZER_KEY = "io.flink.typeserializer";
+
+       /**
+        * Accepts a class if it is the class of an instance created by the configured
+        * serializer, or a subclass of it.
+        *
+        * @param c the class to check
+        * @return false if no serializer is configured under the key; otherwise
+        *         whether the created instance's class is assignable from {@code c}
+        */
+       @Override
+       public boolean accept(Class<?> c) {
+               TypeSerializer<T> typeSerializer = loadTypeSerializer();
+               if (typeSerializer == null) {
+                       // Key not set: decodeObjectFromString returns null for a null string,
+                       // so there is nothing this serialization could handle.
+                       return false;
+               }
+               T instance = typeSerializer.createInstance();
+               return instance.getClass().isAssignableFrom(c);
+       }
+
+       /** Returns a Hadoop serializer backed by the configured Flink serializer. */
+       @Override
+       public Serializer<T> getSerializer(Class<T> c) {
+               return new FlinkSerializer<T>(loadTypeSerializer());
+       }
+
+       /** Returns a Hadoop deserializer backed by the configured Flink serializer. */
+       @Override
+       public Deserializer<T> getDeserializer(Class<T> c) {
+               return new FlinkDeserializer<T>(loadTypeSerializer());
+       }
+
+       // Decodes the TypeSerializer from the configuration; null if the key is unset.
+       // The cast is unchecked because the decoded object is only known as Object.
+       @SuppressWarnings("unchecked")
+       private TypeSerializer<T> loadTypeSerializer() {
+               return (TypeSerializer<T>) EncodingUtils.decodeObjectFromString(
+                               this.getConf().get(TYPE_SERIALIZER_KEY), getClass().getClassLoader());
+       }
+
+       /**
+        * Hadoop {@link Serializer} that writes records with a Flink {@link TypeSerializer}.
+        */
+       public static class FlinkSerializer<T> implements Serializer<T> {
+
+               private OutputStream dataOut;
+               private DataOutputViewOutputStreamWrapper dataOutputView;
+               private final TypeSerializer<T> typeSerializer;
+
+               public FlinkSerializer(TypeSerializer<T> typeSerializer) {
+                       this.typeSerializer = typeSerializer;
+               }
+
+               @Override
+               public void open(OutputStream out) throws IOException {
+                       this.dataOut = out;
+                       this.dataOutputView = new DataOutputViewOutputStreamWrapper(out);
+               }
+
+               @Override
+               public void serialize(T t) throws IOException {
+                       typeSerializer.serialize(t, dataOutputView);
+               }
+
+               /** Closes the stream passed to {@link #open(OutputStream)}. */
+               @Override
+               public void close() throws IOException {
+                       this.dataOut.close();
+               }
+       }
+
+       /**
+        * Hadoop {@link Deserializer} that reads records with a Flink {@link TypeSerializer}.
+        */
+       public static class FlinkDeserializer<T> implements Deserializer<T> {
+
+               private InputStream dataIn;
+               private final TypeSerializer<T> typeSerializer;
+               private DataInputViewInputStreamWrapper dataInputView;
+
+               public FlinkDeserializer(TypeSerializer<T> typeSerializer) {
+                       this.typeSerializer = typeSerializer;
+               }
+
+               @Override
+               public void open(InputStream in) throws IOException {
+                       this.dataIn = in;
+                       this.dataInputView = new DataInputViewInputStreamWrapper(in);
+               }
+
+               /**
+                * Reads the next record, reusing {@code t} when it is non-null and a
+                * freshly created instance otherwise.
+                */
+               @Override
+               public T deserialize(T t) throws IOException {
+                       T reuse = t;
+                       if (reuse == null) {
+                               reuse = typeSerializer.createInstance();
+                       }
+                       return typeSerializer.deserialize(reuse, dataInputView);
+               }
+
+               /** Closes the stream passed to {@link #open(InputStream)}. */
+               @Override
+               public void close() throws IOException {
+                       this.dataIn.close();
+               }
+       }
+
+       /**
+        * Adapts an {@link OutputStream} to Flink's {@link DataOutputView} by
+        * delegating to a {@link DataOutputStream}.
+        */
+       private static final class DataOutputViewOutputStreamWrapper implements DataOutputView {
+
+               private final DataOutputStream dos;
+
+               public DataOutputViewOutputStreamWrapper(OutputStream output) {
+                       this.dos = new DataOutputStream(output);
+               }
+
+               @Override
+               public void write(int b) throws IOException {
+                       dos.write(b);
+               }
+
+               @Override
+               public void write(byte[] b) throws IOException {
+                       dos.write(b);
+               }
+
+               @Override
+               public void write(byte[] b, int off, int len) throws IOException {
+                       dos.write(b, off, len);
+               }
+
+               @Override
+               public void writeBoolean(boolean v) throws IOException {
+                       dos.writeBoolean(v);
+               }
+
+               @Override
+               public void writeByte(int v) throws IOException {
+                       dos.writeByte(v);
+               }
+
+               @Override
+               public void writeShort(int v) throws IOException {
+                       dos.writeShort(v);
+               }
+
+               @Override
+               public void writeChar(int v) throws IOException {
+                       dos.writeChar(v);
+               }
+
+               @Override
+               public void writeInt(int v) throws IOException {
+                       dos.writeInt(v);
+               }
+
+               @Override
+               public void writeLong(long v) throws IOException {
+                       dos.writeLong(v);
+               }
+
+               @Override
+               public void writeFloat(float v) throws IOException {
+                       dos.writeFloat(v);
+               }
+
+               @Override
+               public void writeDouble(double v) throws IOException {
+                       dos.writeDouble(v);
+               }
+
+               @Override
+               public void writeBytes(String s) throws IOException {
+                       dos.writeBytes(s);
+               }
+
+               @Override
+               public void writeChars(String s) throws IOException {
+                       dos.writeChars(s);
+               }
+
+               @Override
+               public void writeUTF(String s) throws IOException {
+                       dos.writeUTF(s);
+               }
+
+               // A plain stream cannot seek forward, so skipped positions are filled with zero bytes.
+               @Override
+               public void skipBytesToWrite(int num) throws IOException {
+                       for (int i = 0; i < num; i++) {
+                               dos.write(0);
+                       }
+               }
+
+               // Copies num bytes from the input view, one byte at a time.
+               @Override
+               public void write(DataInputView inview, int num) throws IOException {
+                       for (int i = 0; i < num; i++) {
+                               dos.write(inview.readByte());
+                       }
+               }
+       }
+
+       /**
+        * Adapts an {@link InputStream} to Flink's {@link DataInputView} by
+        * delegating to a {@link DataInputStream}.
+        */
+       private static final class DataInputViewInputStreamWrapper implements DataInputView {
+
+               private final DataInputStream dis;
+
+
+               public DataInputViewInputStreamWrapper(InputStream input) {
+                       this.dis = new DataInputStream(input);
+               }
+
+               @Override
+               public void readFully(byte[] b) throws IOException {
+                       dis.readFully(b);
+               }
+
+               @Override
+               public void readFully(byte[] b, int off, int len) throws IOException {
+                       dis.readFully(b, off, len);
+               }
+
+               @Override
+               public int skipBytes(int n) throws IOException {
+                       return dis.skipBytes(n);
+               }
+
+               @Override
+               public boolean readBoolean() throws IOException {
+                       return dis.readBoolean();
+               }
+
+               @Override
+               public byte readByte() throws IOException {
+                       return dis.readByte();
+               }
+
+               @Override
+               public int readUnsignedByte() throws IOException {
+                       return dis.readUnsignedByte();
+               }
+
+               @Override
+               public short readShort() throws IOException {
+                       return dis.readShort();
+               }
+
+               @Override
+               public int readUnsignedShort() throws IOException {
+                       return dis.readUnsignedShort();
+               }
+
+               @Override
+               public char readChar() throws IOException {
+                       return dis.readChar();
+               }
+
+               @Override
+               public int readInt() throws IOException {
+                       return dis.readInt();
+               }
+
+               @Override
+               public long readLong() throws IOException {
+                       return dis.readLong();
+               }
+
+               @Override
+               public float readFloat() throws IOException {
+                       return dis.readFloat();
+               }
+
+               @Override
+               public double readDouble() throws IOException {
+                       return dis.readDouble();
+               }
+
+               @Override
+               public String readLine() throws IOException {
+                       return dis.readLine();
+               }
+
+               @Override
+               public String readUTF() throws IOException {
+                       return dis.readUTF();
+               }
+
+               /**
+                * Skips exactly {@code numBytes} bytes.
+                *
+                * @throws java.io.EOFException if the stream ends before all bytes are skipped
+                */
+               @Override
+               public void skipBytesToRead(int numBytes) throws IOException {
+                       while (numBytes > 0) {
+                               int skipped = dis.skipBytes(numBytes);
+                               if (skipped > 0) {
+                                       numBytes -= skipped;
+                               }
+                               else {
+                                       // skipBytes made no progress (it never signals end of stream
+                                       // itself). Reading one byte either advances the stream or
+                                       // throws EOFException, so this loop cannot spin forever.
+                                       dis.readByte();
+                                       numBytes--;
+                               }
+                       }
+               }
+
+               // Delegates to readFully, so this either fills all len bytes and returns
+               // len, or fails with the exception readFully throws.
+               @Override
+               public int read(byte[] b, int off, int len) throws IOException {
+                       dis.readFully(b, off, len);
+                       return len;
+               }
+
+               @Override
+               public int read(byte[] b) throws IOException {
+                       return read(b, 0, b.length);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/resources/log4j.properties 
b/flink-contrib/flink-tez/src/main/resources/log4j.properties
new file mode 100644
index 0000000..0845c81
--- /dev/null
+++ b/flink-contrib/flink-tez/src/main/resources/log4j.properties
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger level is INFO, which is useful for debugging;
+# set it to OFF to avoid flooding build logs
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
new file mode 100644
index 0000000..9124faa
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.test;
+
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.tez.examples.ConnectedComponentsStep;
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/*
+ * Note: This does not test whether the program computes one step of the
+ * Weakly Connected Components program correctly. It only checks that no
+ * vertex is assigned a component of the wrong parity (odd vs. even).
+ */
+
+public class ConnectedComponentsStepITCase extends TezProgramTestBase {
+
+    private static final long SEED = 0xBADC0FFEEBEEFL;
+
+    private static final int NUM_VERTICES = 1000;
+
+    private static final int NUM_EDGES = 10000;
+
+
+    private String verticesPath;
+    private String edgesPath;
+    private String resultPath;
+
+
+    @Override
+    protected void preSubmit() throws Exception {
+        verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
+        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+        resultPath = getTempFilePath("results");
+    }
+
+    @Override
+    protected void testProgram() throws Exception {
+        ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100");
+    }
+
+    @Override
+    protected void postSubmit() throws Exception {
+        for (BufferedReader reader : getResultReader(resultPath)) {
+            checkOddEvenResult(reader);
+        }
+    }
+
+    private static void checkOddEvenResult(BufferedReader result) throws IOException {
+        Pattern split = Pattern.compile(" ");
+        String line;
+        while ((line = result.readLine()) != null) {
+            String[] res = split.split(line);
+            Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
+            try {
+                int vertex = Integer.parseInt(res[0]);
+                int component = Integer.parseInt(res[1]);
+                Assert.assertTrue(((vertex % 2) == (component % 2)));
+            } catch (NumberFormatException e) {
+                Assert.fail("Malformed result.");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
new file mode 100644
index 0000000..9a203fe
--- /dev/null
+++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.test;
+
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.tez.examples.PageRankBasicStep;
+
+public class PageRankBasicStepITCase extends TezProgramTestBase {
+
+    private String verticesPath;
+    private String edgesPath;
+    private String resultPath;
+    private String expectedResult;
+
+    public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" +
+            "2 0.25666666666666665\n" +
+            "3 0.1716666666666667\n" +
+            "4 0.1716666666666667\n" +
+            "5 0.2";
+
+    @Override
+    protected void preSubmit() throws Exception {
+        resultPath = getTempDirPath("result");
+        verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+        edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+    }
+
+    @Override
+    protected void testProgram() throws Exception {
+        PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"});
+        expectedResult = RANKS_AFTER_1_ITERATION;
+    }
+
+    @Override
+    protected void postSubmit() throws Exception {
+        compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
new file mode 100644
index 0000000..eda9d1a
--- /dev/null
+++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.test;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.tez.client.LocalTezEnvironment;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public abstract class TezProgramTestBase extends AbstractTestBase {
+
+    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
+
+    private JobExecutionResult latestExecutionResult;
+
+    private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
+
+
+    public TezProgramTestBase() {
+        this(new Configuration());
+    }
+
+    public TezProgramTestBase(Configuration config) {
+        super (config);
+    }
+
+
+    public void setParallelism(int degreeOfParallelism) {
+        this.degreeOfParallelism = degreeOfParallelism;
+    }
+
+    public JobExecutionResult getLatestExecutionResult() {
+        return this.latestExecutionResult;
+    }
+
+
+    protected abstract void testProgram() throws Exception;
+
+    protected void preSubmit() throws Exception {}
+
+    protected void postSubmit() throws Exception {}
+
+    // --------------------------------------------------------------------------------------------
+    //  Test entry point
+    // --------------------------------------------------------------------------------------------
+
+    // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt)
+    // TODO Reactivate with future Tez versions
+    @Ignore
+    @Test
+    public void testJob() throws Exception {
+        // pre-submit
+        try {
+            preSubmit();
+        }
+        catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+        }
+
+        // prepare the test environment
+        LocalTezEnvironment env = LocalTezEnvironment.create();
+        env.setParallelism(degreeOfParallelism);
+        env.setAsContext();
+
+        // call the test program
+        try {
+            testProgram();
+        }
+        catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            Assert.fail("Error while calling the test program: " + e.getMessage());
+        }
+
+        // post-submit
+        try {
+            postSubmit();
+        }
+        catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            Assert.fail("Post-submit work caused an error: " + e.getMessage());
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
new file mode 100644
index 0000000..35aa54a
--- /dev/null
+++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.test;
+
+
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+import org.apache.flink.test.testdata.WebLogAnalysisData;
+
+public class WebLogAnalysisITCase extends TezProgramTestBase {
+
+    private String docsPath;
+    private String ranksPath;
+    private String visitsPath;
+    private String resultPath;
+
+    @Override
+    protected void preSubmit() throws Exception {
+        docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
+        ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
+        visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
+        resultPath = getTempDirPath("result");
+    }
+
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
+    }
+    @Override
+    protected void testProgram() throws Exception {
+        WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
new file mode 100644
index 0000000..d73aa8b
--- /dev/null
+++ b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.test;
+
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountITCase extends TezProgramTestBase {
+
+    protected String textPath;
+    protected String resultPath;
+
+    public WordCountITCase(){
+    }
+
+    @Override
+    protected void preSubmit() throws Exception {
+        textPath = createTempFile("text.txt", WordCountData.TEXT);
+        resultPath = getTempDirPath("result");
+    }
+
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
+    }
+
+    @Override
+    protected void testProgram() throws Exception {
+        WordCount.main(new String[]{textPath, resultPath});
+    }
+}

Reply via email to