[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-06-25 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r445938844



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by 
first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 
cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner {

Review comment:
   Renamed it to Twister2Runner





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-06-23 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r444652797



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToWindowFunctionPrimitive.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.comms.structs.Tuple;
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.runners.twister2.utils.TranslationUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+
+/** ByteToWindow function. */
+public class ByteToWindowFunctionPrimitive
+implements MapFunc>, Tuple> {
+  private transient Coder keyCoder;
+  private transient WindowedValueCoder wvCoder;
+  private static final Logger LOG = 
Logger.getLogger(ByteToWindowFunctionPrimitive.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] keyCoderBytes;
+  private byte[] wvCoderBytes;
+
+  public ByteToWindowFunctionPrimitive() {
+// non arg constructor needed for kryo
+isInitialized = false;
+  }
+
+  public ByteToWindowFunctionPrimitive(
+  final Coder inputKeyCoder, final WindowedValueCoder wvCoder) {
+this.keyCoder = inputKeyCoder;
+this.wvCoder = wvCoder;
+
+keyCoderBytes = SerializableUtils.serializeToByteArray(keyCoder);
+wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public WindowedValue> map(Tuple input) {
+K key = null;
+WindowedValue value = null;
+try {
+  key = CoderUtils.decodeFromByteArray(keyCoder, input.getKey());
+
+  value = TranslationUtils.fromByteArray(input.getValue(), wvCoder);
+} catch (CoderException e) {
+  LOG.info(e.getMessage());
+}
+WindowedValue> element;
+if (value != null) {
+  element =

Review comment:
   Corrected that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-06-23 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r444652734



##
File path: runners/twister2/build.gradle
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(automaticModuleName: 'org.apache.beam.runners.twister2')
+evaluationDependsOn(":sdks:java:core")
+configurations {
+validatesRunner
+}
+description = "Apache Beam :: Runners :: Twister2"
+
+repositories {
+mavenLocal()
+}
+
+dependencies {
+compile library.java.vendored_guava_26_0_jre
+
+compile (project(path: ":runners:core-java")){
+exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
+}
+compile project(":runners:java-fn-execution")
+compile "org.twister2:api-java:0.6.0"

Review comment:
   Done, thank you for pointing that out





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-06-23 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r444623178



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by 
first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 
cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner {

Review comment:
   Yes, that was the intention of using the legacy part. I can rename it if 
needed. What do you think would be the best approach?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-22 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r429283778



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/package-info.java
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Internal implementation of the Beam runner for Apache Twister2. */

Review comment:
   Fixed it. It is good that you caught that; it would have gone unfixed for a 
long time otherwise.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428399477



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/BeamBatchWorker.java
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.tset.TBase;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.tset.TBaseGraph;
+import edu.iu.dsc.tws.tset.env.BatchTSetEnvironment;
+import edu.iu.dsc.tws.tset.links.BaseTLink;
+import edu.iu.dsc.tws.tset.sets.BaseTSet;
+import edu.iu.dsc.tws.tset.sets.BuildableTSet;
+import edu.iu.dsc.tws.tset.sets.batch.CachedTSet;
+import edu.iu.dsc.tws.tset.sets.batch.ComputeTSet;
+import edu.iu.dsc.tws.tset.sets.batch.SinkTSet;
+import edu.iu.dsc.tws.tset.worker.BatchTSetIWorker;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.twister2.translators.functions.DoFnFunction;
+import 
org.apache.beam.runners.twister2.translators.functions.Twister2SinkFunction;
+
+/**
+ * The Twister2 worker that will execute the job logic once the job is 
submitted from the run
+ * method.
+ */
+public class BeamBatchWorker implements Serializable, BatchTSetIWorker {
+
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  private HashMap> sideInputDataSets;
+  private Set leaves;

Review comment:
   I might have to take a little time on this and make sure I get the type 
information correct. Would it be OK to create an improvement issue for this for 
now, so I can get back to it later after the code is merged?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428399112



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+
+/** Represents a Twister2 pipeline execution result. */
+public class Twister2PipelineResult implements PipelineResult {
+
+  PipelineResult.State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+return state;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+return State.DONE;

Review comment:
   Those operations are not currently supported; I forgot to mark them as 
such. Fixed them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428397620



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by 
first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 
cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner {
+
+  private static final Logger LOG = 
Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {

Review comment:
   Changed to protected 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428385521



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.ComputeCollectorFunc;
+import edu.iu.dsc.tws.api.tset.fn.RecordCollector;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.logging.Logger;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.twister2.utils.Twister2AssignContext;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import 
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+
+/** Assign Windows function. */
+public class AssignWindowsFunction
+implements ComputeCollectorFunc, 
Iterator>> {
+  private static final Logger LOG = 
Logger.getLogger(AssignWindowsFunction.class.getName());
+
+  private transient boolean isInitilized = false;

Review comment:
   Thank you for catching that, fixed all the occurrences 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428384829



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/package-info.java
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */

Review comment:
   I wonder how I missed that; I was sure I searched for Spark and Flink 
:smile:. Fixed all the instances.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428384075



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2StreamTranslationContext.java
##
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+/** Twister2StreamTranslationContext. */
+public class Twister2StreamTranslationContext {}

Review comment:
   Yes it is :smile:. There was a duplicate class; fixed it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428381476



##
File path: runners/twister2/build.gradle
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(automaticModuleName: 'org.apache.beam.runners.twister2')
+evaluationDependsOn(":sdks:java:core")
+configurations {
+validatesRunner
+}
+description = "Apache Beam :: Runners :: Twister2"
+
+repositories {

Review comment:
   Thanks for catching that, removed it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428380317



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by 
first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 
cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner {
+
+  private static final Logger LOG = 
Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+Twister2PipelineOptions pipelineOptions =
+PipelineOptionsValidator.validate(Twister2PipelineOptions.class, 
options);
+if (pipelineOptions.getFilesToStage() == null) {
+  pipelineOptions.setFilesToStage(
+  detectClassPathResourcesToStage(
+  Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+  LOG.info(
+  "PipelineOptions.filesToStage was not specified. "
+  + "Defaulting to files from the classpath: will stage {} files. "
+  + "Enable logging at DEBUG level to see which files will be 
staged"
+  + pipelineOptions.getFilesToStage().size());
+}
+return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+// create a worker and pass in the pipeline and then do the translation
+Twister2PipelineExecutionEnvironment env = new 
Twister2PipelineExecutionEnvironment(options);
+LOG.info("Translating pipeline to 

[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428378347



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.iu.dsc.tws.tset.env.TSetEnvironment;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/** Twister2PipelineOptions. */
+public interface Twister2PipelineOptions extends PipelineOptions, 
StreamingOptions {
+  @Description("set unique application name for Twister2 runner")

Review comment:
   Was not aware of that convention, thanks for pointing that out. 
Reformatted according to this rule.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428376950



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.iu.dsc.tws.tset.env.TSetEnvironment;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+
+/** Twister2PipelineOptions. */
+public interface Twister2PipelineOptions extends PipelineOptions, 
StreamingOptions {
+  @Description("set unique application name for Twister2 runner")
+  void setApplicationName(String name);
+
+  String getApplicationName();

Review comment:
   I initially added them for completeness, but ended up not using some of 
them. Removed the unused options, since they can be added later if needed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428335678



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by 
first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 
cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner {
+
+  private static final Logger LOG = 
Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+Twister2PipelineOptions pipelineOptions =
+PipelineOptionsValidator.validate(Twister2PipelineOptions.class, 
options);
+if (pipelineOptions.getFilesToStage() == null) {
+  pipelineOptions.setFilesToStage(
+  detectClassPathResourcesToStage(
+  Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+  LOG.info(
+  "PipelineOptions.filesToStage was not specified. "
+  + "Defaulting to files from the classpath: will stage {} files. "
+  + "Enable logging at DEBUG level to see which files will be 
staged"
+  + pipelineOptions.getFilesToStage().size());
+}
+return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+// create a worker and pass in the pipeline and then do the translation
+Twister2PipelineExecutionEnvironment env = new 
Twister2PipelineExecutionEnvironment(options);
+LOG.info("Translating pipeline to 

[GitHub] [beam] pulasthi commented on a change in pull request #10888: [BEAM-7304] Twister2 Beam runner

2020-05-20 Thread GitBox


pulasthi commented on a change in pull request #10888:
URL: https://github.com/apache/beam/pull/10888#discussion_r428226234



##
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2LegacyRunner.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2;
+
+import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+
+import edu.iu.dsc.tws.api.JobConfig;
+import edu.iu.dsc.tws.api.Twister2Job;
+import edu.iu.dsc.tws.api.config.Config;
+import edu.iu.dsc.tws.api.driver.DriverJobState;
+import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
+import edu.iu.dsc.tws.api.tset.sets.TSet;
+import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
+import edu.iu.dsc.tws.local.LocalSubmitter;
+import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
+import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the pipeline by 
first translating them
+ * to a Twister2 Plan and then executing them either locally or on a Twister2 
cluster, depending on
+ * the configuration.
+ */
+public class Twister2LegacyRunner extends PipelineRunner {
+
+  private static final Logger LOG = 
Logger.getLogger(Twister2LegacyRunner.class.getName());
+  private static final String SIDEINPUTS = "sideInputs";
+  private static final String LEAVES = "leaves";
+  private static final String GRAPH = "graph";
+  /** Provided options. */
+  private final Twister2PipelineOptions options;
+
+  public Twister2LegacyRunner(Twister2PipelineOptions options) {
+this.options = options;
+  }
+
+  public static Twister2LegacyRunner fromOptions(PipelineOptions options) {
+Twister2PipelineOptions pipelineOptions =
+PipelineOptionsValidator.validate(Twister2PipelineOptions.class, 
options);
+if (pipelineOptions.getFilesToStage() == null) {
+  pipelineOptions.setFilesToStage(
+  detectClassPathResourcesToStage(
+  Twister2LegacyRunner.class.getClassLoader(), pipelineOptions));
+  LOG.info(
+  "PipelineOptions.filesToStage was not specified. "
+  + "Defaulting to files from the classpath: will stage {} files. "
+  + "Enable logging at DEBUG level to see which files will be 
staged"
+  + pipelineOptions.getFilesToStage().size());
+}
+return new Twister2LegacyRunner(pipelineOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+// create a worker and pass in the pipeline and then do the translation
+Twister2PipelineExecutionEnvironment env = new 
Twister2PipelineExecutionEnvironment(options);
+LOG.info("Translating pipeline to