kennknowles closed pull request #3677: Tez runner
URL: https://github.com/apache/beam/pull/3677
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this pull request comes from a fork, GitHub does not display its diff after
the merge, so the diff is supplied below:

diff --git a/runners/pom.xml b/runners/pom.xml
index 164d1b3a15b..1ad1347b809 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -42,6 +42,7 @@
     <module>google-cloud-dataflow-java</module>
     <module>spark</module>
     <module>apex</module>
+    <module>tez</module>
     <module>gcp</module>
   </modules>
 
diff --git a/runners/tez/pom.xml b/runners/tez/pom.xml
new file mode 100644
index 00000000000..b7d0d6d13db
--- /dev/null
+++ b/runners/tez/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+
+  </build>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.0.0</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-tez</artifactId>
+  <version>2.0.0</version>
+
+  <name>Apache Beam :: Runners :: Tez</name>
+
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <!-- Tez -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.7.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <!-- Beam -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
new file mode 100644
index 00000000000..8b37b0941ad
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that configure the Tez pipeline.
+ */
+public interface TezPipelineOptions extends PipelineOptions, 
java.io.Serializable {
+  //TODO: Add options to configure Tez
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
new file mode 100644
index 00000000000..7d32b473e92
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.tez.translation.TezPipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PipelineRunner} that translates the
+ * pipeline to a Tez DAG and executes it on a Tez cluster.
+ *
+ */
+public class TezRunner extends PipelineRunner<TezRunnerResult>{
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezClient.class);
+
+  private final TezPipelineOptions options;
+
+  private TezRunner(TezPipelineOptions options){
+    this.options = options;
+  }
+
+  public static TezRunner fromOptions(PipelineOptions options) {
+    TezPipelineOptions tezOptions = 
PipelineOptionsValidator.validate(TezPipelineOptions.class,options);
+    return new TezRunner(tezOptions);
+  }
+
+  @Override
+  public TezRunnerResult run(Pipeline pipeline) {
+    //Setup Tez Local Config
+    TezConfiguration config = new TezConfiguration();
+    config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    config.set("fs.default.name", "file:///");
+    
config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, 
true);
+    config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG");
+    //TODO: Support Remote Tez Configuration
+
+    final TezPipelineTranslator translator = new 
TezPipelineTranslator(options, config);
+    final AtomicReference<DAG> tezDAG = new AtomicReference<>();
+    DAG dag = DAG.create(options.getJobName());
+    tezDAG.set(dag);
+    translator.translate(pipeline, dag);
+
+    TezClient client = TezClient.create("TezRun", config);
+    try {
+      client.start();
+      client.submitDAG(dag);
+    } catch (Exception e){
+      e.printStackTrace();
+    }
+
+    return new TezRunnerResult(client);
+  }
+}
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
new file mode 100644
index 00000000000..870c43c1d86
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.client.TezClient;
+import org.joda.time.Duration;
+
+/**
+ * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Tez.
+ */
+public class TezRunnerResult implements PipelineResult {
+
+  private final TezClient client;
+  private State state = State.UNKNOWN;
+
+  public TezRunnerResult(TezClient client){
+    this.client = client;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return waitUntilFinish(null);
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    long timeout = (duration == null || duration.getMillis() < 1) ? 
Long.MAX_VALUE
+        : System.currentTimeMillis() + duration.getMillis();
+    try {
+      while (client.getAppMasterStatus() != TezAppMasterStatus.SHUTDOWN && 
System.currentTimeMillis() < timeout) {
+        Thread.sleep(500);
+      }
+      if (!client.getAppMasterStatus().equals(TezAppMasterStatus.SHUTDOWN)){
+        return null;
+      }
+      return State.DONE;
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    //TODO: CODE TO CANCEL PIPELINE
+    return state;
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException();
+  }
+
+
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
new file mode 100644
index 00000000000..f1f5aeece88
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Flatten} translation to Tez equivalent.
+ */
+class FlattenPCollectionTranslator<T> implements 
TransformTranslator<Flatten.PCollections<T>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlattenPCollectionTranslator.class);
+
+  @Override
+  public void translate(Flatten.PCollections<T> transform, TranslationContext 
context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+  }
+}
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
new file mode 100644
index 00000000000..8f9575287ca
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link GroupByKey} translation to Tez {@link 
org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig}
+ */
+class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, 
V>> {
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKey.class);
+
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext 
context) {
+    if (context.getCurrentInputs().size() > 1 ){
+      throw new RuntimeException("Multiple Inputs are not yet supported");
+    } else if (context.getCurrentOutputs().size() > 1){
+      throw new RuntimeException("Multiple Outputs are not yet supported");
+    }
+    PValue input = 
Iterables.getOnlyElement(context.getCurrentInputs().values());
+    PValue output = 
Iterables.getOnlyElement(context.getCurrentOutputs().values());
+    context.addShufflePair(input, output);
+  }
+}
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
new file mode 100644
index 00000000000..9ce11e6f253
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.ParDo} translation to Tez {@link 
Vertex}.
+ */
+class ParDoTranslator<InputT, OutputT> implements 
TransformTranslator<MultiOutput<InputT, OutputT>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParDoTranslator.class);
+  private static final String OUTPUT_TAG = "OUTPUT_TAG";
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+
+  @Override
+  public void translate(MultiOutput<InputT, OutputT> transform, 
TranslationContext context) {
+    //Prepare input/output targets
+    if (context.getCurrentInputs().size() > 1){
+      throw new NotImplementedException("Multiple Inputs are not yet 
supported");
+    } else if (context.getCurrentOutputs().size() > 1){
+      throw new NotImplementedException("Multiple Outputs are not yet 
supported");
+    }
+    PValue input = 
Iterables.getOnlyElement(context.getCurrentInputs().values());
+    PValue output = 
Iterables.getOnlyElement(context.getCurrentOutputs().values());
+
+    //Prepare UserPayload Configuration
+    DoFn doFn = transform.getFn();
+    String doFnInstance;
+    try {
+      doFnInstance = TranslatorUtil.toString(doFn);
+    } catch ( IOException e){
+      throw new RuntimeException("DoFn failed to serialize: " + 
e.getMessage());
+    }
+    Configuration config = new Configuration();
+    config.set(OUTPUT_TAG, transform.getMainOutputTag().getId());
+    config.set(DO_FN_INSTANCE_TAG, doFnInstance);
+
+    //Check for shuffle input
+    boolean shuffle = false;
+    for (Pair<PValue, PValue> pair : context.getShuffleSet()){
+      if (pair.getRight().equals(input)){
+        shuffle = true;
+      }
+    }
+
+    //Create Vertex with Payload
+    try {
+      UserPayload payload = TezUtils.createUserPayloadFromConf(config);
+      Vertex vertex;
+      if (shuffle) {
+        vertex = Vertex.create(context.getCurrentName(), 
ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload),
 1);
+        //TODO: add customizable parallelism
+      } else {
+        vertex = Vertex.create(context.getCurrentName(), 
ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload));
+      }
+      context.addVertex(context.getCurrentName(), vertex, input, output);
+    } catch (Exception e){
+      throw new RuntimeException("Vertex Translation Failure from: " + 
e.getMessage());
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
new file mode 100644
index 00000000000..3192a818984
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Bounded} translation to Tez {@link DataSourceDescriptor}.
+ */
+class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransformTranslator.class);
+
+  @Override
+  public void translate(Bounded<T> transform, TranslationContext context) {
+    //Build datasource and add to datasource map
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new 
Configuration(context.getConfig()),
+        TextInputFormat.class, transform.getSource().toString()).build();
+    //TODO: Support Configurable Input Formats
+    context.getCurrentOutputs().forEach( (a, b) -> context.addSource(b, 
dataSource));
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
new file mode 100644
index 00000000000..0fde90cabae
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.LinkedList;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.tez.translation.io.MROutputManager;
+import org.apache.beam.runners.tez.translation.io.NoOpOutputManager;
+import 
org.apache.beam.runners.tez.translation.io.OrderedPartitionedKVOutputManager;
+import org.apache.beam.runners.tez.translation.io.OutputManagerFactory;
+import org.apache.beam.runners.tez.translation.io.TezOutputManager;
+import org.apache.beam.runners.tez.translation.io.UnorderedKVEdgeOutputManager;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+/**
+ * TezDoFnProcessor is the Tez wrapper that runs user-defined functions during Tez 
processing.
+ * The DoFn is received through the {@link UserPayload} and then run using the 
simple {@link DoFnRunner}.
+ */
+public class TezDoFnProcessor extends SimpleProcessor {
+
+  private static final String OUTPUT_TAG = "OUTPUT_TAG";
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+
+  private DoFn<?,?> theDoFn;
+  private String outputTag;
+
+  public TezDoFnProcessor(ProcessorContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    Configuration config = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    outputTag = config.get(OUTPUT_TAG, null);
+    String doFnInstance = config.get(DO_FN_INSTANCE_TAG, null);
+    theDoFn = (DoFn) TranslatorUtil.fromString(doFnInstance);
+    super.initialize();
+  }
+
+  @Override
+  public void run() throws Exception {
+    //Setup Reader
+    KeyValueReader kvReader = null;
+    KeyValuesReader kvsReader = null;
+    LogicalInput input = Iterables.getOnlyElement(getInputs().values());
+    Reader reader = input.getReader();
+    if (reader instanceof KeyValueReader) {
+      kvReader = (KeyValueReader) reader;
+    } else if (reader instanceof KeyValuesReader) {
+      kvsReader = (KeyValuesReader) reader;
+    } else {
+      throw new RuntimeException("UNSUPPORTED READER!");
+    }
+
+    //Setup Writer
+    TezOutputManager outputManager;
+    if (getOutputs().size() == 1){
+      LogicalOutput output = Iterables.getOnlyElement(getOutputs().values());
+      outputManager = OutputManagerFactory.createOutputManager(output);
+      outputManager.before();
+    } else if (getOutputs().size() == 0){
+      outputManager = new NoOpOutputManager();
+    } else {
+      throw new RuntimeException("Multiple outputs not yet supported");
+    }
+
+    //Initialize DoFnRunner
+    DoFnRunner runner = 
DoFnRunners.simpleRunner(PipelineOptionsFactory.create(), theDoFn, 
NullSideInputReader
+        .empty(), outputManager, new TupleTag<>(outputTag), new LinkedList<>(),
+        new FakeStepContext(), WindowingStrategy.globalDefault());
+    runner.startBundle();
+
+    //Start Runner
+    if (kvsReader != null){
+      while (kvsReader.next()){
+        
outputManager.setCurrentElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey())));
+        
runner.processElement(WindowedValue.valueInGlobalWindow(KV.of(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey()),
+            
TranslatorUtil.convertIteratorToJavaType(kvsReader.getCurrentValues()))));
+      }
+    } else if (kvReader != null){
+      while (kvReader.next()){
+        WindowedValue value = 
WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentKey()));
+        outputManager.setCurrentElement(value);
+        
runner.processElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentValue())));
+      }
+    } else {
+      throw new RuntimeException("UNSUPPORTED READER!");
+    }
+
+    outputManager.after();
+    runner.finishBundle();
+  }
+
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
new file mode 100644
index 00000000000..7b4646afa84
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TezPipelineTranslator} walks a Beam {@link Pipeline} and turns it
+ * into a Tez logical plan {@link DAG}.
+ *
+ * <p>Every visited transform is looked up by its exact class in one of two
+ * registries (primitive or composite) and handed, together with a shared
+ * {@link TranslationContext}, to the matching {@link TransformTranslator}.
+ * Once the traversal is over, that context populates the DAG.
+ */
+public class TezPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezPipelineTranslator.class);
+
+  /**
+   * Registry of primitive transforms: maps a {@link PTransform} subclass to the
+   * {@link TransformTranslator} used to translate it.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  /**
+   * Registry of composite transforms that are translated as a whole instead of
+   * being expanded into their children.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      compositeTransformTranslators = new HashMap<>();
+
+  /** Accumulates the translation results of a single pipeline traversal. */
+  private final TranslationContext translationContext;
+
+  static {
+    registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator<>());
+    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator<>());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator<>());
+    registerTransformTranslator(Flatten.PCollections.class, new FlattenPCollectionTranslator<>());
+    registerTransformTranslator(
+        View.CreatePCollectionView.class, new ViewCreatePCollectionViewTranslator<>());
+    registerCompositeTransformTranslator(WriteFiles.class, new WriteFilesTranslator());
+  }
+
+  /**
+   * Creates a translator backed by a fresh {@link TranslationContext}.
+   *
+   * @param options pipeline options forwarded to the translation context
+   * @param config Tez configuration forwarded to the translation context
+   */
+  public TezPipelineTranslator(TezPipelineOptions options, TezConfiguration config) {
+    translationContext = new TranslationContext(options, config);
+  }
+
+  /**
+   * Traverses the pipeline with this visitor, then populates the given DAG from
+   * the translation context.
+   *
+   * @param pipeline pipeline to translate
+   * @param dag Tez DAG that receives the translated vertices and edges
+   */
+  public void translate(Pipeline pipeline, DAG dag) {
+    pipeline.traverseTopologically(this);
+    translationContext.populateDAG(dag);
+  }
+
+  /**
+   * Main visitor method called on each primitive {@link PTransform} to translate it
+   * to Tez objects.
+   *
+   * @param node Pipeline node containing {@link PTransform} to be translated.
+   * @throws UnsupportedOperationException if no translator is registered for the
+   *     transform's class
+   */
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    LOG.debug("visiting transform {}", node.getTransform());
+    PTransform primitive = node.getTransform();
+    TransformTranslator handler = transformTranslators.get(primitive.getClass());
+    if (handler == null) {
+      throw new UnsupportedOperationException("no translator registered for " + primitive);
+    }
+    translationContext.setCurrentTransform(node);
+    handler.translate(primitive, translationContext);
+  }
+
+  /**
+   * Translates a composite transform as a single unit when a composite translator is
+   * registered for its class; otherwise lets the traversal descend into it.
+   *
+   * @param node Pipeline node of the composite being entered.
+   * @return {@code DO_NOT_ENTER_TRANSFORM} when the composite was translated here,
+   *     {@code ENTER_TRANSFORM} otherwise
+   */
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    PTransform composite = node.getTransform();
+    if (composite == null) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    TransformTranslator handler = compositeTransformTranslators.get(composite.getClass());
+    if (handler == null) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    translationContext.setCurrentTransform(node);
+    handler.translate(composite, translationContext);
+    return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+  }
+
+  /** Only logs; leaving a composite requires no translation work. */
+  @Override
+  public void leaveCompositeTransform(Node node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  /** Only logs; values are not translated on their own. */
+  @Override
+  public void visitValue(PValue value, Node producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Records that instances of the specified primitive PTransform class
+   * should be translated by default by the corresponding
+   * {@link TransformTranslator}.
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    register(transformTranslators, transformClass, transformTranslator);
+  }
+
+  /**
+   * Records that instances of the specified composite PTransform class
+   * should be translated as a whole by the corresponding
+   * {@link TransformTranslator}.
+   */
+  private static <TransformT extends PTransform> void registerCompositeTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    register(compositeTransformTranslators, transformClass, transformTranslator);
+  }
+
+  /**
+   * Adds a translator to the given registry, rejecting a second registration for
+   * the same transform class.
+   */
+  private static void register(
+      Map<Class<? extends PTransform>, TransformTranslator> registry,
+      Class<? extends PTransform> transformClass,
+      TransformTranslator translator) {
+    TransformTranslator previous = registry.put(transformClass, translator);
+    if (previous != null) {
+      throw new IllegalArgumentException("defining multiple translators for " + transformClass);
+    }
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
new file mode 100644
index 00000000000..736c84031fb
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Translates a {@link PTransform} of type {@code T} to Tez functions.
+ *
+ * <p>Implementations receive the transform together with the
+ * {@link TranslationContext} of the pipeline traversal currently in progress.
+ *
+ * @param <T> the concrete transform type handled by the translator
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable 
{
+  /**
+   * Translates the given transform.
+   *
+   * @param transform the transform to translate
+   * @param context the translation context of the current traversal
+   */
+  void translate(T transform, TranslationContext context);
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
new file mode 100644
index 00000000000..1bffe953dca
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ * Tracks and maintains each individual {@link Vertex} and their {@link Edge} connections.
+ *
+ * <p>Translators register vertices, data sources, data sinks and shuffle pairs while the
+ * pipeline is traversed; {@link #populateDAG(DAG)} then wires all of them into a Tez
+ * {@link DAG}.
+ */
+public class TranslationContext {
+
+  private final TezPipelineOptions pipelineOptions;
+  private final TezConfiguration config;
+
+  // State describing the transform currently being translated.
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private String currentName;
+  private Map<TupleTag<?>, PValue> currentInputs;
+  private Map<TupleTag<?>, PValue> currentOutputs;
+
+  // Vertices by name, by the value they consume, and by the value they produce.
+  private final Map<String, Vertex> vertexMap = new HashMap<>();
+  private final Map<PValue, Vertex> vertexInputMap = new HashMap<>();
+  private final Map<PValue, Vertex> vertexOutputMap = new HashMap<>();
+
+  // (input value, output value) pairs that must be connected through a shuffle edge.
+  private final Set<Pair<PValue, PValue>> shuffleSet = new HashSet<>();
+
+  private final Map<PValue, DataSourceDescriptor> sourceMap = new HashMap<>();
+  private final Map<PValue, DataSinkDescriptor> sinkMap = new HashMap<>();
+
+  /**
+   * Creates an empty context.
+   *
+   * @param options pipeline options of the pipeline being translated
+   * @param config Tez configuration exposed to translators via {@link #getConfig()}
+   */
+  public TranslationContext(TezPipelineOptions options, TezConfiguration config) {
+    this.pipelineOptions = options;
+    this.config = config;
+  }
+
+  /**
+   * Makes the given hierarchy node the current transform and caches its applied
+   * transform, inputs, outputs and full name.
+   */
+  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
+    this.currentTransform = treeNode.toAppliedPTransform();
+    this.currentInputs = treeNode.getInputs();
+    this.currentOutputs = treeNode.getOutputs();
+    this.currentName = treeNode.getFullName();
+  }
+
+  /**
+   * Registers a vertex under its name, the value it consumes and the value it produces.
+   *
+   * @param name vertex name used as the key of the vertex registry
+   * @param vertex the Tez vertex
+   * @param input value consumed by the vertex
+   * @param output value produced by the vertex
+   */
+  public void addVertex(String name, Vertex vertex, PValue input, PValue output) {
+    vertexMap.put(name, vertex);
+    vertexInputMap.put(input, vertex);
+    vertexOutputMap.put(output, vertex);
+  }
+
+  /** Records that the two values must be connected through a shuffle edge. */
+  public void addShufflePair(PValue input, PValue output) {
+    shuffleSet.add(Pair.of(input, output));
+  }
+
+  /** Returns the live (mutable) set of registered shuffle pairs. */
+  public Set<Pair<PValue, PValue>> getShuffleSet() {
+    return this.shuffleSet;
+  }
+
+  /** Associates a data source with the value it produces. */
+  public void addSource(PValue output, DataSourceDescriptor dataSource) {
+    sourceMap.put(output, dataSource);
+  }
+
+  /** Associates a data sink with the value it consumes. */
+  public void addSink(PValue input, DataSinkDescriptor dataSink) {
+    sinkMap.put(input, dataSink);
+  }
+
+  public TezConfiguration getConfig() {
+    return config;
+  }
+
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
+  public String getCurrentName() {
+    return currentName;
+  }
+
+  public Map<TupleTag<?>, PValue> getCurrentInputs() {
+    return currentInputs;
+  }
+
+  public Map<TupleTag<?>, PValue> getCurrentOutputs() {
+    return currentOutputs;
+  }
+
+  /**
+   * Populates the given Tez dag with the context's {@link Vertex} and {@link Edge}.
+   *
+   * <p>Vertices are added first, then data sources and sinks are attached to the vertices
+   * that consume/produce the corresponding value, then one ordered-partitioned edge is added
+   * per shuffle pair, and finally a one-to-one edge is added wherever one vertex produces
+   * the value another vertex consumes.
+   *
+   * @param dag Empty Tez dag to be populated.
+   * @throws IllegalStateException if a shuffle pair refers to a value for which no
+   *     producing or consuming vertex has been registered
+   */
+  public void populateDAG(DAG dag) {
+
+    for (Vertex v : vertexMap.values()) {
+      dag.addVertex(v);
+    }
+
+    //Add Sources
+    sourceMap.forEach((value, source) -> {
+      Vertex vertex = vertexInputMap.get(value);
+      if (vertex != null) {
+        vertex.addDataSource(value.getName(), source);
+      }
+    });
+
+    //Add Sinks
+    sinkMap.forEach((value, sink) -> {
+      Vertex vertex = vertexOutputMap.get(value);
+      if (vertex != null) {
+        vertex.addDataSink(value.getName(), sink);
+      }
+    });
+
+    //Add Shuffle Edges
+    for (Pair<PValue, PValue> pair : shuffleSet) {
+      Vertex producer = vertexOutputMap.get(pair.getLeft());
+      Vertex consumer = vertexInputMap.get(pair.getRight());
+      // Fail with a descriptive message instead of handing a null vertex to Tez.
+      if (producer == null || consumer == null) {
+        throw new IllegalStateException(
+            "Cannot create shuffle edge from " + pair.getLeft() + " to " + pair.getRight()
+                + ": no vertex registered for the "
+                + (producer == null ? "producing" : "consuming") + " side");
+      }
+      OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig.newBuilder(
+          BytesWritable.class.getName(), BytesWritable.class.getName(),
+          HashPartitioner.class.getName()).build();
+      dag.addEdge(Edge.create(producer, consumer, edgeConfig.createDefaultEdgeProperty()));
+    }
+
+    //Add Edges
+    for (Map.Entry<PValue, Vertex> consumerEntry : vertexInputMap.entrySet()) {
+      PValue inputValue = consumerEntry.getKey();
+      if (inputValue == null) {
+        // A vertex registered without an input value has no upstream vertex to connect to.
+        continue;
+      }
+      // A direct lookup replaces scanning every producer for an equal value.
+      Vertex producer = vertexOutputMap.get(inputValue);
+      if (producer != null) {
+        UnorderedKVEdgeConfig edgeConfig = UnorderedKVEdgeConfig.newBuilder(
+            BytesWritable.class.getName(), BytesWritable.class.getName()).build();
+        dag.addEdge(Edge.create(producer, consumerEntry.getValue(),
+            edgeConfig.createDefaultOneToOneEdgeProperty()));
+      }
+    }
+  }
+
+
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
new file mode 100644
index 00000000000..32b3ad0cbfc
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import org.apache.beam.sdk.transforms.DoFn;
+import java.util.List;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Translator Utilities to convert between hadoop and java types.
+ *
+ * <p>Arbitrary Java objects travel through Tez as Java-serialized bytes wrapped in a
+ * {@link BytesWritable}; the common Hadoop writable wrappers are unwrapped to their plain
+ * Java values.
+ */
+public class TranslatorUtil {
+
+  /**
+   * Utility to convert java objects to bytes and place them in BytesWritable wrapper
+   * for hadoop use.
+   *
+   * @param element java object to be converted; must be Java-serializable
+   * @return BytesWritable wrapped object
+   * @throws RuntimeException if the element cannot be serialized (the cause is preserved)
+   */
+  public static Object convertToBytesWritable(Object element) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(bos)) {
+      out.writeObject(element);
+      // Flush so the object stream's buffered data reaches the byte array before it is read.
+      out.flush();
+      return new BytesWritable(bos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Failed to serialize object into byte array: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Utility to convert hadoop objects back to their java equivalent.
+   *
+   * @param element hadoop object to be converted
+   * @return original java object
+   * @throws NullPointerException if {@code element} is null
+   * @throws RuntimeException if the element is of an unsupported type or cannot be
+   *     deserialized (the cause is preserved)
+   */
+  public static Object convertToJavaType(Object element) {
+    if (element == null) {
+      throw new NullPointerException("Cannot convert null Hadoop element to Java!");
+    }
+    if (element instanceof BytesWritable) {
+      BytesWritable writable = (BytesWritable) element;
+      // getBytes() exposes the backing array, which may be longer than the valid data;
+      // only the first getLength() bytes belong to the serialized object.
+      try (ByteArrayInputStream bis =
+              new ByteArrayInputStream(writable.getBytes(), 0, writable.getLength());
+          ObjectInput in = new ObjectInputStream(bis)) {
+        return in.readObject();
+      } catch (IOException | ClassNotFoundException e) {
+        throw new RuntimeException(
+            "Failed to deserialize object from byte array: " + e.getMessage(), e);
+      }
+    }
+    if (element instanceof Text) {
+      return element.toString();
+    }
+    if (element instanceof BooleanWritable) {
+      return ((BooleanWritable) element).get();
+    }
+    if (element instanceof IntWritable) {
+      return ((IntWritable) element).get();
+    }
+    if (element instanceof DoubleWritable) {
+      return ((DoubleWritable) element).get();
+    }
+    if (element instanceof FloatWritable) {
+      return ((FloatWritable) element).get();
+    }
+    if (element instanceof LongWritable) {
+      return ((LongWritable) element).get();
+    }
+    if (element instanceof ShortWritable) {
+      return ((ShortWritable) element).get();
+    }
+    if (element instanceof ObjectWritable) {
+      return ((ObjectWritable) element).get();
+    }
+    throw new RuntimeException(
+        "Hadoop Type " + element.getClass() + " cannot be converted to Java!");
+  }
+
+  /**
+   * Utility to convert hadoop objects within an iterable back to their java equivalent.
+   *
+   * @param iterable Iterable containing objects to be converted
+   * @return new Iterable with original java objects, in iteration order
+   */
+  static Iterable<Object> convertIteratorToJavaType(Iterable<Object> iterable) {
+    List<Object> converted = new ArrayList<>();
+    for (Object element : iterable) {
+      converted.add(convertToJavaType(element));
+    }
+    return converted;
+  }
+
+  /**
+   * Utility to serialize a serializable object into a Base64 string.
+   *
+   * @param object that is serializable to be serialized.
+   * @return serialized string
+   * @throws IOException thrown for serialization errors.
+   */
+  public static String toString(Serializable object) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // Closing the object stream flushes it, so the byte array is complete afterwards.
+    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(object);
+    }
+    return Base64.getEncoder().encodeToString(baos.toByteArray());
+  }
+
+  /**
+   * Utility to deserialize a Base64 string into a serializable object.
+   *
+   * @param string containing serialized object.
+   * @return Original object
+   * @throws IOException thrown for serialization errors.
+   * @throws ClassNotFoundException thrown for serialization errors.
+   */
+  public static Object fromString(String string) throws IOException, ClassNotFoundException {
+    byte[] data = Base64.getDecoder().decode(string);
+    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
+      return ois.readObject();
+    }
+  }
+
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
new file mode 100644
index 00000000000..3fbb2968243
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.View;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link View.CreatePCollectionView} translation to Tez equivalent.
+ *
+ * <p>The translation is not implemented yet: the transform is accepted so that the
+ * pipeline traversal does not fail, but nothing is added to the
+ * {@link TranslationContext}. A warning is logged so the omission is visible.
+ */
+class ViewCreatePCollectionViewTranslator<ElemT, ViewT> implements
+    TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ViewCreatePCollectionViewTranslator.class);
+
+  /**
+   * Logs that the transform is skipped; no Tez objects are produced.
+   *
+   * @param transform the view-creating transform being visited
+   * @param context the translation context of the current traversal
+   */
+  @Override
+  public void translate(
+      View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+    LOG.warn("View.CreatePCollectionView is not yet translated to Tez; ignoring transform {}",
+        context.getCurrentName());
+  }
+}
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
new file mode 100644
index 00000000000..433e5a5b338
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Assign;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Assign} translation to Tez equivalent.
+ *
+ * <p>The translation is not implemented yet: the transform is accepted so that the
+ * pipeline traversal does not fail, but nothing is added to the
+ * {@link TranslationContext}. A warning is logged so the omission is visible.
+ */
+class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignTranslator.class);
+
+  /**
+   * Logs that the transform is skipped; no Tez objects are produced.
+   *
+   * @param transform the window-assigning transform being visited
+   * @param context the translation context of the current traversal
+   */
+  @Override
+  public void translate(Assign<T> transform, TranslationContext context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+    LOG.warn("Window.Assign is not yet translated to Tez; ignoring transform {}",
+        context.getCurrentName());
+  }
+}
\ No newline at end of file
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
new file mode 100644
index 00000000000..312d8aece21
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.mapreduce.output.MROutput;
+
+/**
+ * {@link WriteFiles} translation to a Tez {@link DataSinkDescriptor} backed by
+ * {@link MROutput}.
+ */
+class WriteFilesTranslator implements TransformTranslator<WriteFiles<?>> {
+
+  /**
+   * Extracts the output location from the string form of the sink's base output
+   * directory provider, which is expected to look like {@code ...{...{value=<path>}}...}.
+   * Compiled once because the expression never changes.
+   */
+  private static final Pattern OUTPUT_PATH_PATTERN =
+      Pattern.compile(".*\\{.*\\{value=(.*)}}.*");
+
+  /**
+   * Registers one text-format {@link MROutput} sink for every current input of the
+   * transform.
+   *
+   * <p>If the output location cannot be extracted from the provider's string form,
+   * no sink is registered.
+   *
+   * @param transform the write transform whose sink supplies the output location
+   * @param context the translation context that receives the sink
+   */
+  @Override
+  public void translate(WriteFiles transform, TranslationContext context) {
+    Matcher matcher = OUTPUT_PATH_PATTERN.matcher(
+        transform.getSink().getBaseOutputDirectoryProvider().toString());
+    if (matcher.matches()) {
+      String output = matcher.group(1);
+      DataSinkDescriptor dataSink = MROutput.createConfigBuilder(
+          new Configuration(context.getConfig()), TextOutputFormat.class, output).build();
+
+      context.getCurrentInputs().forEach((tag, value) -> context.addSink(value, dataSink));
+    }
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
new file mode 100644
index 00000000000..8305e1e28fc
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import java.io.IOException;
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to {@link MROutput}.
+ */
+public class MROutputManager extends TezOutputManager {
+
+  private final MROutput output;
+
+  /**
+   * Wraps the given output and installs its {@link KeyValueWriter} as this manager's writer.
+   *
+   * @param output the logical output; its class must be exactly {@link MROutput}
+   * @throws RuntimeException if the output is of a different class, or if its writer
+   *     cannot be retrieved (the cause is preserved)
+   */
+  public MROutputManager(LogicalOutput output) {
+    super(output);
+    if (!output.getClass().equals(MROutput.class)) {
+      throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+    }
+    this.output = (MROutput) output;
+    try {
+      setWriter((KeyValueWriter) output.getWriter());
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Error when retrieving writer for output: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Flushes and then commits the wrapped {@link MROutput}.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while doing so
+   */
+  @Override
+  public void after() {
+    try {
+      output.flush();
+      output.commit();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Writes the element's value, converted to a bytes writable, with a {@code null} key.
+   *
+   * @param tag output tag of the element; not used by this manager
+   * @param output the element whose value is written
+   * @throws RuntimeException wrapping any failure raised while writing
+   */
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    try {
+      getWriter().write(null, TranslatorUtil.convertToBytesWritable(output.getValue()));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
new file mode 100644
index 00000000000..725be0a521a
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * {@link TezOutputManager} implementation for when the
+ * {@link org.apache.tez.dag.api.Vertex} has no output.
+ * Used in cases such as when the ParDo within the Vertex writes the output itself.
+ */
+public class NoOpOutputManager extends TezOutputManager {
+
+  /** Creates a manager that is not backed by any logical output ({@code null} is passed up). */
+  public NoOpOutputManager() {
+    super(null);
+  }
+
+  /** Intentionally discards the element: there is nowhere to write it. */
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {}
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
new file mode 100644
index 00000000000..4a652dafe6a
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+/**
+ * {@link TezOutputManager} implementation that writes key-value output to an
+ * {@link OrderedPartitionedKVOutput}.
+ *
+ * <p>Every element passed to {@link #output} must carry a {@link KV}; its key and its value are
+ * each converted with {@code TranslatorUtil.convertToBytesWritable} and handed to the writer.
+ */
+public class OrderedPartitionedKVOutputManager extends TezOutputManager {
+
+  private OrderedPartitionedKVOutput output;
+
+  /**
+   * Binds this manager to {@code output} and fetches its {@link KeyValueWriter}.
+   *
+   * @param output the Tez output; its class must be exactly {@link OrderedPartitionedKVOutput}
+   * @throws RuntimeException if {@code output} is of any other class, or if its writer cannot be
+   *     retrieved (the underlying failure is attached as the cause)
+   */
+  public OrderedPartitionedKVOutputManager(LogicalOutput output) {
+    super(output);
+    if (!output.getClass().equals(OrderedPartitionedKVOutput.class)) {
+      throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+    }
+    this.output = (OrderedPartitionedKVOutput) output;
+    try {
+      setWriter((KeyValueWriter) output.getWriter());
+    } catch (Exception e) {
+      // Chain the cause so the original stack trace is not lost.
+      throw new RuntimeException(
+          "Error when retrieving writer for output: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Writes the key and the value of the {@link KV} carried by {@code output}.
+   *
+   * @param tag the output tag; not consulted by this implementation
+   * @param output the element to write; its value must be a {@link KV}
+   * @throws IllegalArgumentException if the element's value is not a {@link KV}
+   * @throws RuntimeException wrapping any failure raised while converting or writing
+   */
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    Object value = output.getValue();
+    // Reject bad input up front so callers see the precise exception type instead of a wrapper.
+    if (!(value instanceof KV)) {
+      throw new IllegalArgumentException("GroupByKey can only group Key-Value outputs!");
+    }
+    KV<?, ?> pair = (KV<?, ?>) value;
+    try {
+      getWriter().write(
+          TranslatorUtil.convertToBytesWritable(pair.getKey()),
+          TranslatorUtil.convertToBytesWritable(pair.getValue()));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
new file mode 100644
index 00000000000..807de3f12fd
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * Static factory that selects the {@link TezOutputManager} matching a Tez {@link LogicalOutput}.
+ */
+public class OutputManagerFactory {
+
+  /**
+   * Creates the output manager for {@code output}, chosen by the exact runtime class of the
+   * output (subclasses of the supported output types are not matched).
+   *
+   * @param output the Tez output to wrap
+   * @return a new manager bound to {@code output}
+   * @throws RuntimeException if the output's class is none of {@link OrderedPartitionedKVOutput},
+   *     {@link UnorderedKVOutput} or {@link MROutput}
+   */
+  public static TezOutputManager createOutputManager(LogicalOutput output) {
+    final Class<?> outputType = output.getClass();
+    if (OrderedPartitionedKVOutput.class.equals(outputType)) {
+      return new OrderedPartitionedKVOutputManager(output);
+    }
+    if (UnorderedKVOutput.class.equals(outputType)) {
+      return new UnorderedKVEdgeOutputManager(output);
+    }
+    if (MROutput.class.equals(outputType)) {
+      return new MROutputManager(output);
+    }
+    throw new RuntimeException("Output type: " + outputType + " is unsupported");
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
new file mode 100644
index 00000000000..2999ee53905
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * Base class for the output managers used by the TezRunner.
+ *
+ * <p>On top of the {@link DoFnRunners.OutputManager} contract it offers {@link #before()} and
+ * {@link #after()} methods (empty here, available for subclasses to override) and stores the
+ * Tez {@link LogicalOutput}, a {@link KeyValueWriter} and the element most recently registered
+ * through {@link #setCurrentElement}.
+ */
+public abstract class TezOutputManager implements DoFnRunners.OutputManager {
+
+  private final LogicalOutput output;
+  private KeyValueWriter writer;
+  private WindowedValue currentElement;
+
+  /**
+   * @param output the Tez output this manager is associated with; stored as given, so it may be
+   *     {@code null}
+   */
+  public TezOutputManager(LogicalOutput output){
+    this.output = output;
+  }
+
+  /** Does nothing in this base class. */
+  public void before() {
+  }
+
+  /** Does nothing in this base class. */
+  public void after() {
+  }
+
+  /** Returns the Tez output supplied at construction time. */
+  public LogicalOutput getOutput() {
+    return this.output;
+  }
+
+  /** Returns the writer last stored with {@link #setWriter}, or {@code null} if none was set. */
+  public KeyValueWriter getWriter() {
+    return this.writer;
+  }
+
+  /** Stores the writer that subclasses use to emit key-value pairs. */
+  public void setWriter(KeyValueWriter writer) {
+    this.writer = writer;
+  }
+
+  /** Returns the element last stored with {@link #setCurrentElement}, or {@code null}. */
+  public WindowedValue getCurrentElement(){
+    return this.currentElement;
+  }
+
+  /** Records {@code currentElement} so that it can later be read back by subclasses. */
+  public void setCurrentElement(WindowedValue currentElement) {
+    this.currentElement = currentElement;
+  }
+}
diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
new file mode 100644
index 00000000000..34cb371142d
--- /dev/null
+++ 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to 
{@link UnorderedKVOutput}
+ */
+public class UnorderedKVEdgeOutputManager extends TezOutputManager {
+
+  private UnorderedKVOutput output;
+
+  public UnorderedKVEdgeOutputManager(LogicalOutput output) {
+    super(output);
+    if (output.getClass().equals(UnorderedKVOutput.class)){
+      this.output = (UnorderedKVOutput) output;
+      try {
+        setWriter((KeyValueWriter) output.getWriter());
+      } catch (Exception e) {
+        throw new RuntimeException("Error when retrieving writer for output" + 
e.getMessage());
+      }
+    } else {
+      throw new RuntimeException("Incorrect OutputManager for: " + 
output.getClass());
+    }
+  }
+
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    try {
+      
getWriter().write(TranslatorUtil.convertToBytesWritable(getCurrentElement().getValue()),
+          TranslatorUtil.convertToBytesWritable(output.getValue()));
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
new file mode 100644
index 00000000000..66e9f191ba0
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the Tez runner.
+ */
+public class TezRunnerTest {
+
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+  private static final String INPUT_LOCATION = 
"src/test/resources/test_input.txt";
+
+  private static Pipeline tezPipeline;
+  private static Pipeline directPipeline;
+
+  @Before
+  public void setupPipelines(){
+    //TezRunner Pipeline
+    PipelineOptions tezOptions = PipelineOptionsFactory.create();
+    tezOptions.setRunner(TezRunner.class);
+    tezPipeline = Pipeline.create(tezOptions);
+
+    //DirectRunner Pipeline
+    PipelineOptions options = PipelineOptionsFactory.create();
+    directPipeline = Pipeline.create(options);
+  }
+
+  @Test
+  public void simpleTest() throws Exception {
+    tezPipeline.apply(TextIO.read().from(INPUT_LOCATION))
+        .apply(ParDo.of(new AddHelloWorld()))
+        .apply(ParDo.of(new TestTezFn()));
+
+    directPipeline.apply(TextIO.read().from(INPUT_LOCATION))
+        .apply(ParDo.of(new AddHelloWorld()))
+        .apply(ParDo.of(new TestDirectFn()));
+
+    tezPipeline.run().waitUntilFinish();
+    directPipeline.run().waitUntilFinish();
+    Assert.assertEquals(TestDirectFn.RESULTS, TestTezFn.RESULTS);
+  }
+
+  @Test
+  public void wordCountTest() throws Exception {
+    tezPipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION))
+        .apply("TWO", ParDo.of(new TokenDoFn()))
+        .apply("THREE", GroupByKey.create())
+        .apply("FOUR", ParDo.of(new ProcessDoFn()))
+        .apply("FIVE", ParDo.of(new TestTezFn()));
+
+    directPipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION))
+        .apply("TWO", ParDo.of(new TokenDoFn()))
+        .apply("THREE", GroupByKey.create())
+        .apply("FOUR", ParDo.of(new ProcessDoFn()))
+        .apply("FIVE", ParDo.of(new TestDirectFn()));
+
+    tezPipeline.run().waitUntilFinish();
+    directPipeline.run().waitUntilFinish();
+    Assert.assertEquals(TestDirectFn.RESULTS, TestTezFn.RESULTS);
+  }
+
+  private static class AddHelloWorld extends DoFn<String, String>{
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word + " HelloWorld");
+        }
+      }
+    }
+  }
+
+  public static class TokenDoFn extends DoFn<String, KV<String, Integer>>{
+    @ProcessElement
+    public void processElement(ProcessContext c){
+      for( String word : c.element().split(TOKENIZER_PATTERN)){
+        if(!word.isEmpty()){
+          c.output(KV.of(word, 1));
+        }
+      }
+    }
+  }
+
+  public static class ProcessDoFn extends DoFn<KV<String,Iterable<Integer>>, 
String>{
+    @ProcessElement
+    public void processElement(ProcessContext c){
+      Integer sum = 0;
+      for( Integer integer : c.element().getValue()){
+        sum = sum + integer;
+      }
+      c.output(c.element().getKey() + ": " + sum);
+    }
+  }
+
+  private static class TestTezFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new 
HashSet<>());
+
+    public TestTezFn(){
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.add(c.element());
+    }
+  }
+
+  private static class TestDirectFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new 
HashSet<>());
+
+    public TestDirectFn(){
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.add(c.element());
+    }
+  }
+
+}
diff --git 
a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
new file mode 100644
index 00000000000..df3532def41
--- /dev/null
+++ 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.runners.tez.TezRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the ParDoTranslator class
+ */
+public class ParDoTranslatorTest {
+
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+  private static final String TEST_TAG = "TestName";
+  private static TransformHierarchy hierarchy;
+  private static PValue pvalue;
+  private static DAG dag;
+  private static TranslationContext context;
+  private static ParDoTranslator translator;
+
+  @Test
+  public void testParDoTranslation() throws Exception {
+    MultiOutput parDo = ParDo.of(new TestDoFn()).withOutputTags(new 
TupleTag<>(), TupleTagList.of(new TupleTag<String>()));
+    Node node = hierarchy.pushNode(TEST_TAG, pvalue, parDo);
+    hierarchy.setOutput(pvalue);
+    context.setCurrentTransform(node);
+    translator.translate(parDo, context);
+    context.populateDAG(dag);
+    Vertex vertex = Iterables.getOnlyElement(dag.getVertices());
+    Configuration config = 
TezUtils.createConfFromUserPayload(vertex.getProcessorDescriptor().getUserPayload());
+    String doFnString = config.get(DO_FN_INSTANCE_TAG);
+    DoFn doFn = (DoFn) TranslatorUtil.fromString(doFnString);
+
+    Assert.assertEquals(vertex.getProcessorDescriptor().getClassName(), 
TezDoFnProcessor.class.getName());
+    Assert.assertEquals(doFn.getClass(), TestDoFn.class);
+  }
+
+  @Before
+  public void setupTest(){
+    dag = DAG.create(TEST_TAG);
+    translator = new ParDoTranslator();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(TezRunner.class);
+    TezPipelineOptions tezOptions = 
PipelineOptionsValidator.validate(TezPipelineOptions.class, options);
+    context = new TranslationContext(tezOptions, new TezConfiguration());
+    hierarchy = new TransformHierarchy(Pipeline.create());
+    PValue innerValue = new PValueBase() {
+      @Override
+      public String getName() {return null;}
+    };
+    pvalue = new PValue() {
+      @Override
+      public String getName() {return null;}
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        Map<TupleTag<?>, PValue> map = new HashMap<>();
+        map.put(new TupleTag<>(), innerValue);
+        return map;
+      }
+
+      @Override
+      public void finishSpecifying(PInput upstreamInput, PTransform<?, ?> 
upstreamTransform) {}
+
+      @Override
+      public Pipeline getPipeline() {return null;}
+
+      @Override
+      public void finishSpecifyingOutput(String transformName, PInput input, 
PTransform<?, ?> transform) {}
+    };
+  }
+
+  private static class TestDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      //Test DoFn
+    }
+  }
+}
diff --git 
a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
new file mode 100644
index 00000000000..cfd8e41bf8a
--- /dev/null
+++ 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the TezDoFnProcessor that wraps beam DoFns.
+ */
+public class TezDoFnProcessorTest {
+
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+  private static final String INPUT_LOCATION = 
"src/test/resources/test_input.txt";
+  private static final DAG dag = DAG.create("TestDag");
+  private static TezClient client;
+
+  @Before
+  public void setUp(){
+    TezConfiguration config = new TezConfiguration();
+    config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    config.set("fs.default.name", "file:///");
+    
config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, 
true);
+    config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG");
+    client = TezClient.create("TezClient", config);
+  }
+
+  @Test
+  public void testDoFn() throws Exception {
+    String expected = FileUtils.readFileToString(new File(INPUT_LOCATION));
+    Set<String> expectedSet = new 
HashSet<>(Arrays.asList(expected.split(TOKENIZER_PATTERN)));
+
+    DoFn<?,?> doFn = new TestWordsFn();
+    String doFnInstance;
+    doFnInstance = TranslatorUtil.toString(doFn);
+
+    Configuration config = new Configuration();
+    config.set("OUTPUT_TAG", new TupleTag<>().getId());
+    config.set("DO_FN_INSTANCE", doFnInstance);
+    UserPayload payload = TezUtils.createUserPayloadFromConf(config);
+
+    Vertex vertex = Vertex.create("TestVertex", ProcessorDescriptor
+        .create(TezDoFnProcessor.class.getName()).setUserPayload(payload));
+    vertex.addDataSource("TestInput" , MRInput.createConfigBuilder(new 
Configuration(),
+        TextInputFormat.class, INPUT_LOCATION).build());
+
+    dag.addVertex(vertex);
+    client.start();
+    client.submitDAG(dag);
+    while (client.getAppMasterStatus() != TezAppMasterStatus.SHUTDOWN){}
+
+    Assert.assertEquals(expectedSet, TestWordsFn.RESULTS);
+  }
+
+  private static class TestWordsFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new 
HashSet<>());
+
+    public TestWordsFn(){
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          RESULTS.add(word);
+        }
+      }
+    }
+  }
+
+}
diff --git 
a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
new file mode 100644
index 00000000000..2efca6a9dd0
--- /dev/null
+++ 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.runners.tez.TezRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the TranslationContext class: vertices and data sources registered on the context
+ * must show up correctly connected in the populated DAG.
+ */
+public class TranslationContextTest {
+
+  private static final String TEST_SOURCE = "Test.txt";
+  private static final String DAG_NAME = "TestDag";
+  private static final String VERTEX1_NAME = "TestVertex1";
+  private static final String VERTEX2_NAME = "TestVertex2";
+  private static final String PVALUE_NAME = "TestPValue";
+
+  private static PValue value1;
+  private static PValue value2;
+  private static PValue value3;
+  private static TranslationContext context;
+  private static DAG dag;
+
+  /** Returns a new stub value whose name is {@link #PVALUE_NAME}. */
+  private static PValue newNamedValue() {
+    return new PValueBase() {
+      @Override
+      public String getName() {
+        return PVALUE_NAME;
+      }
+    };
+  }
+
+  /** Creates an empty DAG, a context configured for the TezRunner and three stub values. */
+  @Before
+  public void setUp() {
+    dag = DAG.create(DAG_NAME);
+
+    PipelineOptions baseOptions = PipelineOptionsFactory.create();
+    baseOptions.setRunner(TezRunner.class);
+    TezPipelineOptions tezOptions =
+        PipelineOptionsValidator.validate(TezPipelineOptions.class, baseOptions);
+    context = new TranslationContext(tezOptions, new TezConfiguration());
+
+    value1 = newNamedValue();
+    value2 = newNamedValue();
+    value3 = newNamedValue();
+  }
+
+  /** The output value of vertex 1 is the input value of vertex 2, so the two get an edge. */
+  @Test
+  public void testVertexConnect() throws Exception {
+    String processorName = TezDoFnProcessor.class.getName();
+    Vertex first = Vertex.create(VERTEX1_NAME, ProcessorDescriptor.create(processorName));
+    Vertex second = Vertex.create(VERTEX2_NAME, ProcessorDescriptor.create(processorName));
+    context.addVertex(VERTEX1_NAME, first, value1, value2);
+    context.addVertex(VERTEX2_NAME, second, value2, value3);
+    context.populateDAG(dag);
+
+    Vertex downstreamOfFirst =
+        Iterables.getOnlyElement(dag.getVertex(VERTEX1_NAME).getOutputVertices());
+    Vertex upstreamOfSecond =
+        Iterables.getOnlyElement(dag.getVertex(VERTEX2_NAME).getInputVertices());
+
+    Assert.assertEquals(second, downstreamOfFirst);
+    Assert.assertEquals(first, upstreamOfSecond);
+  }
+
+  /** A source registered for a vertex's input value becomes that vertex's only data source. */
+  @Test
+  public void testDataSourceConnect() throws Exception {
+    Vertex first = Vertex.create(
+        VERTEX1_NAME, ProcessorDescriptor.create(TezDoFnProcessor.class.getName()));
+    context.addVertex(VERTEX1_NAME, first, value1, value2);
+
+    Configuration sourceConf = new Configuration(context.getConfig());
+    DataSourceDescriptor dataSource =
+        MRInput.createConfigBuilder(sourceConf, TextInputFormat.class, TEST_SOURCE)
+            .groupSplits(true)
+            .generateSplitsInAM(true)
+            .build();
+    context.addSource(value1, dataSource);
+    context.populateDAG(dag);
+
+    DataSourceDescriptor attachedSource =
+        Iterables.getOnlyElement(dag.getVertex(VERTEX1_NAME).getDataSources());
+
+    Assert.assertEquals(dataSource, attachedSource);
+  }
+}
diff --git 
a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
new file mode 100644
index 00000000000..b2bb5fc0656
--- /dev/null
+++ 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the Tez TranslatorUtil class.
+ */
+public class TranslatorUtilTest {
+
+  /** A DoFn turned into a string and back must come out as an instance of the same class. */
+  @Test
+  public void testDoFnSerialization() throws Exception {
+    DoFn<?, ?> doFn = new TestDoFn();
+    String doFnString = TranslatorUtil.toString(doFn);
+    DoFn<?, ?> newDoFn = (DoFn<?, ?>) TranslatorUtil.fromString(doFnString);
+    // JUnit's contract is assertEquals(expected, actual).
+    Assert.assertEquals(doFn.getClass(), newDoFn.getClass());
+  }
+
+  /** DoFn without behavior; only its class identity is checked. */
+  private static class TestDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      //Test DoFn
+    }
+  }
+}
diff --git a/runners/tez/src/test/resources/test_input.txt 
b/runners/tez/src/test/resources/test_input.txt
new file mode 100644
index 00000000000..383be420a8d
--- /dev/null
+++ b/runners/tez/src/test/resources/test_input.txt
@@ -0,0 +1,2 @@
+This is an example of splitting up lines.
+Then formatting those lines into individual tokens.
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to