This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e8fad9ca2ba Adding DLQ support for ZetaSQL (#25873)
e8fad9ca2ba is described below

commit e8fad9ca2ba80220b4817ddba95aabc0a48067dd
Author: Pablo Estrada <pabl...@users.noreply.github.com>
AuthorDate: Fri Mar 17 18:14:03 2023 -0400

    Adding DLQ support for ZetaSQL (#25873)
---
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 126 ++++++++++++++++-----
 1 file changed, 97 insertions(+), 29 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 744fbd0bcd4..dee87d370e6 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -37,18 +37,24 @@ import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
 import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCluster;
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
@@ -64,7 +70,6 @@ import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdO
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -83,6 +88,9 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
   private static final int MAX_PENDING_WINDOW = 32;
   private final BeamSqlUnparseContext context;
 
+  private static final TupleTag<Row> rows = new TupleTag<Row>("output") {};
+  private static final TupleTag<Row> errors = new TupleTag<Row>("errors") {};
+
   private static String columnName(int i) {
     return "_" + i;
   }
@@ -101,21 +109,36 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
   @Override
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
-    return new Transform();
+    return buildPTransform(null);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform(
+      @Nullable PTransform<PCollection<Row>, ? extends POutput> 
errorsTransformer) {
+    return new Transform(errorsTransformer);
   }
 
   @AutoValue
   abstract static class TimestampedFuture {
-    private static TimestampedFuture create(Instant t, Future<Value> f) {
-      return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f);
+    private static TimestampedFuture create(Instant t, Future<Value> f, Row r) 
{
+      return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f, r);
     }
 
     abstract Instant timestamp();
 
     abstract Future<Value> future();
+
+    abstract Row row();
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
+
+    private final @Nullable PTransform<PCollection<Row>, ? extends POutput> 
errorsTransformer;
+
+    Transform(@Nullable PTransform<PCollection<Row>, ? extends POutput> 
errorsTransformer) {
+      this.errorsTransformer = errorsTransformer;
+    }
+
     @Override
     public PCollection<Row> expand(PCollectionList<Row> pinput) {
       Preconditions.checkArgument(
@@ -135,9 +158,10 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
                 SqlStdOperatorTable.CASE, condition, rex, 
rexBuilder.makeNullLiteral(getRowType()));
       }
 
+      final Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
       BeamSqlPipelineOptions options =
           pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
       CalcFn calcFn =
           new CalcFn(
               context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(),
@@ -145,9 +169,18 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
               upstream.getSchema(),
               outputSchema,
               options.getZetaSqlDefaultTimezone(),
-              options.getVerifyRowValues());
+              options.getVerifyRowValues(),
+              errorsTransformer != null);
+
+      PCollectionTuple tuple =
+          upstream.apply(ParDo.of(calcFn).withOutputTags(rows, 
TupleTagList.of(errors)));
+      tuple.get(errors).setRowSchema(calcFn.errorsSchema);
+
+      if (errorsTransformer != null) {
+        tuple.get(errors).apply(errorsTransformer);
+      }
 
-      return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+      return tuple.get(rows).setRowSchema(outputSchema);
     }
   }
 
@@ -173,6 +206,9 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
     private final Schema outputSchema;
     private final String defaultTimezone;
     private final boolean verifyRowValues;
+    private final boolean dlqTransformDownstream;
+
+    final Schema errorsSchema;
     private final List<Integer> referencedColumns;
 
     @FieldAccess("row")
@@ -188,7 +224,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
         Schema inputSchema,
         Schema outputSchema,
         String defaultTimezone,
-        boolean verifyRowValues) {
+        boolean verifyRowValues,
+        boolean dlqTransformDownstream) {
       this.sql = sql;
       this.exp = new PreparedExpression(sql);
       this.nullParams = nullParams;
@@ -196,6 +233,7 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
       this.outputSchema = outputSchema;
       this.defaultTimezone = defaultTimezone;
       this.verifyRowValues = verifyRowValues;
+      this.dlqTransformDownstream = dlqTransformDownstream;
 
       try (PreparedExpression exp =
           prepareExpression(sql, nullParams, inputSchema, defaultTimezone)) {
@@ -205,6 +243,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
         }
         this.referencedColumns = columns.build();
         this.fieldAccess = 
FieldAccessDescriptor.withFieldIds(this.referencedColumns);
+        Schema inputRowSchema = SelectHelpers.getOutputSchema(inputSchema, 
fieldAccess);
+        this.errorsSchema = BeamSqlRelUtils.getErrorRowSchema(inputRowSchema);
       }
     }
 
