This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 67b7962f8e7 Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
67b7962f8e7 is described below

commit 67b7962f8e7c93e7a3e7fefc5911de45c2693644
Author: Pablo Estrada <pabl...@users.noreply.github.com>
AuthorDate: Tue Feb 14 10:35:09 2023 -0800

    Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
---
 .../sql/expansion/SqlTransformSchemaTransformProvider.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
index 0649a0978e4..7502d0881bb 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -37,11 +37,13 @@ import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
@@ -126,6 +128,13 @@ public class SqlTransformSchemaTransformProvider implements SchemaTransformProvi
 
     @Override
     public PDone expand(PCollection<Row> input) {
+      input.apply(
+          "noop_" + inputs.size(),
+          MapElements.into(TypeDescriptors.nulls())
+              .via(
+                  err -> {
+                    return null;
+                  }));
       inputs.add(input);
       return PDone.in(input.getPipeline());
     }