[ https://issues.apache.org/jira/browse/BEAM-4575?focusedWorklogId=113533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113533 ]

ASF GitHub Bot logged work on BEAM-4575:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jun/18 03:18
            Start Date: 20/Jun/18 03:18
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5673: [BEAM-4575] Cleanly transform graph from Calcite to Beam SQL
URL: https://github.com/apache/beam/pull/5673

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
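At a glance, the change replaces the old recursive BeamRelNode.toPTransform()
(typed over PCollectionTuple) with a buildPInput()/buildPTransform() pair,
driven by a new BeamSqlRelUtils.toPCollection() helper that walks the rel tree
depth-first and caches each node's output PCollection<Row> by rel id, so a
shared subplan is expanded only once. A condensed sketch of that driver
(paraphrased, not a verbatim copy; see BeamRelNode.java and BeamSqlRelUtils.java
in the diff below for the real code):

  static PCollection<Row> toPCollection(
      Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) {
    PCollection<Row> output = cache.get(node.getId());
    if (output != null) {
      return output; // shared subplan: reuse the cached output, do not re-expand
    }
    // Convert the inputs first (DFS), then apply this node's own transform.
    // buildPInput returns a PBegin, a PCollection, or a PCollectionList.
    PInput input = node.buildPInput(pipeline, cache);
    output = Pipeline.applyTransform(
        node.getClass().getSimpleName() + "_" + node.getId(), input, node.buildPTransform());
    cache.put(node.getId(), output);
    return output;
  }

Callers then reduce to a single line, e.g.
BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sqlString)) in
BeamSqlCli and SqlTransform below.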

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index bb06d72a33c..eb32d2a9a36 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -22,9 +22,9 @@
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.PCollectionTuple;
 
 /** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */
 @Experimental
@@ -59,7 +59,7 @@ public void execute(String sqlString) throws ParseException {
           BeamEnumerableConverter.createPipelineOptions(env.getPipelineOptions());
       options.setJobName("BeamPlanCreator");
       Pipeline pipeline = Pipeline.create(options);
-      PCollectionTuple.empty(pipeline).apply(env.parseQuery(sqlString));
+      BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sqlString));
       pipeline.run();
     }
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 8bfecd5381e..7f849d4e70f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -18,20 +18,19 @@
 
 package org.apache.beam.sdk.extensions.sql;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 
 /** This interface defines a Beam Sql Table. */
 public interface BeamSqlTable {
-  /** create a {@code PCollection<BeamSqlRow>} from source. */
-  PCollection<Row> buildIOReader(Pipeline pipeline);
+  /** create a {@code PCollection<Row>} from source. */
+  PCollection<Row> buildIOReader(PBegin begin);
 
   /** create a {@code IO.write()} instance to write to target. */
-  PTransform<? super PCollection<Row>, POutput> buildIOWriter();
+  POutput buildIOWriter(PCollection<Row> input);
 
   /** Get the schema info of the table. */
   Schema getSchema();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 20d152b67d2..2bee537f90d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -92,7 +93,7 @@
 
     registerFunctions(sqlEnv);
 
-    return PCollectionTuple.empty(input.getPipeline()).apply(sqlEnv.parseQuery(queryString()));
+    return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
   }
 
   private Map<String, BeamSqlTable> toTableMap(PInput inputs) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 940f0483464..1aca83bac0b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -23,15 +23,12 @@
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
@@ -103,10 +100,9 @@ public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
     defaultSchema.add(functionName, new UdafImpl(combineFn));
   }
 
-  public PTransform<PCollectionTuple, PCollection<Row>> parseQuery(String query)
-      throws ParseException {
+  public BeamRelNode parseQuery(String query) throws ParseException {
     try {
-      return planner.convertToBeamRel(query).toPTransform();
+      return planner.convertToBeamRel(query);
     } catch (ValidationException | RelConversionException | SqlParseException e) {
       throw new ParseException(String.format("Unable to parse query %s", query), e);
     }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index b0b978ff018..b5156e9e161 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -39,7 +39,7 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.calcite.plan.RelOptCluster;
@@ -73,24 +73,20 @@ public BeamAggregationRel(
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      RelNode input = getInput();
-      String stageName = BeamSqlRelUtils.getStageName(BeamAggregationRel.this) + "_";
-
-      PCollection<Row> upstream =
-          inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+    public PCollection<Row> expand(PInput pinput) {
+      PCollection<Row> upstream = (PCollection<Row>) pinput;
       if (windowField.isPresent()) {
         upstream =
             upstream
                 .apply(
-                    stageName + "assignEventTimestamp",
+                    "assignEventTimestamp",
                     WithTimestamps.of(
                             new BeamAggregationTransforms.WindowTimestampFn(windowFieldIndex))
                         .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
@@ -99,7 +95,7 @@ public BeamAggregationRel(
 
       PCollection<Row> windowedStream =
           windowField.isPresent()
-              ? upstream.apply(stageName + "window", 
Window.into(windowField.get().windowFn()))
+              ? upstream.apply(Window.into(windowField.get().windowFn()))
               : upstream;
 
       validateWindowIsSupported(windowedStream);
@@ -109,7 +105,7 @@ public BeamAggregationRel(
       PCollection<KV<Row, Row>> exCombineByStream =
           windowedStream
               .apply(
-                  stageName + "exCombineBy",
+                  "exCombineBy",
                   WithKeys.of(
                       new BeamAggregationTransforms.AggregationGroupByKeyFn(
                           keySchema, windowFieldIndex, groupSet)))
@@ -120,7 +116,7 @@ public BeamAggregationRel(
       PCollection<KV<Row, Row>> aggregatedStream =
           exCombineByStream
               .apply(
-                  stageName + "combineBy",
+                  "combineBy",
                   Combine.perKey(
                       new BeamAggregationTransforms.AggregationAdaptor(
                           getNamedAggCalls(), CalciteUtils.toBeamSchema(input.getRowType()))))
@@ -128,7 +124,7 @@ public BeamAggregationRel(
 
       PCollection<Row> mergedStream =
           aggregatedStream.apply(
-              stageName + "mergeRecord",
+              "mergeRecord",
               ParDo.of(
                   new BeamAggregationTransforms.MergeAggregationRecord(
                       CalciteUtils.toBeamSchema(getRowType()), windowFieldIndex)));
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index b0c9e57b787..5488460a15d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -33,7 +33,7 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -54,25 +54,20 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      RelNode input = getInput();
-      String stageName = BeamSqlRelUtils.getStageName(BeamCalcRel.this);
-
-      PCollection<Row> upstream =
-          inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+    public PCollection<Row> expand(PInput pinput) {
+      PCollection<Row> upstream = (PCollection<Row>) pinput;
 
       BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(BeamCalcRel.this.getProgram());
 
       PCollection<Row> projectStream =
-          upstream.apply(
-              stageName, ParDo.of(new CalcFn(executor, CalciteUtils.toBeamSchema(rowType))));
+          upstream.apply(ParDo.of(new CalcFn(executor, CalciteUtils.toBeamSchema(rowType))));
       projectStream.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
 
       return projectStream;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 32e4564d408..2fbcb5b1e72 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -37,7 +37,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
@@ -119,7 +118,7 @@ public static PipelineOptions createPipelineOptions(Map<String, String> map) {
   private static PipelineResult run(
       PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn) {
     Pipeline pipeline = Pipeline.create(options);
-    PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn));
+    BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(doFn));
     PipelineResult result = pipeline.run();
     result.waitUntilFinish();
     return result;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 3989dc5cf3c..fe33c704b87 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -22,7 +22,7 @@
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -98,27 +98,19 @@ public void register(RelOptPlanner planner) {
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
-    /**
-     * Note that {@code BeamIOSinkRel} returns the input PCollection, which is the persisted
-     * PCollection.
-     */
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      RelNode input = getInput();
-      String stageName = BeamSqlRelUtils.getStageName(BeamIOSinkRel.this);
+    public PCollection<Row> expand(PInput pinput) {
+      PCollection<Row> input = (PCollection<Row>) pinput;
 
-      PCollection<Row> upstream =
-          inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+      sqlTable.buildIOWriter(input);
 
-      upstream.apply(stageName, sqlTable.buildIOWriter());
-
-      return upstream;
+      return input;
     }
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 3a478a8e80e..538e56a26af 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -20,8 +20,9 @@
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -44,15 +45,15 @@ public BeamIOSourceRel(
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      return sqlTable.buildIOReader(inputPCollections.getPipeline());
+    public PCollection<Row> expand(PInput input) {
+      return sqlTable.buildIOReader((PBegin) input);
     }
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index b5002ea9718..241f745de7d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -21,7 +21,7 @@
 import java.util.List;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -36,13 +36,9 @@
  * statement that are identical to a row in the second SELECT statement.
  */
 public class BeamIntersectRel extends Intersect implements BeamRelNode {
-  private BeamSetOperatorRelBase delegate;
-
   public BeamIntersectRel(
       RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
     super(cluster, traits, inputs, all);
-    delegate =
-        new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
   }
 
   @Override
@@ -51,15 +47,7 @@ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
-    return new Transform();
-  }
-
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
-
-    @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      return delegate.buildBeamPipeline(inputPCollections);
-    }
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
+    return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.INTERSECT, all);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 5967f76a7bb..25b0ae378eb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
@@ -44,8 +45,9 @@
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.calcite.plan.RelOptCluster;
@@ -116,30 +118,41 @@ public Join copy(
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PInput buildPInput(Pipeline pipeline, Map<Integer, PCollection<Row>> cache) {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    if (!seekable(leftRelNode) && seekable(rightRelNode)) {
+      return BeamSqlRelUtils.toPCollection(pipeline, leftRelNode, cache);
+    }
+    return BeamRelNode.super.buildPInput(pipeline, cache);
+  }
+
+  @Override
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+    public PCollection<Row> expand(PInput pinput) {
       BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-      Schema leftSchema = CalciteUtils.toBeamSchema(left.getRowType());
       final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
 
-      if (!seekable(leftRelNode) && seekable(rightRelNode)) {
-        return joinAsLookup(leftRelNode, rightRelNode, inputPCollections)
+      if (pinput instanceof PCollection) {
+        return joinAsLookup(leftRelNode, rightRelNode, (PCollection<Row>) pinput)
             .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
       }
 
-      PCollection<Row> leftRows = inputPCollections.apply("left", 
leftRelNode.toPTransform());
-      PCollection<Row> rightRows = inputPCollections.apply("right", 
rightRelNode.toPTransform());
+      Schema leftSchema = CalciteUtils.toBeamSchema(left.getRowType());
+      PCollectionList<Row> input = (PCollectionList<Row>) pinput;
+      assert input.size() == 2;
+      PCollection<Row> leftRows = input.get(0);
+      PCollection<Row> rightRows = input.get(1);
 
       verifySupportedTrigger(leftRows);
       verifySupportedTrigger(rightRows);
 
-      String stageName = BeamSqlRelUtils.getStageName(BeamJoinRel.this);
       WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
       WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
 
@@ -158,14 +171,14 @@ public Join copy(
       PCollection<KV<Row, Row>> extractedLeftRows =
           leftRows
               .apply(
-                  stageName + "_left_ExtractJoinFields",
+                  "left_ExtractJoinFields",
                   MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
               .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
 
       PCollection<KV<Row, Row>> extractedRightRows =
           rightRows
               .apply(
-                  stageName + "_right_ExtractJoinFields",
+                  "right_ExtractJoinFields",
                   MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
               .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
 
@@ -184,8 +197,7 @@ public Join copy(
               "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
         }
 
-        return standardJoin(
-            extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow, stageName);
+        return standardJoin(extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow);
       } else if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
               && rightRows.isBounded() == UNBOUNDED)
           || (leftRows.isBounded() == UNBOUNDED
@@ -242,8 +254,7 @@ private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
       PCollection<KV<Row, Row>> extractedLeftRows,
       PCollection<KV<Row, Row>> extractedRightRows,
       Row leftNullRow,
-      Row rightNullRow,
-      String stageName) {
+      Row rightNullRow) {
     PCollection<KV<Row, KV<Row, Row>>> joinedRows = null;
     switch (joinType) {
       case LEFT:
@@ -272,8 +283,7 @@ private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
     PCollection<Row> ret =
         joinedRows
             .apply(
-                stageName + "_JoinParts2WholeRow",
-                MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+                "JoinParts2WholeRow", MapElements.via(new 
BeamJoinTransforms.JoinParts2WholeRow()))
             .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
     return ret;
   }
@@ -363,8 +373,7 @@ private Row buildNullRow(BeamRelNode relNode) {
   }
 
   private PCollection<Row> joinAsLookup(
-      BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollectionTuple inputPCollections) {
-    PCollection<Row> factStream = inputPCollections.apply(leftRelNode.toPTransform());
+      BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollection<Row> factStream) {
     BeamIOSourceRel srcRel = (BeamIOSourceRel) rightRelNode;
     BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) srcRel.getBeamSqlTable();
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index eb93f54c270..2db3b13000c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -21,7 +21,7 @@
 import java.util.List;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -36,12 +36,9 @@
  */
 public class BeamMinusRel extends Minus implements BeamRelNode {
 
-  private BeamSetOperatorRelBase delegate;
-
   public BeamMinusRel(
       RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
     super(cluster, traits, inputs, all);
-    delegate = new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
   }
 
   @Override
@@ -50,15 +47,7 @@ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
-    return new Transform();
-  }
-
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
-
-    @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      return delegate.buildBeamPipeline(inputPCollections);
-    }
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
+    return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, all);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 5fc885b7428..7a30df21d1d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -17,20 +17,37 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.RelNode;
 
 /** A {@link RelNode} that can also give a {@link PTransform} that implements the expression. */
 public interface BeamRelNode extends RelNode {
-  /**
-   * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a
-   * DFS(Depth-First-Search) algorithm.
-   */
-  PTransform<PCollectionTuple, PCollection<Row>> toPTransform();
+
+  /** Transforms the inputs into a PInput. */
+  default PInput buildPInput(Pipeline pipeline, Map<Integer, PCollection<Row>> cache) {
+    List<RelNode> inputs = getInputs();
+    if (inputs.size() == 0) {
+      return pipeline.begin();
+    }
+    List<PCollection<Row>> pInputs = new ArrayList(inputs.size());
+    for (RelNode input : inputs) {
+      pInputs.add(BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache));
+    }
+    if (pInputs.size() == 1) {
+      return pInputs.get(0);
+    }
+    return PCollectionList.of(pInputs);
+  }
+
+  PTransform<PInput, PCollection<Row>> buildPTransform();
 
   /** Perform a DFS(Depth-First-Search) to find the PipelineOptions config. */
   default Map<String, String> getPipelineOptions() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index df74fd65bf5..73b7d4ca076 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -19,9 +19,9 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.io.Serializable;
-import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -29,16 +29,16 @@
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.rel.RelNode;
 
 /**
  * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} and {@code
  * BeamMinusRel}.
  */
-public class BeamSetOperatorRelBase {
+public class BeamSetOperatorRelBase extends PTransform<PInput, PCollection<Row>> {
   /** Set operator type. */
   public enum OpType implements Serializable {
     UNION,
@@ -47,25 +47,20 @@
   }
 
   private BeamRelNode beamRelNode;
-  private List<RelNode> inputs;
   private boolean all;
   private OpType opType;
 
-  public BeamSetOperatorRelBase(
-      BeamRelNode beamRelNode, OpType opType, List<RelNode> inputs, boolean all) {
+  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, boolean all) {
     this.beamRelNode = beamRelNode;
     this.opType = opType;
-    this.inputs = inputs;
     this.all = all;
   }
 
-  public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections) {
-    PCollection<Row> leftRows =
-        inputPCollections.apply(
-            "left", BeamSqlRelUtils.getBeamRelInput(inputs.get(0)).toPTransform());
-    PCollection<Row> rightRows =
-        inputPCollections.apply(
-            "right", BeamSqlRelUtils.getBeamRelInput(inputs.get(1)).toPTransform());
+  public PCollection<Row> expand(PInput pinput) {
+    PCollectionList<Row> inputs = (PCollectionList<Row>) pinput;
+    assert inputs.size() == 2;
+    PCollection<Row> leftRows = inputs.get(0);
+    PCollection<Row> rightRows = inputs.get(1);
 
     WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
     WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
@@ -83,17 +78,16 @@ public BeamSetOperatorRelBase(
     final TupleTag<Row> rightTag = new TupleTag<>();
 
     // co-group
-    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
     PCollection<KV<Row, CoGbkResult>> coGbkResultCollection =
         KeyedPCollectionTuple.of(
                 leftTag,
                 leftRows.apply(
-                    stageName + "_CreateLeftIndex",
+                    "CreateLeftIndex",
                     MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
             .and(
                 rightTag,
                 rightRows.apply(
-                    stageName + "_CreateRightIndex",
+                    "CreateRightIndex",
                     MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
             .apply(CoGroupByKey.create());
     PCollection<Row> ret =
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 84d8ed95277..99583c2894a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -34,7 +34,7 @@
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -121,17 +121,15 @@ public BeamSortRel(
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      RelNode input = getInput();
-      PCollection<Row> upstream =
-          inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+    public PCollection<Row> expand(PInput pinput) {
+      PCollection<Row> upstream = (PCollection<Row>) pinput;
       Type windowType =
           upstream.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor().getType();
       if (!windowType.equals(GlobalWindow.class)) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
index 30d52516036..d5c4f7a2d6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -17,36 +17,42 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.calcite.plan.RelOptUtil;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Utilities for {@code BeamRelNode}. */
-class BeamSqlRelUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
+public class BeamSqlRelUtils {
 
-  private static final AtomicInteger sequence = new AtomicInteger(0);
-  private static final AtomicInteger classSequence = new AtomicInteger(0);
-
-  public static String getStageName(BeamRelNode relNode) {
-    return relNode.getClass().getSimpleName().toUpperCase()
-        + "_"
-        + relNode.getId()
-        + "_"
-        + sequence.getAndIncrement();
+  public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node) {
+    return toPCollection(pipeline, node, new HashMap());
   }
 
-  public static String getClassName(BeamRelNode relNode) {
-    return "Generated_"
-        + relNode.getClass().getSimpleName().toUpperCase()
-        + "_"
-        + relNode.getId()
-        + "_"
-        + classSequence.getAndIncrement();
+  /**
+   * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a
+   * DFS(Depth-First-Search) algorithm.
+   */
+  static PCollection<Row> toPCollection(
+      Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) {
+    PCollection<Row> output = cache.get(node.getId());
+    if (output != null) {
+      return output;
+    }
+
+    String name = node.getClass().getSimpleName() + "_" + node.getId();
+    PInput input = node.buildPInput(pipeline, cache);
+    PTransform<PInput, PCollection<Row>> transform = node.buildPTransform();
+
+    output = Pipeline.applyTransform(name, input, transform);
+
+    cache.put(node.getId(), output);
+    return output;
   }
 
   public static BeamRelNode getBeamRelInput(RelNode input) {
@@ -56,24 +62,4 @@ public static BeamRelNode getBeamRelInput(RelNode input) {
     }
     return (BeamRelNode) input;
   }
-
-  public static String explain(final RelNode rel) {
-    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
-  }
-
-  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
-    String explain = "";
-    try {
-      explain = RelOptUtil.toString(rel);
-    } catch (StackOverflowError e) {
-      LOG.error(
-          "StackOverflowError occurred while extracting plan. "
-              + "Please report it to the dev@ mailing list.");
-      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
-      LOG.error(
-          "Forcing plan to empty string and continue... "
-              + "SQL Runner may not working properly after.");
-    }
-    return explain;
-  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
index fdb15ada438..99a78d332b7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
@@ -23,7 +23,7 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -44,18 +44,14 @@ public RelNode copy(RelTraitSet traitSet, RelNode input) {
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      RelNode input = getInput();
-      String stageName = BeamSqlRelUtils.getStageName(BeamUncollectRel.this);
-
-      PCollection<Row> upstream =
-          inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+    public PCollection<Row> expand(PInput pinput) {
+      PCollection<Row> upstream = (PCollection<Row>) pinput;
 
       // Each row of the input contains a single array of things to be emitted; Calcite knows
       // what the row looks like
@@ -63,7 +59,7 @@ public RelNode copy(RelTraitSet traitSet, RelNode input) {
 
       PCollection<Row> uncollected =
           upstream
-              .apply(stageName, ParDo.of(new UncollectDoFn(outputSchema)))
+              .apply(ParDo.of(new UncollectDoFn(outputSchema)))
               .setCoder(outputSchema.getRowCoder());
 
       return uncollected;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index f8fd0aff72d..d52c48fd7db 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -22,7 +22,7 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -63,13 +63,9 @@
  * }</pre>
  */
 public class BeamUnionRel extends Union implements BeamRelNode {
-  private BeamSetOperatorRelBase delegate;
-
   public BeamUnionRel(
       RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
     super(cluster, traits, inputs, all);
-    this.delegate =
-        new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.UNION, inputs, all);
   }
 
   @Override
@@ -78,14 +74,7 @@ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
-    return new Transform();
-  }
-
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
-    @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      return delegate.buildBeamPipeline(inputPCollections);
-    }
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
+    return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.UNION, all);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
index 35dbdac6997..a21e86a63f5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
@@ -22,7 +22,9 @@
 
 import com.google.common.collect.ImmutableList;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
@@ -35,7 +37,7 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -63,11 +65,6 @@ public BeamUnnestRel(
     super(cluster, traits, left, right, correlationId, requiredColumns, joinType);
   }
 
-  @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
-    return new Transform();
-  }
-
   @Override
   public Correlate copy(
       RelTraitSet relTraitSet,
@@ -80,14 +77,22 @@ public Correlate copy(
         getCluster(), relTraitSet, left, right, correlationId, requiredColumns, joinType);
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
-    @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      String stageName = BeamSqlRelUtils.getStageName(BeamUnnestRel.this);
+  @Override
+  public PInput buildPInput(Pipeline pipeline, Map<Integer, PCollection<Row>> cache) {
+    BeamRelNode input = BeamSqlRelUtils.getBeamRelInput(left);
+    return BeamSqlRelUtils.toPCollection(pipeline, input, cache);
+  }
+
+  @Override
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
+    return new Transform();
+  }
 
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
+    @Override
+    public PCollection<Row> expand(PInput pinput) {
       // The set of rows where we run the correlated unnest for each row
-      PCollection<Row> outer =
-          inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(left).toPTransform());
+      PCollection<Row> outer = (PCollection<Row>) pinput;
 
       // The correlated subquery
       BeamUncollectRel uncollect = (BeamUncollectRel) BeamSqlRelUtils.getBeamRelInput(right);
@@ -103,7 +108,6 @@ public Correlate copy(
 
       return outer
           .apply(
-              stageName,
               ParDo.of(
                   new UnnestFn(correlationId.getId(), expr, joinedSchema, innerSchema.getField(0))))
           .setCoder(joinedSchema.getRowCoder());
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 68a5366fd99..d7b13156ecb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -29,8 +29,9 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -59,16 +60,15 @@ public BeamValuesRel(
   }
 
   @Override
-  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+  public PTransform<PInput, PCollection<Row>> buildPTransform() {
     return new Transform();
   }
 
-  private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> {
+  private class Transform extends PTransform<PInput, PCollection<Row>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+    public PCollection<Row> expand(PInput pinput) {
 
-      String stageName = BeamSqlRelUtils.getStageName(BeamValuesRel.this);
       if (tuples.isEmpty()) {
         throw new IllegalStateException("Values with empty tuples!");
       }
@@ -77,10 +77,7 @@ public BeamValuesRel(
 
       List<Row> rows = tuples.stream().map(tuple -> tupleToRow(schema, tuple)).collect(toList());
 
-      return inputPCollections
-          .getPipeline()
-          .apply(stageName, Create.of(rows))
-          .setCoder(schema.getRowCoder());
+      return ((PBegin) pinput).apply(Create.of(rows)).setCoder(schema.getRowCoder());
     }
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
index b946111179f..b5d2d0ef53b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -17,9 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.schema;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
@@ -37,12 +36,13 @@ public BeamPCollectionTable(PCollection<Row> upstream) {
   }
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    assert begin.getPipeline() == upstream.getPipeline();
     return upstream;
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+  public POutput buildIOWriter(PCollection<Row> input) {
     throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as 
target");
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
index 68eac812ebd..7d1e68e9667 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
@@ -18,14 +18,12 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
@@ -44,22 +42,17 @@ public BeamBigQueryTable(Schema beamSchema, String tableSpec) {
   }
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+  public PCollection<Row> buildIOReader(PBegin begin) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
-    return new PTransform<PCollection<Row>, POutput>() {
-      @Override
-      public WriteResult expand(PCollection<Row> input) {
-        return input.apply(
-            BigQueryIO.<Row>write()
-                .withSchema(BigQueryUtils.toTableSchema(getSchema()))
-                .withFormatFunction(BigQueryUtils.toTableRow())
-                .to(tableSpec));
-      }
-    };
+  public POutput buildIOWriter(PCollection<Row> input) {
+    return input.apply(
+        BigQueryIO.<Row>write()
+            .withSchema(BigQueryUtils.toTableSchema(getSchema()))
+            .withFormatFunction(BigQueryUtils.toTableRow())
+            .to(tableSpec));
   }
 
   String getTableSpec() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index d9b7fa44e60..352e4d2db64 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -21,7 +21,6 @@
 
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -30,7 +29,6 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.kafka.common.TopicPartition;
@@ -76,7 +74,7 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates
       getPTransformForOutput();
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+  public PCollection<Row> buildIOReader(PBegin begin) {
     KafkaIO.Read<byte[], byte[]> kafkaRead = null;
     if (topics != null) {
       kafkaRead =
@@ -98,30 +96,26 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates
       throw new IllegalArgumentException("One of topics and topicPartitions must be configurated.");
     }
 
-    return PBegin.in(pipeline)
+    return begin
         .apply("read", kafkaRead.withoutMetadata())
         .apply("in_format", getPTransformForInput());
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+  public POutput buildIOWriter(PCollection<Row> input) {
     checkArgument(
         topics != null && topics.size() == 1, "Only one topic can be acceptable as output.");
+    assert topics != null;
 
-    return new PTransform<PCollection<Row>, POutput>() {
-      @Override
-      public PDone expand(PCollection<Row> input) {
-        return input
-            .apply("out_reformat", getPTransformForOutput())
-            .apply(
-                "persistent",
-                KafkaIO.<byte[], byte[]>write()
-                    .withBootstrapServers(bootstrapServers)
-                    .withTopic(topics.get(0))
-                    .withKeySerializer(ByteArraySerializer.class)
-                    .withValueSerializer(ByteArraySerializer.class));
-      }
-    };
+    return input
+        .apply("out_reformat", getPTransformForOutput())
+        .apply(
+            "persistent",
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0))
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
   }
 
   public String getBootstrapServers() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
index ee332e6df2d..b4a5a6af273 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
@@ -23,14 +23,12 @@
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -139,9 +137,9 @@ static Builder builder() {
   public abstract Schema getSchema();
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+  public PCollection<Row> buildIOReader(PBegin begin) {
     PCollectionTuple rowsWithDlq =
-        PBegin.in(pipeline)
+        begin
             .apply("readFromPubsub", readMessagesWithAttributes())
             .apply("parseMessageToRow", createParserParDo());
 
@@ -178,7 +176,7 @@ static Builder builder() {
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+  public POutput buildIOWriter(PCollection<Row> input) {
     throw new UnsupportedOperationException("Writing to a Pubsub topic is not supported");
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
index f4a7d8e2cd3..5473c02aa8a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
@@ -18,10 +18,8 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
@@ -50,15 +48,15 @@ public BeamTextCSVTable(Schema schema, String filePattern, CSVFormat csvFormat)
   }
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline)
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin
         .apply("decodeRecord", TextIO.read().from(filePattern))
        .apply("parseCSVLine", new BeamTextCSVTableIOReader(schema, filePattern, csvFormat));
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
-    return new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat);
+  public POutput buildIOWriter(PCollection<Row> input) {
+    return input.apply(new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat));
   }
 
   public CSVFormat getCsvFormat() {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index 8f53f4d2cfe..f00949bd4ad 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -332,8 +332,8 @@ public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
 
   private PCollection<Row> queryFromOrderTables(String sql) {
     return tuple(
-            "ORDER_DETAILS1", ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER),
-            "ORDER_DETAILS2", ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER))
+            "ORDER_DETAILS1", ORDER_DETAILS1.buildIOReader(pipeline.begin()).setCoder(SOURCE_CODER),
+            "ORDER_DETAILS2", ORDER_DETAILS2.buildIOReader(pipeline.begin()).setCoder(SOURCE_CODER))
         .apply("join", SqlTransform.query(sql))
         .setCoder(RESULT_CODER);
   }
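
Note that the pipeline.begin() call used in these tests and the
PBegin.in(pipeline) form removed elsewhere in this diff are interchangeable
ways to obtain the pipeline's root PBegin value:

    // Both expressions yield the same root PBegin for the pipeline.
    PBegin fromPipeline = pipeline.begin();
    PBegin fromStatic = PBegin.in(pipeline);
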
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java
index 217aa8dc252..0d7d6d1d845 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java
@@ -27,6 +27,7 @@
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
 import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
@@ -34,7 +35,6 @@
 import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -92,7 +92,7 @@ public void testSimpleInsert() throws Exception {
             + "  pubsub_topic.payload.name \n"
             + "FROM pubsub_topic";
 
-    PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(insertStatement));
+    BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement));
 
     pipeline.run();
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java
index c6f975a4438..5eeafa49135 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -34,8 +33,8 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.POutput;
@@ -123,17 +122,18 @@ public InMemoryTable(TableWithRows tableWithRows) {
     }
 
     @Override
-    public PCollection<Row> buildIOReader(Pipeline pipeline) {
+    public PCollection<Row> buildIOReader(PBegin begin) {
       TableWithRows tableWithRows =
           GLOBAL_TABLES
               .get(this.tableWithRows.tableProviderInstanceId)
               .get(this.tableWithRows.table.getName());
-      return pipeline.apply(Create.of(tableWithRows.rows).withCoder(rowCoder()));
+      return begin.apply(Create.of(tableWithRows.rows).withCoder(rowCoder()));
     }
 
     @Override
-    public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
-      return new Collector(tableWithRows);
+    public POutput buildIOWriter(PCollection<Row> input) {
+      input.apply(ParDo.of(new CollectorFn(tableWithRows)));
+      return PDone.in(input.getPipeline());
     }
 
     @Override
@@ -142,21 +142,6 @@ public Schema getSchema() {
     }
   }
 
-  private static final class Collector extends PTransform<PCollection<Row>, POutput> {
-
-    private TableWithRows tableWithRows;
-
-    Collector(TableWithRows tableWithRows) {
-      this.tableWithRows = tableWithRows;
-    }
-
-    @Override
-    public POutput expand(PCollection<Row> input) {
-      input.apply(ParDo.of(new CollectorFn(tableWithRows)));
-      return PDone.in(input.getPipeline());
-    }
-  }
-
   private static final class CollectorFn extends DoFn<Row, Row> {
     private TableWithRows tableWithRows;
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
index 2cef73b81a0..35079dbf4c9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
@@ -24,7 +24,6 @@
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 
 /** Base class for rel test. */
@@ -33,7 +32,7 @@
   private static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
   protected static PCollection<Row> compilePipeline(String sql, Pipeline pipeline) {
-    return PCollectionTuple.empty(pipeline).apply(env.parseQuery(sql));
+    return BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sql));
   }
 
   protected static void registerTable(String tableName, BeamSqlTable table) {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 65943fce44e..b245c206a27 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -24,15 +24,14 @@
 
 import com.google.common.collect.ImmutableList;
 import java.math.BigDecimal;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.POutput;
@@ -97,26 +96,18 @@ public void testToEnumerable_collectMultiple() {
     enumerator.close();
   }
 
-  private static class FakeTable implements BeamSqlTable {
-    @Override
-    public PCollection<Row> buildIOReader(Pipeline pipeline) {
-      return null;
-    }
-
-    @Override
-    public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
-      return new FakeIOWriter();
+  private static class FakeTable extends BaseBeamTable {
+    public FakeTable() {
+      super(null);
     }
 
     @Override
-    public Schema getSchema() {
+    public PCollection<Row> buildIOReader(PBegin begin) {
       return null;
     }
-  }
 
-  private static class FakeIOWriter extends PTransform<PCollection<Row>, POutput> {
     @Override
-    public POutput expand(PCollection<Row> input) {
+    public POutput buildIOWriter(PCollection<Row> input) {
       input.apply(
           ParDo.of(
               new DoFn<Row, Void>() {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 990071283db..850ddb9b497 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -20,7 +20,6 @@
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
@@ -30,8 +29,8 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
@@ -112,12 +111,12 @@ public SiteLookupTable(Schema schema) {
     }
 
     @Override
-    public PCollection<Row> buildIOReader(Pipeline pipeline) {
+    public PCollection<Row> buildIOReader(PBegin begin) {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+    public POutput buildIOWriter(PCollection<Row> input) {
       throw new UnsupportedOperationException();
     }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 3cd9ebfb93a..0b277d655db 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -91,7 +91,7 @@
               (short) 32767,
               2147483647,
               9223372036854775807L)
-          .buildIOReader(pipeline)
+          .buildIOReader(pipeline.begin())
           .setCoder(ROW_TYPE.getRowCoder());
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index f4d0aaa12f3..2261cc9c348 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -356,7 +356,7 @@ public void testLike() throws Exception {
               true,
               "string_true_test",
               "string_false_test")
-          .buildIOReader(pipeline)
+          .buildIOReader(pipeline.begin())
           .setCoder(type.getRowCoder());
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java
index 89657f46b80..52111afdfcf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -37,7 +38,6 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.Duration;
 import org.junit.Rule;
@@ -76,7 +76,7 @@ public void testInsertValues() throws Exception {
 
     String insertStatement = "INSERT INTO ORDERS VALUES (1, 'foo', ARRAY['123', '456'])";
 
-    PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(insertStatement));
+    BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
 
@@ -116,7 +116,7 @@ public void testInsertSelect() throws Exception {
             + "    name as `name`, \n"
             + "    arr as `arr` \n"
             + " FROM ORDERS_IN_MEMORY";
-    PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(insertStatement));
+    BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index ed492aa297e..e9dbff8bcca 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -35,7 +36,6 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -173,7 +173,7 @@ private Row row(Schema schema, Object... values) {
  private PCollection<Row> query(BeamSqlEnv sqlEnv, TestPipeline pipeline, String queryString)
      throws Exception {
 
-    return PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(queryString));
+    return BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(queryString));
   }
 
   private PubsubMessage message(Instant timestamp, int id, String name) {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java
index 916d4f935e9..4a45d1054d5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java
@@ -78,20 +78,23 @@
   @Test
   public void testBuildIOReader() {
     PCollection<Row> rows =
-        new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
+        new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath())
+            .buildIOReader(pipeline.begin());
     PAssert.that(rows).containsInAnyOrder(testDataRows);
     pipeline.run();
   }
 
   @Test
   public void testBuildIOWriter() {
-    new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath())
-        .buildIOReader(pipeline)
-        .apply(new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()).buildIOWriter());
+    PCollection<Row> input =
+        new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath())
+            .buildIOReader(pipeline.begin());
+    new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()).buildIOWriter(input);
     pipeline.run();
 
     PCollection<Row> rows =
-        new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
+        new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath())
+            .buildIOReader(pipeline2.begin());
 
     // confirm the two reads match
     PAssert.that(rows).containsInAnyOrder(testDataRows);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 8f43e1c9b80..399778817ff 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -24,11 +24,9 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -89,35 +87,25 @@ public MockedBoundedTable addRows(Object... args) {
   }
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline)
-        .apply("MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin.apply("MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
-    return new OutputStore();
-  }
-
-  /** Keep output in {@code CONTENT} for validation. */
-  public static class OutputStore extends PTransform<PCollection<Row>, POutput> {
-
-    @Override
-    public PDone expand(PCollection<Row> input) {
-      input.apply(
-          ParDo.of(
-              new DoFn<Row, Void>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  CONTENT.add(c.element());
-                }
+  public POutput buildIOWriter(PCollection<Row> input) {
+    input.apply(
+        ParDo.of(
+            new DoFn<Row, Void>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                CONTENT.add(c.element());
+              }
 
-                @Teardown
-                public void close() {
-                  CONTENT.clear();
-                }
-              }));
-      return PDone.in(input.getPipeline());
-    }
+              @Teardown
+              public void close() {
+                CONTENT.clear();
+              }
+            }));
+    return PDone.in(input.getPipeline());
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 5cfdf789960..fed4eb36b40 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -21,7 +21,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
@@ -35,7 +34,7 @@ public MockedTable(Schema beamSchema) {
   }
 
   @Override
-  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+  public POutput buildIOWriter(PCollection<Row> input) {
     throw new UnsupportedOperationException("buildIOWriter unsupported!");
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
index f440b54686b..785746603e9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -21,10 +21,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -87,7 +87,7 @@ public MockedUnboundedTable addRows(Duration duration, Object... args) {
   }
 
   @Override
-  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+  public PCollection<Row> buildIOReader(PBegin begin) {
     TestStream.Builder<Row> values = TestStream.create(schema.getRowCoder());
 
     for (Pair<Duration, List<Row>> pair : timestampedRows) {
@@ -101,10 +101,7 @@ public MockedUnboundedTable addRows(Duration duration, Object... args) {
       }
     }
 
-    return pipeline
-        .begin()
-        .apply(
-            "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
-            values.advanceWatermarkToInfinity());
+    return begin.apply(
+        "MockedUnboundedTable_" + COUNTER.incrementAndGet(), values.advanceWatermarkToInfinity());
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 113533)
    Time Spent: 5h  (was: 4h 50m)

> Beam SQL should cleanly transform graph from Calcite
> ----------------------------------------------------
>
>                 Key: BEAM-4575
>                 URL: https://issues.apache.org/jira/browse/BEAM-4575
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> It would be nice if the Beam graph matched the Calcite graph in structure,
> with each node generating a PTransform that is applied onto the PCollection
> of its parent. We should also ensure that each Calcite node appears in
> the Beam graph only once.
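
As a rough illustration of the intended usage after this change (a sketch
assuming a BeamSqlEnv named sqlEnv and a SQL string sql;
BeamSqlRelUtils.toPCollection is the helper this PR introduces):

    // Each Calcite rel node maps to one PTransform applied onto the
    // PCollection of its input; toPCollection drives that conversion once.
    Pipeline pipeline = Pipeline.create(options);
    PCollection<Row> result =
        BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(sql));
    pipeline.run();
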



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
