apilloud commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r435497089



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+
+/**
+ * TVFSlidingWindowFn assigns window based on input row's "window_start" and 
"window_end"
+ * timestamps.
+ */
+public class TVFSlidingWindowFn extends NonMergingWindowFn<Object, 
IntervalWindow> {
+  /** Amount of time between generated windows. */
+  private final Duration period;
+
+  /** Size of the generated windows. */
+  private final Duration size;
+
+  public static TVFSlidingWindowFn of(Duration size, Duration period) {
+    return new TVFSlidingWindowFn(size, period);
+  }
+
+  private TVFSlidingWindowFn(Duration size, Duration period) {
+    this.period = period;
+    this.size = size;
+  }
+
+  @Override
+  public Collection<IntervalWindow> assignWindows(AssignContext c) throws 
Exception {
+    Row curRow = (Row) c.element();
+    // In sliding window as TVF syntax, each row contains's its window's start 
and end as metadata,
+    // thus we can assign a window directly based on window's start and end 
metadata.
+    return Arrays.asList(
+        new IntervalWindow(
+            curRow.getDateTime(TVFStreamingUtils.WINDOW_START).toInstant(),
+            curRow.getDateTime(TVFStreamingUtils.WINDOW_END).toInstant()));
+  }
+
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    return equals(other);
+  }
+
+  @Override
+  public Coder<IntervalWindow> windowCoder() {
+    return IntervalWindow.getCoder();
+  }
+
+  @Override
+  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
+    throw new UnsupportedOperationException(
+        "TVFSlidingWindow does not support side input windows.");
+  }
+
+  @Override

Review comment:
       nit: From here to the end of the file is boilerplate that `AutoValue` 
does for you. Could you use that instead?

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
+    private TVFToPTransform tumbleToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          FixedWindows windowFn = 
FixedWindows.of(durationParameter(call.getOperands().get(2)));
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new FixedWindowDoFn(windowFn, 
wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) 
windowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform hopToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+          Duration size = durationParameter(call.getOperands().get(2));
+          Duration period = durationParameter(call.getOperands().get(3));
+          SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new SlidingWindowDoFn(windowFn, 
wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          // Sliding window needs this special WindowFn to assign windows 
based on window_start,
+          // window_end metadata.
+          WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform sessionToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Duration gap = durationParameter(call.getOperands().get(2));
+
+          Sessions sessions = Sessions.withGapDuration(gap);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          // To extract session's window metadata, we apply a GroupByKey with 
a dummy key. It is
+          // because
+          // session is merging window. After GBK, SessionWindowDoFn will help 
extract window_start,
+          // window_end metadata.
+          PCollection<Row> streamWithWindowMetadata =
+              windowedStream
+                  .apply(WithKeys.of("dummy"))

Review comment:
       This block here looks really innovative to me! It looks like this 
is what makes `session_end` work? This also scares me: I don't see how it can 
work without creating a hot key and scalability problems. Is there a doc 
explaining how this works?

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+          "Only support %s table-valued functions. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF,
           operatorName);
-      RexCall call = ((RexCall) getCall());
-      RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
-      PCollection<Row> upstream = input.get(0);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      FixedWindows windowFn = 
FixedWindows.of(durationParameter(call.getOperands().get(2)));
-      PCollection<Row> streamWithWindowMetadata =
-          upstream
-              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), 
outputSchema)))
-              .setRowSchema(outputSchema);
-
-      PCollection<Row> windowedStream =
-          assignTimestampsAndWindow(
-              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
 
-      return windowedStream;
+      return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) 
getCall()), input.get(0));

Review comment:
       This approach works, but it increases complexity, reduces debuggability, 
and is more error-prone when compared to a simple 'switch case' statement and 
static methods.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
     }
   }
 
