apilloud commented on a change in pull request #11868: URL: https://github.com/apache/beam/pull/11868#discussion_r435497089
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Duration; + +/** + * TVFSlidingWindowFn assigns window based on input row's "window_start" and "window_end" + * timestamps. + */ +public class TVFSlidingWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { + /** Amount of time between generated windows. */ + private final Duration period; + + /** Size of the generated windows. 
*/ + private final Duration size; + + public static TVFSlidingWindowFn of(Duration size, Duration period) { + return new TVFSlidingWindowFn(size, period); + } + + private TVFSlidingWindowFn(Duration size, Duration period) { + this.period = period; + this.size = size; + } + + @Override + public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { + Row curRow = (Row) c.element(); + // In sliding window as TVF syntax, each row contains's its window's start and end as metadata, + // thus we can assign a window directly based on window's start and end metadata. + return Arrays.asList( + new IntervalWindow( + curRow.getDateTime(TVFStreamingUtils.WINDOW_START).toInstant(), + curRow.getDateTime(TVFStreamingUtils.WINDOW_END).toInstant())); + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return equals(other); + } + + @Override + public Coder<IntervalWindow> windowCoder() { + return IntervalWindow.getCoder(); + } + + @Override + public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException( + "TVFSlidingWindow does not support side input windows."); + } + + @Override Review comment: nit: From here to the end of the file is boilerplate that `AutoValue` does for you. Could you use that instead? 
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java ########## @@ -85,6 +97,85 @@ public TableFunctionScan copy( } private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> { + private TVFToPTransform tumbleToPTransform = + (call, upstream) -> { + RexInputRef wmCol = (RexInputRef) call.getOperands().get(1); + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2))); + PCollection<Row> streamWithWindowMetadata = + upstream + .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema))) + .setRowSchema(outputSchema); + + PCollection<Row> windowedStream = + assignTimestampsAndWindow( + streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn); + + return windowedStream; + }; + + private TVFToPTransform hopToPTransform = + (call, upstream) -> { + RexInputRef wmCol = (RexInputRef) call.getOperands().get(1); + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + + Duration size = durationParameter(call.getOperands().get(2)); + Duration period = durationParameter(call.getOperands().get(3)); + SlidingWindows windowFn = SlidingWindows.of(size).every(period); + PCollection<Row> streamWithWindowMetadata = + upstream + .apply(ParDo.of(new SlidingWindowDoFn(windowFn, wmCol.getIndex(), outputSchema))) + .setRowSchema(outputSchema); + + // Sliding window needs this special WindowFn to assign windows based on window_start, + // window_end metadata. 
+ WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period); + + PCollection<Row> windowedStream = + assignTimestampsAndWindow( + streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn); + + return windowedStream; + }; + + private TVFToPTransform sessionToPTransform = + (call, upstream) -> { + RexInputRef wmCol = (RexInputRef) call.getOperands().get(1); + Duration gap = durationParameter(call.getOperands().get(2)); + + Sessions sessions = Sessions.withGapDuration(gap); + + PCollection<Row> windowedStream = + assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions); + + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + // To extract session's window metadata, we apply a GroupByKey with a dummy key. It is + // because + // session is merging window. After GBK, SessionWindowDoFn will help extract window_start, + // window_end metadata. + PCollection<Row> streamWithWindowMetadata = + windowedStream + .apply(WithKeys.of("dummy")) Review comment: This block here looks to be really innovative to me! It looks like this is what makes `session_end` work? This also scares me, I don't see how it can work without creating hot key and scalability problems. Is there a doc explaining how this works? ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java ########## @@ -95,29 +186,19 @@ public TableFunctionScan copy( input); String operatorName = ((RexCall) getCall()).getOperator().getName(); checkArgument( - operatorName.equals("TUMBLE"), - "Only support TUMBLE table-valued function. Current operator: %s", + TVFStreamingUtils.WINDOWING_TVF.contains(operatorName), + "Only support %s table-valued functions. 
Current operator: %s", + TVFStreamingUtils.WINDOWING_TVF, operatorName); - RexCall call = ((RexCall) getCall()); - RexInputRef wmCol = (RexInputRef) call.getOperands().get(1); - PCollection<Row> upstream = input.get(0); - Schema outputSchema = CalciteUtils.toSchema(getRowType()); - FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2))); - PCollection<Row> streamWithWindowMetadata = - upstream - .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema))) - .setRowSchema(outputSchema); - - PCollection<Row> windowedStream = - assignTimestampsAndWindow( - streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn); - return windowedStream; + return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0)); Review comment: This approach works, but it increases complexity, reduces debugability, and is more error prone when compared to a simple 'switch case' statement and static methods. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java ########## @@ -166,6 +247,54 @@ public void processElement(ProcessContext c) { } } + private static class SlidingWindowDoFn extends DoFn<Row, Row> { + private final int windowFieldIndex; + private final SlidingWindows windowFn; + private final Schema outputSchema; + + public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, Schema schema) { + this.windowFn = windowFn; + this.windowFieldIndex = windowFieldIndex; + this.outputSchema = schema; + } + + @ProcessElement + public void processElement(ProcessContext c) { + Row row = c.element(); + Collection<IntervalWindow> windows = + windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant()); + for (IntervalWindow window : windows) { + Row.Builder builder = Row.withSchema(outputSchema); + builder.addValues(row.getValues()); + builder.addValue(window.start()); + builder.addValue(window.end()); + 
c.output(builder.build()); + } + } + } + + private static class SessionWindowDoFn extends DoFn<KV<String, Iterable<Row>>, Row> { + private final Schema outputSchema; + + public SessionWindowDoFn(Schema schema) { + this.outputSchema = schema; + } + + @ProcessElement + public void processElement( + @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) { + IntervalWindow intervalWindow = (IntervalWindow) window; + for (Row cur : element.getValue()) { Review comment: This is going to iterate over every element in the window, that won't work. You might be able to make something that works reasonably well with `CombineFn`. (I don't think this is something we will figure out in PR comments.) ########## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java ########## @@ -607,7 +630,13 @@ private RexInputRef convertWatermarkedResolvedColumnToRexInputRef( private ResolvedColumn extractWatermarkColumnFromDescriptor( ResolvedNodes.ResolvedDescriptor descriptor) { - return descriptor.getDescriptorColumnList().get(0); + ResolvedColumn wmCol = descriptor.getDescriptorColumnList().get(0); + if (wmCol.getType().getKind() != TYPE_TIMESTAMP) { Review comment: nit: Use checkArgument? ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall; + +/** Provides a function that produces a PCollection based on TVF and upstream PCollection. */ +public interface TVFToPTransform { Review comment: This isn't a `public interface`; it is a private implementation detail of `BeamTableFunctionScanRel`. Please move it there. ########## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ########## @@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test Review comment: nit: Are these really `ZetaSQLDialectSpecTests`? There are no matching compliance tests. It's probably worth moving these tests into their own file. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java ########## @@ -95,29 +186,19 @@ public TableFunctionScan copy( input); String operatorName = ((RexCall) getCall()).getOperator().getName(); checkArgument( - operatorName.equals("TUMBLE"), - "Only support TUMBLE table-valued function. Current operator: %s", + TVFStreamingUtils.WINDOWING_TVF.contains(operatorName), + "Only support %s table-valued functions.
Current operator: %s", + TVFStreamingUtils.WINDOWING_TVF, operatorName); - RexCall call = ((RexCall) getCall()); - RexInputRef wmCol = (RexInputRef) call.getOperands().get(1); - PCollection<Row> upstream = input.get(0); - Schema outputSchema = CalciteUtils.toSchema(getRowType()); - FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2))); - PCollection<Row> streamWithWindowMetadata = - upstream - .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema))) - .setRowSchema(outputSchema); - - PCollection<Row> windowedStream = - assignTimestampsAndWindow( - streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn); - return windowedStream; + return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0)); } /** Extract timestamps from the windowFieldIndex, then window into windowFns. */ private PCollection<Row> assignTimestampsAndWindow( - PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) { + PCollection<Row> upstream, + int windowFieldIndex, + WindowFn<Object, IntervalWindow> windowFn) { Review comment: Why did `Row` become `Object` here? How can we keep this as `Row`? ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java ########## @@ -95,29 +186,19 @@ public TableFunctionScan copy( input); String operatorName = ((RexCall) getCall()).getOperator().getName(); checkArgument( - operatorName.equals("TUMBLE"), - "Only support TUMBLE table-valued function. Current operator: %s", + TVFStreamingUtils.WINDOWING_TVF.contains(operatorName), Review comment: `TVFStreamingUtils.WINDOWING_TVF` is not correct here, you should be checking against the map keys `tvfToPTransformMap.keySet()` (or you could make this the default case on a switch statement). 
########## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java ########## @@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog catalog, AnalyzerOptions // TUMBLE catalog.addTableValuedFunction( new TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF( - ImmutableList.of("TUMBLE"), + ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF), + new FunctionSignature( + retType, ImmutableList.of(inputTableType, descriptorType, stringType), 123), Review comment: The constant `123` here (and repeated below) is supposed to be a globally unique ID out of `com.google.zetasql.ZetaSQLFunction.FunctionSignatureId`. This one is `FN_BITWISE_NOT_UINT64`. I'm pretty sure that doesn't match this function signature. If we don't care about these, could you use `com.google.zetasql.ZetaSQLFunction.FunctionSignatureId.FN_INVALID_FUNCTION_ID`? ########## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java ########## @@ -607,7 +630,13 @@ private RexInputRef convertWatermarkedResolvedColumnToRexInputRef( private ResolvedColumn extractWatermarkColumnFromDescriptor( ResolvedNodes.ResolvedDescriptor descriptor) { - return descriptor.getDescriptorColumnList().get(0); + ResolvedColumn wmCol = descriptor.getDescriptorColumnList().get(0); + if (wmCol.getType().getKind() != TYPE_TIMESTAMP) { + throw new IllegalArgumentException( + "Watermarked column should be TIMESTAMP type: " + + extractWatermarkColumnNameFromDescriptor(descriptor)); + } + return wmCol; } private String extractWatermarkColumnNameFromDescriptor( Review comment: nit: inline this function? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [email protected]