@@ -242,30 +282,42 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
     @ProcessElement
     public void processElement(
-        @FieldAccess("row") Row row, @Timestamp Instant t, BoundedWindow w, 
OutputReceiver<Row> r)
+        @FieldAccess("row") Row row,
+        @Timestamp Instant t,
+        BoundedWindow w,
+        OutputReceiver<Row> r,
+        MultiOutputReceiver multiOutputReceiver)
         throws InterruptedException {
-      Map<String, Value> columns = new HashMap<>();
-      for (int i : referencedColumns) {
-        final Field field = inputSchema.getField(i);
-        columns.put(
-            columnName(i),
-            ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                row.getBaseValue(field.getName(), Object.class), 
field.getType()));
-      }
-
-      @NonNull
-      Future<Value> valueFuture = 
checkArgumentNotNull(stream).execute(columns, nullParams);
 
       @Nullable Queue<TimestampedFuture> pendingWindow = pending.get(w);
       if (pendingWindow == null) {
         pendingWindow = new ArrayDeque<>();
         pending.put(w, pendingWindow);
       }
-      pendingWindow.add(TimestampedFuture.create(t, valueFuture));
+      try {
+        Map<String, Value> columns = new HashMap<>();
+        for (int i : referencedColumns) {
+          final Field field = inputSchema.getField(i);
+          columns.put(
+              columnName(i),
+              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+                  row.getBaseValue(field.getName(), Object.class), 
field.getType()));
+        }
+        Future<Value> valueFuture = 
checkArgumentNotNull(stream).execute(columns, nullParams);
+        pendingWindow.add(TimestampedFuture.create(t, valueFuture, row));
+
+      } catch (UnsupportedOperationException | ArithmeticException | 
IllegalArgumentException e) {
+        if (!dlqTransformDownstream) {
+          throw e;
+        }
+        multiOutputReceiver
+            .get(errors)
+            .output(Row.withSchema(errorsSchema).addValues(row, 
e.toString()).build());
+      }
 
       while ((!pendingWindow.isEmpty() && 
pendingWindow.element().future().isDone())
           || pendingWindow.size() > MAX_PENDING_WINDOW) {
-        outputRow(pendingWindow.remove(), r);
+        outputRow(pendingWindow.remove(), r, multiOutputReceiver.get(errors));
       }
     }
 
@@ -274,9 +326,12 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
       checkArgumentNotNull(stream).flush();
       for (Map.Entry<BoundedWindow, Queue<TimestampedFuture>> pendingWindow : 
pending.entrySet()) {
         OutputReceiver<Row> rowOutputReciever =
-            new OutputReceiverForFinishBundle(c, pendingWindow.getKey());
+            new OutputReceiverForFinishBundle(c, pendingWindow.getKey(), rows);
+        OutputReceiver<Row> errorOutputReciever =
+            new OutputReceiverForFinishBundle(c, pendingWindow.getKey(), 
errors);
+
         for (TimestampedFuture timestampedFuture : pendingWindow.getValue()) {
-          outputRow(timestampedFuture, rowOutputReciever);
+          outputRow(timestampedFuture, rowOutputReciever, errorOutputReciever);
         }
       }
     }
@@ -288,9 +343,13 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
       private final FinishBundleContext c;
       private final BoundedWindow w;
 
-      private OutputReceiverForFinishBundle(FinishBundleContext c, 
BoundedWindow w) {
+      private final TupleTag<Row> tag;
+
+      private OutputReceiverForFinishBundle(
+          FinishBundleContext c, BoundedWindow w, TupleTag<Row> tag) {
         this.c = c;
         this.w = w;
+        this.tag = tag;
       }
 
       @Override
@@ -300,11 +359,11 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
       @Override
       public void outputWithTimestamp(Row output, Instant timestamp) {
-        c.output(output, timestamp, w);
+        c.output(tag, output, timestamp, w);
       }
     }
 
-    private static RuntimeException extractException(ExecutionException e) {
+    private static RuntimeException extractException(Throwable e) {
       try {
         throw checkArgumentNotNull(e.getCause());
       } catch (RuntimeException r) {
@@ -314,12 +373,21 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
       }
     }
 
-    private void outputRow(TimestampedFuture c, OutputReceiver<Row> r) throws 
InterruptedException {
+    private void outputRow(
+        TimestampedFuture c, OutputReceiver<Row> r, OutputReceiver<Row> 
errorOutputReceiver)
+        throws InterruptedException {
       final Value v;
       try {
         v = c.future().get();
       } catch (ExecutionException e) {
-        throw extractException(e);
+        if (!dlqTransformDownstream) {
+          throw extractException(e);
+        }
+        errorOutputReceiver.outputWithTimestamp(
+            Row.withSchema(errorsSchema).addValues(c.row(), 
e.toString()).build(), c.timestamp());
+        return;
+      } catch (Throwable thr) {
+        throw extractException(thr);
       }
       if (!v.isNull()) {
         Row row = ZetaSqlBeamTranslationUtils.toBeamRow(v, outputSchema, 
verifyRowValues);

Reply via email to