+  private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+    private final int windowFieldIndex;
+    private final SlidingWindows windowFn;
+    private final Schema outputSchema;
+
+    public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, 
Schema schema) {
+      this.windowFn = windowFn;
+      this.windowFieldIndex = windowFieldIndex;
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Row row = c.element();
+      Collection<IntervalWindow> windows =
+          
windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+      for (IntervalWindow window : windows) {
+        Row.Builder builder = Row.withSchema(outputSchema);
+        builder.addValues(row.getValues());
+        builder.addValue(window.start());
+        builder.addValue(window.end());
+        c.output(builder.build());
+      }
+    }
+  }
+
+  private static class SessionWindowDoFn extends DoFn<KV<String, 
Iterable<Row>>, Row> {
+    private final Schema outputSchema;
+
+    public SessionWindowDoFn(Schema schema) {
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<Row>> element, BoundedWindow window, 
OutputReceiver<Row> out) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
+      for (Row cur : element.getValue()) {

Review comment:
       This is going to iterate over every element in the window; that won't 
work. You might be able to make something that works reasonably well with 
`CombineFn`. (I don't think this is something we will figure out in PR 
comments.)

##########
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -607,7 +630,13 @@ private RexInputRef 
convertWatermarkedResolvedColumnToRexInputRef(
 
   private ResolvedColumn extractWatermarkColumnFromDescriptor(
       ResolvedNodes.ResolvedDescriptor descriptor) {
-    return descriptor.getDescriptorColumnList().get(0);
+    ResolvedColumn wmCol = descriptor.getDescriptorColumnList().get(0);
+    if (wmCol.getType().getKind() != TYPE_TIMESTAMP) {

Review comment:
       nit: Use checkArgument?

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+
+/** Provides a function that produces a PCollection based on TVF and upstream 
PCollection. */
+public interface TVFToPTransform {

Review comment:
       This isn't a `public interface`, it is a private implementation detail 
of `BeamTableFunctionScanRel`. Please move it there.

##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
     
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
       nit: Are these really `ZetaSQLDialectSpecTests`? There are no matching 
compliance tests. It's probably worth moving these tests into their own file.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+          "Only support %s table-valued functions. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF,
           operatorName);
-      RexCall call = ((RexCall) getCall());
-      RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
-      PCollection<Row> upstream = input.get(0);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      FixedWindows windowFn = 
FixedWindows.of(durationParameter(call.getOperands().get(2)));
-      PCollection<Row> streamWithWindowMetadata =
-          upstream
-              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), 
outputSchema)))
-              .setRowSchema(outputSchema);
-
-      PCollection<Row> windowedStream =
-          assignTimestampsAndWindow(
-              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
 
-      return windowedStream;
+      return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) 
getCall()), input.get(0));
     }
 
     /** Extract timestamps from the windowFieldIndex, then window into 
windowFns. */
     private PCollection<Row> assignTimestampsAndWindow(
-        PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, 
IntervalWindow> windowFn) {
+        PCollection<Row> upstream,
+        int windowFieldIndex,
+        WindowFn<Object, IntervalWindow> windowFn) {

Review comment:
       Why did `Row` become `Object` here? How can we keep this as `Row`?

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),

Review comment:
       `TVFStreamingUtils.WINDOWING_TVF` is not correct here; you should be 
checking against the map keys `tvfToPTransformMap.keySet()` (or you could make 
this the default case on a switch statement).

##########
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog 
catalog, AnalyzerOptions
     // TUMBLE
     catalog.addTableValuedFunction(
         new 
TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(
-            ImmutableList.of("TUMBLE"),
+            ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF),
+            new FunctionSignature(
+                retType, ImmutableList.of(inputTableType, descriptorType, 
stringType), 123),

Review comment:
       The constant `123` here (and repeated below) is supposed to be a globally 
unique ID out of `com.google.zetasql.ZetaSQLFunction.FunctionSignatureId`. This 
one is `FN_BITWISE_NOT_UINT64`. I'm pretty sure that doesn't match this 
function signature. If we don't care about these, could you use 
`com.google.zetasql.ZetaSQLFunction.FunctionSignatureId.FN_INVALID_FUNCTION_ID`?

##########
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -607,7 +630,13 @@ private RexInputRef 
convertWatermarkedResolvedColumnToRexInputRef(
 
   private ResolvedColumn extractWatermarkColumnFromDescriptor(
       ResolvedNodes.ResolvedDescriptor descriptor) {
-    return descriptor.getDescriptorColumnList().get(0);
+    ResolvedColumn wmCol = descriptor.getDescriptorColumnList().get(0);
+    if (wmCol.getType().getKind() != TYPE_TIMESTAMP) {
+      throw new IllegalArgumentException(
+          "Watermarked column should be TIMESTAMP type: "
+              + extractWatermarkColumnNameFromDescriptor(descriptor));
+    }
+    return wmCol;
   }
 
   private String extractWatermarkColumnNameFromDescriptor(

Review comment:
       nit: inline this function?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to