apilloud commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r437068988
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
}
private class Transform extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
+ private TVFToPTransform tumbleToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+ FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new FixedWindowDoFn(windowFn,
wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), (WindowFn)
windowFn);
+
+ return windowedStream;
+ };
+
+ private TVFToPTransform hopToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+ Duration size = durationParameter(call.getOperands().get(2));
+ Duration period = durationParameter(call.getOperands().get(3));
+ SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new SlidingWindowDoFn(windowFn,
wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ // Sliding window needs this special WindowFn to assign windows
based on window_start,
+ // window_end metadata.
+ WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+ return windowedStream;
+ };
+
+ private TVFToPTransform sessionToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Duration gap = durationParameter(call.getOperands().get(2));
+
+ Sessions sessions = Sessions.withGapDuration(gap);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+ // To extract session's window metadata, we apply a GroupByKey with
a dummy key. It is
+ // because
+ // session is merging window. After GBK, SessionWindowDoFn will help
extract window_start,
+ // window_end metadata.
+ PCollection<Row> streamWithWindowMetadata =
+ windowedStream
+ .apply(WithKeys.of("dummy"))
Review comment:
This product is for processing big data, so 'super large' is the target
workflow. Can you point me at the existing code that does this?
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog
catalog, AnalyzerOptions
// TUMBLE
catalog.addTableValuedFunction(
new
TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(
- ImmutableList.of("TUMBLE"),
+ ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF),
+ new FunctionSignature(
+ retType, ImmutableList.of(inputTableType, descriptorType,
stringType), 123),
Review comment:
`-1` is `__FunctionSignatureId__switch_must_have_a_default__`, but I
don't see any harm in using that.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/TVFStreamingUtils.java
##########
@@ -17,8 +17,17 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.utils;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
/** Provides static constants or utils for TVF streaming. */
public class TVFStreamingUtils {
public static final String WINDOW_START = "window_start";
public static final String WINDOW_END = "window_end";
+
+ public static final String FIXED_WINDOW_TVF = "TUMBLE";
+ public static final String SLIDING_WINDOW_TVF = "HOP";
+ public static final String SESSION_WINDOW_TVF = "SESSION";
+
+ public static final ImmutableSet<String> WINDOWING_TVF =
Review comment:
This appears to be unused now?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]