This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e8fad9ca2ba Adding DLQ support for ZetaSQL (#25873) e8fad9ca2ba is described below commit e8fad9ca2ba80220b4817ddba95aabc0a48067dd Author: Pablo Estrada <pabl...@users.noreply.github.com> AuthorDate: Fri Mar 17 18:14:03 2023 -0400 Adding DLQ support for ZetaSQL (#25873) --- .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 126 ++++++++++++++++----- 1 file changed, 97 insertions(+), 29 deletions(-) diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java index 744fbd0bcd4..dee87d370e6 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java @@ -37,18 +37,24 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters; import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect; import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.SelectHelpers; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCluster; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode; @@ -64,7 +70,6 @@ import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdO import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -83,6 +88,9 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { private static final int MAX_PENDING_WINDOW = 32; private final BeamSqlUnparseContext context; + private static final TupleTag<Row> rows = new TupleTag<Row>("output") {}; + private static final TupleTag<Row> errors = new TupleTag<Row>("errors") {}; + private static String columnName(int i) { return "_" + i; } @@ -101,21 +109,36 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { @Override public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { - return new Transform(); + return buildPTransform(null); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform( + @Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer) { + return new Transform(errorsTransformer); } @AutoValue abstract static class TimestampedFuture { - private static TimestampedFuture create(Instant t, Future<Value> f) { - return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f); + private static TimestampedFuture create(Instant t, Future<Value> f, Row r) { + return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f, r); } abstract Instant timestamp(); abstract Future<Value> future(); + + abstract Row row(); } private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> { + + private final @Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer; + + Transform(@Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer) { + this.errorsTransformer = errorsTransformer; + } + @Override public PCollection<Row> expand(PCollectionList<Row> pinput) { Preconditions.checkArgument( @@ -135,9 +158,10 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { SqlStdOperatorTable.CASE, condition, rex, rexBuilder.makeNullLiteral(getRowType())); } + final Schema outputSchema = CalciteUtils.toSchema(getRowType()); + BeamSqlPipelineOptions options = pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class); - Schema outputSchema = CalciteUtils.toSchema(getRowType()); CalcFn calcFn = new CalcFn( context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(), @@ -145,9 +169,18 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { upstream.getSchema(), outputSchema, options.getZetaSqlDefaultTimezone(), - options.getVerifyRowValues()); + options.getVerifyRowValues(), + errorsTransformer != null); + + PCollectionTuple tuple = + upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors))); + tuple.get(errors).setRowSchema(calcFn.errorsSchema); + + if (errorsTransformer != null) { + tuple.get(errors).apply(errorsTransformer); + } - return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema); + return tuple.get(rows).setRowSchema(outputSchema); } } @@ -173,6 +206,9 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { private final Schema outputSchema; private final String defaultTimezone; private final boolean verifyRowValues; + private final boolean dlqTransformDownstream; + + final Schema errorsSchema; private final List<Integer> referencedColumns; @FieldAccess("row") @@ -188,7 +224,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { Schema inputSchema, Schema outputSchema, String defaultTimezone, - boolean verifyRowValues) { + boolean verifyRowValues, + boolean dlqTransformDownstream) { this.sql = sql; this.exp = new PreparedExpression(sql); this.nullParams = nullParams; @@ -196,6 +233,7 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { this.outputSchema = outputSchema; this.defaultTimezone = defaultTimezone; this.verifyRowValues = verifyRowValues; + this.dlqTransformDownstream = dlqTransformDownstream; try (PreparedExpression exp = prepareExpression(sql, nullParams, inputSchema, defaultTimezone)) { @@ -205,6 +243,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { } this.referencedColumns = columns.build(); this.fieldAccess = FieldAccessDescriptor.withFieldIds(this.referencedColumns); + Schema inputRowSchema = SelectHelpers.getOutputSchema(inputSchema, fieldAccess); + this.errorsSchema = BeamSqlRelUtils.getErrorRowSchema(inputRowSchema); } } @@ -242,30 +282,42 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { @ProcessElement public void processElement( - @FieldAccess("row") Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r) + @FieldAccess("row") Row row, + @Timestamp Instant t, + BoundedWindow w, + OutputReceiver<Row> r, + MultiOutputReceiver multiOutputReceiver) throws InterruptedException { - Map<String, Value> columns = new HashMap<>(); - for (int i : referencedColumns) { - final Field field = inputSchema.getField(i); - columns.put( - columnName(i), - ZetaSqlBeamTranslationUtils.toZetaSqlValue( - row.getBaseValue(field.getName(), Object.class), field.getType())); - } - - @NonNull - Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams); @Nullable Queue<TimestampedFuture> pendingWindow = pending.get(w); if (pendingWindow == null) { pendingWindow = new ArrayDeque<>(); pending.put(w, pendingWindow); } - pendingWindow.add(TimestampedFuture.create(t, valueFuture)); + try { + Map<String, Value> columns = new HashMap<>(); + for (int i : referencedColumns) { + final Field field = inputSchema.getField(i); + columns.put( + columnName(i), + ZetaSqlBeamTranslationUtils.toZetaSqlValue( + row.getBaseValue(field.getName(), Object.class), field.getType())); + } + Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams); + pendingWindow.add(TimestampedFuture.create(t, valueFuture, row)); + + } catch (UnsupportedOperationException | ArithmeticException | IllegalArgumentException e) { + if (!dlqTransformDownstream) { + throw e; + } + multiOutputReceiver + .get(errors) + .output(Row.withSchema(errorsSchema).addValues(row, e.toString()).build()); + } while ((!pendingWindow.isEmpty() && pendingWindow.element().future().isDone()) || pendingWindow.size() > MAX_PENDING_WINDOW) { - outputRow(pendingWindow.remove(), r); + outputRow(pendingWindow.remove(), r, multiOutputReceiver.get(errors)); } } @@ -274,9 +326,12 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { checkArgumentNotNull(stream).flush(); for (Map.Entry<BoundedWindow, Queue<TimestampedFuture>> pendingWindow : pending.entrySet()) { OutputReceiver<Row> rowOutputReciever = - new OutputReceiverForFinishBundle(c, pendingWindow.getKey()); + new OutputReceiverForFinishBundle(c, pendingWindow.getKey(), rows); + OutputReceiver<Row> errorOutputReciever = + new OutputReceiverForFinishBundle(c, pendingWindow.getKey(), errors); + for (TimestampedFuture timestampedFuture : pendingWindow.getValue()) { - outputRow(timestampedFuture, rowOutputReciever); + outputRow(timestampedFuture, rowOutputReciever, errorOutputReciever); } } } @@ -288,9 +343,13 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { private final FinishBundleContext c; private final BoundedWindow w; - private OutputReceiverForFinishBundle(FinishBundleContext c, BoundedWindow w) { + private final TupleTag<Row> tag; + + private OutputReceiverForFinishBundle( + FinishBundleContext c, BoundedWindow w, TupleTag<Row> tag) { this.c = c; this.w = w; + this.tag = tag; } @Override @@ -300,11 +359,11 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { @Override public void outputWithTimestamp(Row output, Instant timestamp) { - c.output(output, timestamp, w); + c.output(tag, output, timestamp, w); } } - private static RuntimeException extractException(ExecutionException e) { + private static RuntimeException extractException(Throwable e) { try { throw checkArgumentNotNull(e.getCause()); } catch (RuntimeException r) { @@ -314,12 +373,21 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel { } } - private void outputRow(TimestampedFuture c, OutputReceiver<Row> r) throws InterruptedException { + private void outputRow( + TimestampedFuture c, OutputReceiver<Row> r, OutputReceiver<Row> errorOutputReceiver) + throws InterruptedException { final Value v; try { v = c.future().get(); } catch (ExecutionException e) { - throw extractException(e); + if (!dlqTransformDownstream) { + throw extractException(e); + } + errorOutputReceiver.outputWithTimestamp( + Row.withSchema(errorsSchema).addValues(c.row(), e.toString()).build(), c.timestamp()); + return; + } catch (Throwable thr) { + throw extractException(thr); } if (!v.isNull()) { Row row = ZetaSqlBeamTranslationUtils.toBeamRow(v, outputSchema, verifyRowValues);