[ https://issues.apache.org/jira/browse/BEAM-4575?focusedWorklogId=113533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113533 ]
ASF GitHub Bot logged work on BEAM-4575: ---------------------------------------- Author: ASF GitHub Bot Created on: 20/Jun/18 03:18 Start Date: 20/Jun/18 03:18 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #5673: [BEAM-4575] Cleanly transform graph from Calcite to Beam SQL URL: https://github.com/apache/beam/pull/5673 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index bb06d72a33c..eb32d2a9a36 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -22,9 +22,9 @@ import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.ParseException; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.values.PCollectionTuple; /** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. 
*/ @Experimental @@ -59,7 +59,7 @@ public void execute(String sqlString) throws ParseException { BeamEnumerableConverter.createPipelineOptions(env.getPipelineOptions()); options.setJobName("BeamPlanCreator"); Pipeline pipeline = Pipeline.create(options); - PCollectionTuple.empty(pipeline).apply(env.parseQuery(sqlString)); + BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sqlString)); pipeline.run(); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java index 8bfecd5381e..7f849d4e70f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java @@ -18,20 +18,19 @@ package org.apache.beam.sdk.extensions.sql; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; /** This interface defines a Beam Sql Table. */ public interface BeamSqlTable { - /** create a {@code PCollection<BeamSqlRow>} from source. */ - PCollection<Row> buildIOReader(Pipeline pipeline); + /** create a {@code PCollection<Row>} from source. */ + PCollection<Row> buildIOReader(PBegin begin); /** create a {@code IO.write()} instance to write to target. */ - PTransform<? super PCollection<Row>, POutput> buildIOWriter(); + POutput buildIOWriter(PCollection<Row> input); /** Get the schema info of the table. 
*/ Schema getSchema(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java index 20d152b67d2..2bee537f90d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; @@ -92,7 +93,7 @@ registerFunctions(sqlEnv); - return PCollectionTuple.empty(input.getPipeline()).apply(sqlEnv.parseQuery(queryString())); + return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString())); } private Map<String, BeamSqlTable> toTableMap(PInput inputs) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index 940f0483464..1aca83bac0b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -23,15 +23,12 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.BeamSqlUdf; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import 
org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.Row; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.jdbc.CalcitePrepare; import org.apache.calcite.jdbc.CalciteSchema; @@ -103,10 +100,9 @@ public void registerUdaf(String functionName, Combine.CombineFn combineFn) { defaultSchema.add(functionName, new UdafImpl(combineFn)); } - public PTransform<PCollectionTuple, PCollection<Row>> parseQuery(String query) - throws ParseException { + public BeamRelNode parseQuery(String query) throws ParseException { try { - return planner.convertToBeamRel(query).toPTransform(); + return planner.convertToBeamRel(query); } catch (ValidationException | RelConversionException | SqlParseException e) { throw new ParseException(String.format("Unable to parse query %s", query), e); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index b0b978ff018..b5156e9e161 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import 
org.apache.beam.sdk.values.WindowingStrategy; import org.apache.calcite.plan.RelOptCluster; @@ -73,24 +73,20 @@ public BeamAggregationRel( } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(BeamAggregationRel.this) + "_"; - - PCollection<Row> upstream = - inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform()); + public PCollection<Row> expand(PInput pinput) { + PCollection<Row> upstream = (PCollection<Row>) pinput; if (windowField.isPresent()) { upstream = upstream .apply( - stageName + "assignEventTimestamp", + "assignEventTimestamp", WithTimestamps.of( new BeamAggregationTransforms.WindowTimestampFn(windowFieldIndex)) .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE))) @@ -99,7 +95,7 @@ public BeamAggregationRel( PCollection<Row> windowedStream = windowField.isPresent() - ? upstream.apply(stageName + "window", Window.into(windowField.get().windowFn())) + ? 
upstream.apply(Window.into(windowField.get().windowFn())) : upstream; validateWindowIsSupported(windowedStream); @@ -109,7 +105,7 @@ public BeamAggregationRel( PCollection<KV<Row, Row>> exCombineByStream = windowedStream .apply( - stageName + "exCombineBy", + "exCombineBy", WithKeys.of( new BeamAggregationTransforms.AggregationGroupByKeyFn( keySchema, windowFieldIndex, groupSet))) @@ -120,7 +116,7 @@ public BeamAggregationRel( PCollection<KV<Row, Row>> aggregatedStream = exCombineByStream .apply( - stageName + "combineBy", + "combineBy", Combine.perKey( new BeamAggregationTransforms.AggregationAdaptor( getNamedAggCalls(), CalciteUtils.toBeamSchema(input.getRowType())))) @@ -128,7 +124,7 @@ public BeamAggregationRel( PCollection<Row> mergedStream = aggregatedStream.apply( - stageName + "mergeRecord", + "mergeRecord", ParDo.of( new BeamAggregationTransforms.MergeAggregationRecord( CalciteUtils.toBeamSchema(getRowType()), windowFieldIndex))); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index b0c9e57b787..5488460a15d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -54,25 +54,20 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) { } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + 
public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(BeamCalcRel.this); - - PCollection<Row> upstream = - inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform()); + public PCollection<Row> expand(PInput pinput) { + PCollection<Row> upstream = (PCollection<Row>) pinput; BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(BeamCalcRel.this.getProgram()); PCollection<Row> projectStream = - upstream.apply( - stageName, ParDo.of(new CalcFn(executor, CalciteUtils.toBeamSchema(rowType)))); + upstream.apply(ParDo.of(new CalcFn(executor, CalciteUtils.toBeamSchema(rowType)))); projectStream.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder()); return projectStream; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 32e4564d408..2fbcb5b1e72 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.calcite.adapter.enumerable.EnumerableRel; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; @@ -119,7 +118,7 @@ public static 
PipelineOptions createPipelineOptions(Map<String, String> map) { private static PipelineResult run( PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn) { Pipeline pipeline = Pipeline.create(options); - PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn)); + BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(doFn)); PipelineResult result = pipeline.run(); result.waitUntilFinish(); return result; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index 3989dc5cf3c..fe33c704b87 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; @@ -98,27 +98,19 @@ public void register(RelOptPlanner planner) { } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { - /** - * Note that {@code BeamIOSinkRel} returns the input PCollection, which is the persisted - * PCollection. 
- */ @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(BeamIOSinkRel.this); + public PCollection<Row> expand(PInput pinput) { + PCollection<Row> input = (PCollection<Row>) pinput; - PCollection<Row> upstream = - inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform()); + sqlTable.buildIOWriter(input); - upstream.apply(stageName, sqlTable.buildIOWriter()); - - return upstream; + return input; } } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index 3a478a8e80e..538e56a26af 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -20,8 +20,9 @@ import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; @@ -44,15 +45,15 @@ public BeamIOSourceRel( } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - return sqlTable.buildIOReader(inputPCollections.getPipeline()); 
+ public PCollection<Row> expand(PInput input) { + return sqlTable.buildIOReader((PBegin) input); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java index b5002ea9718..241f745de7d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -36,13 +36,9 @@ * statement that are identical to a row in the second SELECT statement. 
*/ public class BeamIntersectRel extends Intersect implements BeamRelNode { - private BeamSetOperatorRelBase delegate; - public BeamIntersectRel( RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { super(cluster, traits, inputs, all); - delegate = - new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all); } @Override @@ -51,15 +47,7 @@ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { - return new Transform(); - } - - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { - - @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - return delegate.buildBeamPipeline(inputPCollections); - } + public PTransform<PInput, PCollection<Row>> buildPTransform() { + return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.INTERSECT, all); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 5967f76a7bb..25b0ae378eb 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; @@ -44,8 +45,9 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionList; import 
org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.calcite.plan.RelOptCluster; @@ -116,30 +118,41 @@ public Join copy( } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PInput buildPInput(Pipeline pipeline, Map<Integer, PCollection<Row>> cache) { + BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); + BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); + if (!seekable(leftRelNode) && seekable(rightRelNode)) { + return BeamSqlRelUtils.toPCollection(pipeline, leftRelNode, cache); + } + return BeamRelNode.super.buildPInput(pipeline, cache); + } + + @Override + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { + public PCollection<Row> expand(PInput pinput) { BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); - Schema leftSchema = CalciteUtils.toBeamSchema(left.getRowType()); final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); - if (!seekable(leftRelNode) && seekable(rightRelNode)) { - return joinAsLookup(leftRelNode, rightRelNode, inputPCollections) + if (pinput instanceof PCollection) { + return joinAsLookup(leftRelNode, rightRelNode, (PCollection<Row>) pinput) .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder()); } - PCollection<Row> leftRows = inputPCollections.apply("left", leftRelNode.toPTransform()); - PCollection<Row> rightRows = inputPCollections.apply("right", rightRelNode.toPTransform()); + Schema leftSchema = CalciteUtils.toBeamSchema(left.getRowType()); + PCollectionList<Row> input = (PCollectionList<Row>) pinput; + assert 
input.size() == 2; + PCollection<Row> leftRows = input.get(0); + PCollection<Row> rightRows = input.get(1); verifySupportedTrigger(leftRows); verifySupportedTrigger(rightRows); - String stageName = BeamSqlRelUtils.getStageName(BeamJoinRel.this); WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); @@ -158,14 +171,14 @@ public Join copy( PCollection<KV<Row, Row>> extractedLeftRows = leftRows .apply( - stageName + "_left_ExtractJoinFields", + "left_ExtractJoinFields", MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); PCollection<KV<Row, Row>> extractedRightRows = rightRows .apply( - stageName + "_right_ExtractJoinFields", + "right_ExtractJoinFields", MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); @@ -184,8 +197,7 @@ public Join copy( "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e); } - return standardJoin( - extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow, stageName); + return standardJoin(extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow); } else if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == UNBOUNDED) || (leftRows.isBounded() == UNBOUNDED @@ -242,8 +254,7 @@ private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) { PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, Row leftNullRow, - Row rightNullRow, - String stageName) { + Row rightNullRow) { PCollection<KV<Row, KV<Row, Row>>> joinedRows = null; switch (joinType) { case LEFT: @@ -272,8 +283,7 @@ private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) { PCollection<Row> ret = joinedRows .apply( - stageName + "_JoinParts2WholeRow", - MapElements.via(new 
BeamJoinTransforms.JoinParts2WholeRow())) + "JoinParts2WholeRow", MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder()); return ret; } @@ -363,8 +373,7 @@ private Row buildNullRow(BeamRelNode relNode) { } private PCollection<Row> joinAsLookup( - BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollectionTuple inputPCollections) { - PCollection<Row> factStream = inputPCollections.apply(leftRelNode.toPTransform()); + BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollection<Row> factStream) { BeamIOSourceRel srcRel = (BeamIOSourceRel) rightRelNode; BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) srcRel.getBeamSqlTable(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java index eb93f54c270..2db3b13000c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -36,12 +36,9 @@ */ public class BeamMinusRel extends Minus implements BeamRelNode { - private BeamSetOperatorRelBase delegate; - public BeamMinusRel( RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { super(cluster, traits, inputs, all); - delegate = new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, inputs, all); } @Override @@ -50,15 +47,7 @@ public SetOp copy(RelTraitSet traitSet, 
List<RelNode> inputs, boolean all) { } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { - return new Transform(); - } - - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { - - @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - return delegate.buildBeamPipeline(inputPCollections); - } + public PTransform<PInput, PCollection<Row>> buildPTransform() { + return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, all); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java index 5fc885b7428..7a30df21d1d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java @@ -17,20 +17,37 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.rel.RelNode; /** A {@link RelNode} that can also give a {@link PTransform} that implements the expression. */ public interface BeamRelNode extends RelNode { - /** - * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a - * DFS(Depth-First-Search) algorithm. - */ - PTransform<PCollectionTuple, PCollection<Row>> toPTransform(); + + /** Transforms the inputs into a PInput. 
*/ + default PInput buildPInput(Pipeline pipeline, Map<Integer, PCollection<Row>> cache) { + List<RelNode> inputs = getInputs(); + if (inputs.size() == 0) { + return pipeline.begin(); + } + List<PCollection<Row>> pInputs = new ArrayList(inputs.size()); + for (RelNode input : inputs) { + pInputs.add(BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache)); + } + if (pInputs.size() == 1) { + return pInputs.get(0); + } + return PCollectionList.of(pInputs); + } + + PTransform<PInput, PCollection<Row>> buildPTransform(); /** Perform a DFS(Depth-First-Search) to find the PipelineOptions config. */ default Map<String, String> getPipelineOptions() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java index df74fd65bf5..73b7d4ca076 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java @@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.io.Serializable; -import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -29,16 +29,16 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import 
org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.rel.RelNode; /** * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} and {@code * BeamMinusRel}. */ -public class BeamSetOperatorRelBase { +public class BeamSetOperatorRelBase extends PTransform<PInput, PCollection<Row>> { /** Set operator type. */ public enum OpType implements Serializable { UNION, @@ -47,25 +47,20 @@ } private BeamRelNode beamRelNode; - private List<RelNode> inputs; private boolean all; private OpType opType; - public BeamSetOperatorRelBase( - BeamRelNode beamRelNode, OpType opType, List<RelNode> inputs, boolean all) { + public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, boolean all) { this.beamRelNode = beamRelNode; this.opType = opType; - this.inputs = inputs; this.all = all; } - public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections) { - PCollection<Row> leftRows = - inputPCollections.apply( - "left", BeamSqlRelUtils.getBeamRelInput(inputs.get(0)).toPTransform()); - PCollection<Row> rightRows = - inputPCollections.apply( - "right", BeamSqlRelUtils.getBeamRelInput(inputs.get(1)).toPTransform()); + public PCollection<Row> expand(PInput pinput) { + PCollectionList<Row> inputs = (PCollectionList<Row>) pinput; + assert inputs.size() == 2; + PCollection<Row> leftRows = inputs.get(0); + PCollection<Row> rightRows = inputs.get(1); WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); @@ -83,17 +78,16 @@ public BeamSetOperatorRelBase( final TupleTag<Row> rightTag = new TupleTag<>(); // co-group - String stageName = BeamSqlRelUtils.getStageName(beamRelNode); PCollection<KV<Row, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of( leftTag, leftRows.apply( - stageName + "_CreateLeftIndex", + "CreateLeftIndex", MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) .and( rightTag, rightRows.apply( - stageName + 
"_CreateRightIndex", + "CreateRightIndex", MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) .apply(CoGroupByKey.create()); PCollection<Row> ret = diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 84d8ed95277..99583c2894a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -121,17 +121,15 @@ public BeamSortRel( } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - RelNode input = getInput(); - PCollection<Row> upstream = - inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform()); + public PCollection<Row> expand(PInput pinput) { + PCollection<Row> upstream = (PCollection<Row>) pinput; Type windowType = upstream.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor().getType(); if (!windowType.equals(GlobalWindow.class)) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java index 30d52516036..d5c4f7a2d6c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java @@ -17,36 +17,42 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.calcite.plan.RelOptUtil; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.sql.SqlExplainLevel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Utilities for {@code BeamRelNode}. 
*/ -class BeamSqlRelUtils { - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class); +public class BeamSqlRelUtils { - private static final AtomicInteger sequence = new AtomicInteger(0); - private static final AtomicInteger classSequence = new AtomicInteger(0); - - public static String getStageName(BeamRelNode relNode) { - return relNode.getClass().getSimpleName().toUpperCase() - + "_" - + relNode.getId() - + "_" - + sequence.getAndIncrement(); + public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node) { + return toPCollection(pipeline, node, new HashMap()); } - public static String getClassName(BeamRelNode relNode) { - return "Generated_" - + relNode.getClass().getSimpleName().toUpperCase() - + "_" - + relNode.getId() - + "_" - + classSequence.getAndIncrement(); + /** + * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a + * DFS(Depth-First-Search) algorithm. + */ + static PCollection<Row> toPCollection( + Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) { + PCollection<Row> output = cache.get(node.getId()); + if (output != null) { + return output; + } + + String name = node.getClass().getSimpleName() + "_" + node.getId(); + PInput input = node.buildPInput(pipeline, cache); + PTransform<PInput, PCollection<Row>> transform = node.buildPTransform(); + + output = Pipeline.applyTransform(name, input, transform); + + cache.put(node.getId(), output); + return output; } public static BeamRelNode getBeamRelInput(RelNode input) { @@ -56,24 +62,4 @@ public static BeamRelNode getBeamRelInput(RelNode input) { } return (BeamRelNode) input; } - - public static String explain(final RelNode rel) { - return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); - } - - public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { - String explain = ""; - try { - explain = RelOptUtil.toString(rel); - } catch (StackOverflowError e) { - LOG.error( - 
"StackOverflowError occurred while extracting plan. " - + "Please report it to the dev@ mailing list."); - LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); - LOG.error( - "Forcing plan to empty string and continue... " - + "SQL Runner may not working properly after."); - } - return explain; - } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java index fdb15ada438..99a78d332b7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -44,18 +44,14 @@ public RelNode copy(RelTraitSet traitSet, RelNode input) { } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(BeamUncollectRel.this); - - PCollection<Row> upstream = - inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform()); + public PCollection<Row> expand(PInput pinput) { + PCollection<Row> upstream = (PCollection<Row>) 
pinput; // Each row of the input contains a single array of things to be emitted; Calcite knows // what the row looks like @@ -63,7 +59,7 @@ public RelNode copy(RelTraitSet traitSet, RelNode input) { PCollection<Row> uncollected = upstream - .apply(stageName, ParDo.of(new UncollectDoFn(outputSchema))) + .apply(ParDo.of(new UncollectDoFn(outputSchema))) .setCoder(outputSchema.getRowCoder()); return uncollected; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java index f8fd0aff72d..d52c48fd7db 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -63,13 +63,9 @@ * }</pre> */ public class BeamUnionRel extends Union implements BeamRelNode { - private BeamSetOperatorRelBase delegate; - public BeamUnionRel( RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { super(cluster, traits, inputs, all); - this.delegate = - new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.UNION, inputs, all); } @Override @@ -78,14 +74,7 @@ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { - return new Transform(); - } - - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { - @Override - public 
PCollection<Row> expand(PCollectionTuple inputPCollections) { - return delegate.buildBeamPipeline(inputPCollections); - } + public PTransform<PInput, PCollection<Row>> buildPTransform() { + return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.UNION, all); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java index 35dbdac6997..a21e86a63f5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java @@ -22,7 +22,9 @@ import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; @@ -35,7 +37,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -63,11 +65,6 @@ public BeamUnnestRel( super(cluster, traits, left, right, correlationId, requiredColumns, joinType); } - @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { - return new Transform(); - } - @Override public Correlate copy( RelTraitSet relTraitSet, @@ -80,14 +77,22 @@ public Correlate copy( getCluster(), relTraitSet, left, right, correlationId, 
requiredColumns, joinType); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { - @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { - String stageName = BeamSqlRelUtils.getStageName(BeamUnnestRel.this); + @Override + public PInput buildPInput(Pipeline pipeline, Map<Integer, PCollection<Row>> cache) { + BeamRelNode input = BeamSqlRelUtils.getBeamRelInput(left); + return BeamSqlRelUtils.toPCollection(pipeline, input, cache); + } + + @Override + public PTransform<PInput, PCollection<Row>> buildPTransform() { + return new Transform(); + } + private class Transform extends PTransform<PInput, PCollection<Row>> { + @Override + public PCollection<Row> expand(PInput pinput) { // The set of rows where we run the correlated unnest for each row - PCollection<Row> outer = - inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(left).toPTransform()); + PCollection<Row> outer = (PCollection<Row>) pinput; // The correlated subquery BeamUncollectRel uncollect = (BeamUncollectRel) BeamSqlRelUtils.getBeamRelInput(right); @@ -103,7 +108,6 @@ public Correlate copy( return outer .apply( - stageName, ParDo.of( new UnnestFn(correlationId.getId(), expr, joinedSchema, innerSchema.getField(0)))) .setCoder(joinedSchema.getRowCoder()); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index 68a5366fd99..d7b13156ecb 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -29,8 +29,9 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; import 
org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -59,16 +60,15 @@ public BeamValuesRel( } @Override - public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() { + public PTransform<PInput, PCollection<Row>> buildPTransform() { return new Transform(); } - private class Transform extends PTransform<PCollectionTuple, PCollection<Row>> { + private class Transform extends PTransform<PInput, PCollection<Row>> { @Override - public PCollection<Row> expand(PCollectionTuple inputPCollections) { + public PCollection<Row> expand(PInput pinput) { - String stageName = BeamSqlRelUtils.getStageName(BeamValuesRel.this); if (tuples.isEmpty()) { throw new IllegalStateException("Values with empty tuples!"); } @@ -77,10 +77,7 @@ public BeamValuesRel( List<Row> rows = tuples.stream().map(tuple -> tupleToRow(schema, tuple)).collect(toList()); - return inputPCollections - .getPipeline() - .apply(stageName, Create.of(rows)) - .setCoder(schema.getRowCoder()); + return ((PBegin) pinput).apply(Create.of(rows)).setCoder(schema.getRowCoder()); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java index b946111179f..b5d2d0ef53b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java @@ -17,9 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql.impl.schema; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.transforms.PTransform; +import 
org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; @@ -37,12 +36,13 @@ public BeamPCollectionTable(PCollection<Row> upstream) { } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { + assert begin.getPipeline() == upstream.getPipeline(); return upstream; } @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { + public POutput buildIOWriter(PCollection<Row> input) { throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java index 68eac812ebd..7d1e68e9667 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java @@ -18,14 +18,12 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; @@ -44,22 +42,17 @@ public BeamBigQueryTable(Schema beamSchema, String 
tableSpec) { } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { throw new UnsupportedOperationException(); } @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { - return new PTransform<PCollection<Row>, POutput>() { - @Override - public WriteResult expand(PCollection<Row> input) { - return input.apply( - BigQueryIO.<Row>write() - .withSchema(BigQueryUtils.toTableSchema(getSchema())) - .withFormatFunction(BigQueryUtils.toTableRow()) - .to(tableSpec)); - } - }; + public POutput buildIOWriter(PCollection<Row> input) { + return input.apply( + BigQueryIO.<Row>write() + .withSchema(BigQueryUtils.toTableSchema(getSchema())) + .withFormatFunction(BigQueryUtils.toTableRow()) + .to(tableSpec)); } String getTableSpec() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index d9b7fa44e60..352e4d2db64 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.io.kafka.KafkaIO; @@ -30,7 +29,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.kafka.common.TopicPartition; @@ -76,7 +74,7 @@ public BeamKafkaTable 
updateConsumerProperties(Map<String, Object> configUpdates getPTransformForOutput(); @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { KafkaIO.Read<byte[], byte[]> kafkaRead = null; if (topics != null) { kafkaRead = @@ -98,30 +96,26 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates throw new IllegalArgumentException("One of topics and topicPartitions must be configurated."); } - return PBegin.in(pipeline) + return begin .apply("read", kafkaRead.withoutMetadata()) .apply("in_format", getPTransformForInput()); } @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { + public POutput buildIOWriter(PCollection<Row> input) { checkArgument( topics != null && topics.size() == 1, "Only one topic can be acceptable as output."); + assert topics != null; - return new PTransform<PCollection<Row>, POutput>() { - @Override - public PDone expand(PCollection<Row> input) { - return input - .apply("out_reformat", getPTransformForOutput()) - .apply( - "persistent", - KafkaIO.<byte[], byte[]>write() - .withBootstrapServers(bootstrapServers) - .withTopic(topics.get(0)) - .withKeySerializer(ByteArraySerializer.class) - .withValueSerializer(ByteArraySerializer.class)); - } - }; + return input + .apply("out_reformat", getPTransformForOutput()) + .apply( + "persistent", + KafkaIO.<byte[], byte[]>write() + .withBootstrapServers(bootstrapServers) + .withTopic(topics.get(0)) + .withKeySerializer(ByteArraySerializer.class) + .withValueSerializer(ByteArraySerializer.class)); } public String getBootstrapServers() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java index ee332e6df2d..b4a5a6af273 100644 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java @@ -23,14 +23,12 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -139,9 +137,9 @@ static Builder builder() { public abstract Schema getSchema(); @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { PCollectionTuple rowsWithDlq = - PBegin.in(pipeline) + begin .apply("readFromPubsub", readMessagesWithAttributes()) .apply("parseMessageToRow", createParserParDo()); @@ -178,7 +176,7 @@ static Builder builder() { } @Override - public PTransform<? 
super PCollection<Row>, POutput> buildIOWriter() { + public POutput buildIOWriter(PCollection<Row> input) { throw new UnsupportedOperationException("Writing to a Pubsub topic is not supported"); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java index f4a7d8e2cd3..5473c02aa8a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java @@ -18,10 +18,8 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.text; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; @@ -50,15 +48,15 @@ public BeamTextCSVTable(Schema schema, String filePattern, CSVFormat csvFormat) } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline) + public PCollection<Row> buildIOReader(PBegin begin) { + return begin .apply("decodeRecord", TextIO.read().from(filePattern)) .apply("parseCSVLine", new BeamTextCSVTableIOReader(schema, filePattern, csvFormat)); } @Override - public PTransform<? 
super PCollection<Row>, POutput> buildIOWriter() { - return new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat); + public POutput buildIOWriter(PCollection<Row> input) { + return input.apply(new BeamTextCSVTableIOWriter(schema, filePattern, csvFormat)); } public CSVFormat getCsvFormat() { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index 8f53f4d2cfe..f00949bd4ad 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -332,8 +332,8 @@ public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception { private PCollection<Row> queryFromOrderTables(String sql) { return tuple( - "ORDER_DETAILS1", ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER), - "ORDER_DETAILS2", ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)) + "ORDER_DETAILS1", ORDER_DETAILS1.buildIOReader(pipeline.begin()).setCoder(SOURCE_CODER), + "ORDER_DETAILS2", ORDER_DETAILS2.buildIOReader(pipeline.begin()).setCoder(SOURCE_CODER)) .apply("join", SqlTransform.query(sql)) .setCoder(RESULT_CODER); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java index 217aa8dc252..0d7d6d1d845 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java @@ -27,6 +27,7 @@ import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider; import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery; @@ -34,7 +35,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.joda.time.Duration; import org.joda.time.Instant; @@ -92,7 +92,7 @@ public void testSimpleInsert() throws Exception { + " pubsub_topic.payload.name \n" + "FROM pubsub_topic"; - PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(insertStatement)); + BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement)); pipeline.run(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java index c6f975a4438..5eeafa49135 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/TestTableProvider.java @@ -25,7 +25,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -34,8 +33,8 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; 
+import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.POutput; @@ -123,17 +122,18 @@ public InMemoryTable(TableWithRows tableWithRows) { } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { TableWithRows tableWithRows = GLOBAL_TABLES .get(this.tableWithRows.tableProviderInstanceId) .get(this.tableWithRows.table.getName()); - return pipeline.apply(Create.of(tableWithRows.rows).withCoder(rowCoder())); + return begin.apply(Create.of(tableWithRows.rows).withCoder(rowCoder())); } @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { - return new Collector(tableWithRows); + public POutput buildIOWriter(PCollection<Row> input) { + input.apply(ParDo.of(new CollectorFn(tableWithRows))); + return PDone.in(input.getPipeline()); } @Override @@ -142,21 +142,6 @@ public Schema getSchema() { } } - private static final class Collector extends PTransform<PCollection<Row>, POutput> { - - private TableWithRows tableWithRows; - - Collector(TableWithRows tableWithRows) { - this.tableWithRows = tableWithRows; - } - - @Override - public POutput expand(PCollection<Row> input) { - input.apply(ParDo.of(new CollectorFn(tableWithRows))); - return PDone.in(input.getPipeline()); - } - } - private static final class CollectorFn extends DoFn<Row, Row> { private TableWithRows tableWithRows; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java index 2cef73b81a0..35079dbf4c9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java @@ -24,7 +24,6 @@ import 
org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; /** Base class for rel test. */ @@ -33,7 +32,7 @@ private static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables); protected static PCollection<Row> compilePipeline(String sql, Pipeline pipeline) { - return PCollectionTuple.empty(pipeline).apply(env.parseQuery(sql)); + return BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sql)); } protected static void registerTable(String tableName, BeamSqlTable table) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java index 65943fce44e..b245c206a27 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java @@ -24,15 +24,14 @@ import com.google.common.collect.ImmutableList; import java.math.BigDecimal; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.POutput; @@ 
-97,26 +96,18 @@ public void testToEnumerable_collectMultiple() { enumerator.close(); } - private static class FakeTable implements BeamSqlTable { - @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { - return null; - } - - @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { - return new FakeIOWriter(); + private static class FakeTable extends BaseBeamTable { + public FakeTable() { + super(null); } @Override - public Schema getSchema() { + public PCollection<Row> buildIOReader(PBegin begin) { return null; } - } - private static class FakeIOWriter extends PTransform<PCollection<Row>, POutput> { @Override - public POutput expand(PCollection<Row> input) { + public POutput buildIOWriter(PCollection<Row> input) { input.apply( ParDo.of( new DoFn<Row, Void>() { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index 990071283db..850ddb9b497 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; @@ -30,8 +29,8 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; @@ -112,12 +111,12 @@ public SiteLookupTable(Schema schema) { } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { throw new UnsupportedOperationException(); } @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { + public POutput buildIOWriter(PCollection<Row> input) { throw new UnsupportedOperationException(); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index 3cd9ebfb93a..0b277d655db 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -91,7 +91,7 @@ (short) 32767, 2147483647, 9223372036854775807L) - .buildIOReader(pipeline) + .buildIOReader(pipeline.begin()) .setCoder(ROW_TYPE.getRowCoder()); } catch (Exception e) { throw new RuntimeException(e); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java index f4d0aaa12f3..2261cc9c348 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java @@ -356,7 +356,7 @@ public void testLike() 
throws Exception { true, "string_true_test", "string_false_test") - .buildIOReader(pipeline) + .buildIOReader(pipeline.begin()) .setCoder(type.getRowCoder()); } catch (Exception e) { throw new RuntimeException(e); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java index 89657f46b80..52111afdfcf 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryWriteIT.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable; import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.joda.time.Duration; import org.junit.Rule; @@ -76,7 +76,7 @@ public void testInsertValues() throws Exception { String insertStatement = "INSERT INTO ORDERS VALUES (1, 'foo', ARRAY['123', '456'])"; - PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(insertStatement)); + BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement)); pipeline.run().waitUntilFinish(Duration.standardMinutes(5)); @@ -116,7 +116,7 @@ public void testInsertSelect() throws Exception { + " name as `name`, \n" + " arr as `arr` \n" + " FROM 
ORDERS_IN_MEMORY"; - PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(insertStatement)); + BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement)); pipeline.run().waitUntilFinish(Duration.standardMinutes(5)); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java index ed492aa297e..e9dbff8bcca 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.joda.time.Duration; import org.joda.time.Instant; @@ -173,7 +173,7 @@ private Row row(Schema schema, Object... 
values) { private PCollection<Row> query(BeamSqlEnv sqlEnv, TestPipeline pipeline, String queryString) throws Exception { - return PCollectionTuple.empty(pipeline).apply(sqlEnv.parseQuery(queryString)); + return BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(queryString)); } private PubsubMessage message(Instant timestamp, int id, String name) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java index 916d4f935e9..4a45d1054d5 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java @@ -78,20 +78,23 @@ @Test public void testBuildIOReader() { PCollection<Row> rows = - new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); + new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath()) + .buildIOReader(pipeline.begin()); PAssert.that(rows).containsInAnyOrder(testDataRows); pipeline.run(); } @Test public void testBuildIOWriter() { - new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath()) - .buildIOReader(pipeline) - .apply(new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()).buildIOWriter()); + PCollection<Row> input = + new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath()) + .buildIOReader(pipeline.begin()); + new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()).buildIOWriter(input); pipeline.run(); PCollection<Row> rows = - new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); + new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath()) + .buildIOReader(pipeline2.begin()); // confirm the two reads match 
PAssert.that(rows).containsInAnyOrder(testDataRows); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java index 8f43e1c9b80..399778817ff 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java @@ -24,11 +24,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -89,35 +87,25 @@ public MockedBoundedTable addRows(Object... args) { } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline) - .apply("MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); + public PCollection<Row> buildIOReader(PBegin begin) { + return begin.apply("MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); } @Override - public PTransform<? super PCollection<Row>, POutput> buildIOWriter() { - return new OutputStore(); - } - - /** Keep output in {@code CONTENT} for validation. 
*/ - public static class OutputStore extends PTransform<PCollection<Row>, POutput> { - - @Override - public PDone expand(PCollection<Row> input) { - input.apply( - ParDo.of( - new DoFn<Row, Void>() { - @ProcessElement - public void processElement(ProcessContext c) { - CONTENT.add(c.element()); - } + public POutput buildIOWriter(PCollection<Row> input) { + input.apply( + ParDo.of( + new DoFn<Row, Void>() { + @ProcessElement + public void processElement(ProcessContext c) { + CONTENT.add(c.element()); + } - @Teardown - public void close() { - CONTENT.clear(); - } - })); - return PDone.in(input.getPipeline()); - } + @Teardown + public void close() { + CONTENT.clear(); + } + })); + return PDone.in(input.getPipeline()); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java index 5cfdf789960..fed4eb36b40 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; @@ -35,7 +34,7 @@ public MockedTable(Schema beamSchema) { } @Override - public PTransform<? 
super PCollection<Row>, POutput> buildIOWriter() { + public POutput buildIOWriter(PCollection<Row> input) { throw new UnsupportedOperationException("buildIOWriter unsupported!"); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java index f440b54686b..785746603e9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java @@ -21,10 +21,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TimestampedValue; @@ -87,7 +87,7 @@ public MockedUnboundedTable addRows(Duration duration, Object... args) { } @Override - public PCollection<Row> buildIOReader(Pipeline pipeline) { + public PCollection<Row> buildIOReader(PBegin begin) { TestStream.Builder<Row> values = TestStream.create(schema.getRowCoder()); for (Pair<Duration, List<Row>> pair : timestampedRows) { @@ -101,10 +101,7 @@ public MockedUnboundedTable addRows(Duration duration, Object... args) { } } - return pipeline - .begin() - .apply( - "MockedUnboundedTable_" + COUNTER.incrementAndGet(), - values.advanceWatermarkToInfinity()); + return begin.apply( + "MockedUnboundedTable_" + COUNTER.incrementAndGet(), values.advanceWatermarkToInfinity()); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 113533) Time Spent: 5h (was: 4h 50m) > Beam SQL should cleanly transform graph from Calcite > ---------------------------------------------------- > > Key: BEAM-4575 > URL: https://issues.apache.org/jira/browse/BEAM-4575 > Project: Beam > Issue Type: New Feature > Components: dsl-sql > Reporter: Andrew Pilloud > Assignee: Andrew Pilloud > Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > It would be nice if the Beam graph matched the Calcite graph in structure > with each node generating a PTransform that is applied onto the PCollection > of its parent. We should also ensure that each Calcite node only appears in > the Beam graph one time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)