(beam) branch master updated: Improve IcebergIO utils (#31958)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2aefd5b9c0c Improve IcebergIO utils (#31958) 2aefd5b9c0c is described below commit 2aefd5b9c0c8a4dcb577d367d859455e4a93ec14 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jul 31 14:25:14 2024 -0400 Improve IcebergIO utils (#31958) * Improve Iceberg utils * add documentation; clarify variable name * fix kinks, add type tests --- .../org/apache/beam/sdk/io/iceberg/IcebergIO.java | 2 +- ...emaAndRowConversions.java => IcebergUtils.java} | 217 +-- .../apache/beam/sdk/io/iceberg/RecordWriter.java | 4 +- .../org/apache/beam/sdk/io/iceberg/ScanSource.java | 2 +- .../apache/beam/sdk/io/iceberg/ScanTaskReader.java | 4 +- .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 5 +- .../beam/sdk/io/iceberg/IcebergIOReadTest.java | 8 +- .../beam/sdk/io/iceberg/IcebergIOWriteTest.java| 12 +- .../IcebergReadSchemaTransformProviderTest.java| 8 +- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 676 + .../IcebergWriteSchemaTransformProviderTest.java | 5 +- .../apache/beam/sdk/io/iceberg/ScanSourceTest.java | 6 +- .../io/iceberg/SchemaAndRowConversionsTest.java| 268 .../apache/beam/sdk/io/iceberg/TestFixtures.java | 2 +- 14 files changed, 865 insertions(+), 354 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 50e0ea8b63d..c3c1da7c788 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -137,7 +137,7 @@ public class IcebergIO { .setCatalogConfig(getCatalogConfig()) .setScanType(IcebergScanConfig.ScanType.TABLE) 
.setTableIdentifier(tableId) - .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema())) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema())) .build(; } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java similarity index 50% rename from sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java rename to sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index e1a8685614f..a2f84e6475c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -20,36 +20,42 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -class SchemaAndRowConversions { +/** Utilities for converting between Beam and Iceberg types. */ +public class IcebergUtils { + // This is made public for users convenience, as many may have more experience working with + // Iceberg types. 
- private SchemaAndRowConversions() {} + private IcebergUtils() {} - static final Map BEAM_TYPES_TO_ICEBERG_TYPES = - ImmutableMap.builder() - .put(Schema.FieldType.BOOLEAN, Types.BooleanType.get()) - .put(Schema.FieldType.INT32, Types.IntegerType.get()) - .put(Schema.FieldType.INT64, Types.LongType.get()) - .put(Schema.FieldType.FLOAT, Types.FloatType.get()) - .put(Schema.FieldType.DOUBLE, Types.DoubleType.get()) - .put(Schema.FieldType.STRING, Types.StringType.get()) - .put(Schema.FieldType.BYTES, Types.BinaryType.get()) + private static final Map BEAM_TYPES_TO_ICEBERG_TYPES = + ImmutableMap.builder() +
(beam) branch master updated (835630b12e2 -> b61ef7591fe)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 835630b12e2 Replace Class with TypeDescriptor in SchemaProvider implementations (#31785) add b61ef7591fe update document in AwsOptions (#32036) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java | 8 1 file changed, 4 insertions(+), 4 deletions(-)
(beam) branch master updated: Update Dataflow containers (#31944)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 53a804e25b5 Update Dataflow containers (#31944) 53a804e25b5 is described below commit 53a804e25b5baf901cd66f605f3cf2724a91c8c8 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Jul 22 13:31:03 2024 -0400 Update Dataflow containers (#31944) --- runners/google-cloud-dataflow-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 053d44f0074..177f8acf804 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -52,7 +52,7 @@ evaluationDependsOn(":sdks:java:container:java11") ext.dataflowLegacyEnvironmentMajorVersion = '8' ext.dataflowFnapiEnvironmentMajorVersion = '8' ext.dataflowLegacyContainerVersion = 'beam-master-20240718' -ext.dataflowFnapiContainerVersion = 'beam-master-20240716' +ext.dataflowFnapiContainerVersion = 'beam-master-20240718' ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3' processResources {
(beam) branch master updated: Use default auth for Iceberg integration tests (#31940)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 12ad2afd2d6 Use default auth for Iceberg integration tests (#31940) 12ad2afd2d6 is described below commit 12ad2afd2d667d6897680b01fe555477cde3742d Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Jul 19 15:32:59 2024 -0400 Use default auth for Iceberg integration tests (#31940) --- .github/trigger_files/IO_Iceberg_Integration_Tests.json| 2 +- .../src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 1efc8e9e440..3f63c0c9975 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", -"modification": 1 +"modification": 2 } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 0420e2f5779..1c5686bfde9 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -108,8 +108,7 @@ public class IcebergIOIT implements Serializable { catalogHadoopConf = new Configuration(); catalogHadoopConf.set("fs.gs.project.id", options.getProject()); -catalogHadoopConf.set( -"fs.gs.auth.service.account.json.keyfile", System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); +catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT"); } @Before
(beam) branch master updated: Update Dataflow containers (#31936)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8760a677e81 Update Dataflow containers (#31936) 8760a677e81 is described below commit 8760a677e810b69832e16a9e14fd88dd9c779907 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Jul 18 16:17:19 2024 -0400 Update Dataflow containers (#31936) --- runners/google-cloud-dataflow-java/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index c9fa85e4165..d01fca437b9 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -51,8 +51,8 @@ evaluationDependsOn(":sdks:java:container:java11") ext.dataflowLegacyEnvironmentMajorVersion = '8' ext.dataflowFnapiEnvironmentMajorVersion = '8' -ext.dataflowLegacyContainerVersion = 'beam-master-20240306' -ext.dataflowFnapiContainerVersion = 'beam-master-20240306' +ext.dataflowLegacyContainerVersion = 'beam-master-20240718' +ext.dataflowFnapiContainerVersion = 'beam-master-20240716' ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3' processResources {
(beam) branch master updated (2d6d55b98ce -> f3e6c66c0a5)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 2d6d55b98ce Fix generateYamlDocs gradle task (#31909) add f3e6c66c0a5 Improve performance of BigQueryIO connector when withPropagateSuccessfulStorageApiWrites(true) is used (#31840) No new revisions were added by this update. Summary of changes: CHANGES.md | 1 + .../bigquery/StorageApiWriteUnshardedRecords.java | 39 +- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 2 +- 3 files changed, 26 insertions(+), 16 deletions(-)
(beam) branch master updated: [CsvIO] Create CsvIOStringToCsvRecord Class (#31857)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ea761115998 [CsvIO] Create CsvIOStringToCsvRecord Class (#31857) ea761115998 is described below commit ea7611159989c64e2190604f5c50e9457c4ca2d8 Author: lahariguduru <108150650+laharigud...@users.noreply.github.com> AuthorDate: Mon Jul 15 21:50:53 2024 + [CsvIO] Create CsvIOStringToCsvRecord Class (#31857) * Create CsvIOStringToCsvRecord class * Create CsvIOStringToCsvRecord Class * Create CsvIOStringToCsvRecord Class * Create CsvIOStringToCsvRecord Class * Fixed BadRecord Output * Make class final - Co-authored-by: Lahari Guduru --- .../beam/sdk/io/csv/CsvIOStringToCsvRecord.java| 61 + .../sdk/io/csv/CsvIOStringToCsvRecordTest.java | 143 + 2 files changed, 204 insertions(+) diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java new file mode 100644 index 000..995052bf7f7 --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.csv; + +import java.io.IOException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; + +/** + * {@link CsvIOStringToCsvRecord} is a class that takes a {@link PCollection} input and + * outputs a {@link PCollection} with potential {@link PCollection} for + * targeted error detection. + */ +final class CsvIOStringToCsvRecord +extends PTransform, PCollection>> { + private final CSVFormat csvFormat; + + CsvIOStringToCsvRecord(CSVFormat csvFormat) { +this.csvFormat = csvFormat; + } + + /** + * Creates {@link PCollection} from {@link PCollection} for future processing + * to Row or custom type. + */ + @Override + public PCollection> expand(PCollection input) { +return input.apply(ParDo.of(new ProcessLineToRecordFn())); + } + + /** Processes each line in order to convert it to a {@link CSVRecord}. */ + private class ProcessLineToRecordFn extends DoFn> { +@ProcessElement +public void process(@Element String line, OutputReceiver> receiver) +throws IOException { + for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) { +receiver.output(record); + } +} + } +} diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java new file mode 100644 index 000..44db791cbee --- /dev/null +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package or
(beam) branch master updated: Increase retry backoff for Storage API batch to survive AppendRows quota refill (#31837)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0b61035f36f Increase retry backoff for Storage API batch to survive AppendRows quota refill (#31837) 0b61035f36f is described below commit 0b61035f36fb099a8dcd39978c71779a2e81f957 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Jul 15 15:23:04 2024 -0400 Increase retry backoff for Storage API batch to survive AppendRows quota refill (#31837) * Increase retry backoff for Storage API batch * longer waits for quota error only * cleanup * add to CHANGES.md * no need for quota backoff. just increase allowed retries * cleanup --- CHANGES.md | 3 ++- .../beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index fc94877a2bb..243596e6f2e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). * Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)) +* [BigQueryIO] Better handling for batch Storage Write API when it hits AppendRows throughput quota ([#31837](https://github.com/apache/beam/pull/31837)) * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)) * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)). 
* Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)]) @@ -83,7 +84,7 @@ ## Bugfixes -* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) +* [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 21c1d961e84..f0c4a56ed3d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -771,7 +771,7 @@ public class StorageApiWriteUnshardedRecords invalidateWriteStream(); allowedRetry = 5; } else { -allowedRetry = 10; +allowedRetry = 35; } // Maximum number of times we retry before we fail the work item.
(beam) branch master updated: Add options to control number of Storage API connections when using multiplexing (#31721)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 4d429dde47b Add options to control number of Storage API connections when using multiplexing (#31721) 4d429dde47b is described below commit 4d429dde47b570ccabba6b86173eb2546f76a5f2 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Jul 12 14:46:37 2024 -0400 Add options to control number of Storage API connections when using multiplexing (#31721) * add options to set min and max connections to connection management pool; rename counter to be more accurate * add multiplexing description * add to CHANGES.md * clarify documentation and address comments * adjust description * add details --- CHANGES.md | 2 ++ .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 8 +++ .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 27 ++ .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 9 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 96c436d89ec..fc94877a2bb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,7 @@ ## New Features / Improvements * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). +* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)) * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)) * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)). 
* Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)]) @@ -82,6 +83,7 @@ ## Bugfixes +* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 5a12e81ea79..7505f77fb5b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -40,8 +40,8 @@ import org.apache.beam.sdk.metrics.Metrics; */ @AutoValue abstract class AppendClientInfo { - private final Counter activeConnections = - Metrics.counter(AppendClientInfo.class, "activeConnections"); + private final Counter activeStreamAppendClients = + Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients"); abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient(); @@ -123,7 +123,7 @@ abstract class AppendClientInfo { writeStreamService.getStreamAppendClient( streamName, getDescriptor(), useConnectionPool, missingValueInterpretation); - activeConnections.inc(); + activeStreamAppendClients.inc(); return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build(); } @@ -133,7 +133,7 @@ abstract class AppendClientInfo { BigQueryServices.StreamAppendClient client = getStreamAppendClient(); if (client != null) { getCloseAppendClient().accept(client); - activeConnections.dec(); + activeStreamAppendClients.dec(); } } diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index cd1fc6d3842..ba76f483f77 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -109,6 +109,28 @@ public interface BigQueryOptions void setNumStorageWriteApiStreamAppendClients(Integer value); + @Description( + "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " + + "this option sets the minimum number of connections each pool creates before any connections are shared. This is " +
(beam) branch release-2.58.0 updated: add doc warning against using icebergio directly (#31834)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch release-2.58.0 in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/release-2.58.0 by this push: new 6e6f28027bc add doc warning against using icebergio directly (#31834) 6e6f28027bc is described below commit 6e6f28027bcd875733f20ee1be303815d0227eb7 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jul 10 14:33:25 2024 -0400 add doc warning against using icebergio directly (#31834) --- .../src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 8 1 file changed, 8 insertions(+) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 75a35e6f8a3..50e0ea8b63d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; @@ -33,6 +34,13 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; +/** + * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not + * intended to be used directly. + * + * For internal use only; no backwards compatibility guarantees + */ +@Internal public class IcebergIO { public static WriteRows writeRows(IcebergCatalogConfig catalog) {
(beam) branch master updated: add doc warning against using icebergio directly (#31833)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new dd0912460c4 add doc warning against using icebergio directly (#31833) dd0912460c4 is described below commit dd0912460c4e106cc27ef4547d00ace07235431e Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jul 10 14:01:39 2024 -0400 add doc warning against using icebergio directly (#31833) --- .../src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 8 1 file changed, 8 insertions(+) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 75a35e6f8a3..50e0ea8b63d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; @@ -33,6 +34,13 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; +/** + * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not + * intended to be used directly. + * + * For internal use only; no backwards compatibility guarantees + */ +@Internal public class IcebergIO { public static WriteRows writeRows(IcebergCatalogConfig catalog) {
(beam) branch master updated: Pass-through IcebergIO catalog properties (#31726)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ac423af5699 Pass-through IcebergIO catalog properties (#31726) ac423af5699 is described below commit ac423af5699682c5519ae8d1a3af035ba7a5eab7 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Jul 8 13:15:03 2024 -0400 Pass-through IcebergIO catalog properties (#31726) * Pass-through iceberg catalog properties * add to CHANGES.md * trigger integration test * remove custom configuration; pass catalog name --- .../IO_Iceberg_Integration_Tests.json | 3 +- CHANGES.md | 2 + .../beam/sdk/io/iceberg/IcebergCatalogConfig.java | 197 + .../IcebergReadSchemaTransformProvider.java| 36 ++-- .../IcebergSchemaTransformCatalogConfig.java | 107 --- .../IcebergWriteSchemaTransformProvider.java | 42 ++--- .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 16 +- .../beam/sdk/io/iceberg/IcebergIOReadTest.java | 11 +- .../beam/sdk/io/iceberg/IcebergIOWriteTest.java| 31 ++-- .../IcebergReadSchemaTransformProviderTest.java| 34 ++-- .../IcebergSchemaTransformTranslationTest.java | 49 +++-- .../IcebergWriteSchemaTransformProviderTest.java | 34 ++-- .../apache/beam/sdk/io/iceberg/ScanSourceTest.java | 28 ++- 13 files changed, 152 insertions(+), 438 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index a03c067d2c4..1efc8e9e440 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,3 +1,4 @@ { -"comment": "Modify this file in a trivial way to cause this test suite to run" +"comment": "Modify this file in a trivial way to cause this test suite to run", +"modification": 1 } diff --git a/CHANGES.md b/CHANGES.md index 0a620038f11..85f1be48cfb 100644 
--- a/CHANGES.md +++ b/CHANGES.md @@ -67,12 +67,14 @@ ## New Features / Improvements * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). +* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)) * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)). * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)]) ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-store fashion ([#31726](https://github.com/apache/beam/pull/31726)) ## Deprecations diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index fefef4aa491..2956d75a266 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -19,214 +19,35 @@ package org.apache.beam.sdk.io.iceberg; import com.google.auto.value.AutoValue; import java.io.Serializable; -import javax.annotation.Nullable; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import java.util.Properties; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.checkerframework.dataflow.qual.Pure; @AutoValue public abstract class IcebergCatalogConfig implements Serializable { 
- - @Pure - public abstract String getName(); - - /* Core Properties */ - @Pure - public abstract @Nullable String getIcebergCatalogType(); - - @Pure - public abstract @Nullable String getCatalogImplementation(); - - @Pure - public abstract @Nullable String getFileIOImplementation(); - - @Pure - public abstract @Nullable String getWarehouseLocation(); - - @Pure - public abstract @Nullable String getMetricsReporterImplementation(); - - /* Caching */ - @Pure - public abstract boolean getCacheEnabled(); - - @Pure - public abstract boolean getCacheCaseSensitive(); - - @Pure - public abstract long getCacheExpirationIntervalMillis(); - - @Pure - public
(beam) branch master updated: Properly close Storage API batch connections (#31710)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 20aa916931f Properly close Storage API batch connections (#31710) 20aa916931f is described below commit 20aa916931f70d3bc75c5a22bc70bed0099c549e Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Jun 28 15:52:06 2024 -0400 Properly close Storage API batch connections (#31710) * properly close connections; add active connection counter * only invalidate stream at teardown for PENDING type * cleanup --- .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java| 19 +-- .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 5 + 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 211027c12b0..5a12e81ea79 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -30,6 +30,8 @@ import com.google.protobuf.Message; import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; /** * Container class used by {@link StorageApiWritesShardedRecords} and {@link @@ -38,6 +40,9 @@ import javax.annotation.Nullable; */ @AutoValue abstract class AppendClientInfo { + private final Counter activeConnections = + Metrics.counter(AppendClientInfo.class, "activeConnections"); + abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient(); abstract TableSchema 
getTableSchema(); @@ -114,12 +119,13 @@ abstract class AppendClientInfo { return this; } else { String streamName = getStreamName.get(); - return toBuilder() - .setStreamName(streamName) - .setStreamAppendClient( - writeStreamService.getStreamAppendClient( - streamName, getDescriptor(), useConnectionPool, missingValueInterpretation)) - .build(); + BigQueryServices.StreamAppendClient client = + writeStreamService.getStreamAppendClient( + streamName, getDescriptor(), useConnectionPool, missingValueInterpretation); + + activeConnections.inc(); + + return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build(); } } @@ -127,6 +133,7 @@ abstract class AppendClientInfo { BigQueryServices.StreamAppendClient client = getStreamAppendClient(); if (client != null) { getCloseAppendClient().accept(client); + activeConnections.dec(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8af88f8f7dc..ce5e7b4854e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -334,6 +334,11 @@ public class StorageApiWriteUnshardedRecords if (client != null) { runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); } + // if this is a PENDING stream, we won't be using it again after cleaning up this + // destination state, so clear it from the cache + if (!useDefaultStream) { +APPEND_CLIENTS.invalidate(streamName); + } appendClientInfo = null; } }
(beam) branch master updated: Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 911c525348b Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675) 911c525348b is described below commit 911c525348b81afd48a414018ba579c3b78e3262 Author: codertimu AuthorDate: Tue Jun 25 22:30:18 2024 +0200 Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675) * Handle nullable array in fieldDescriptorFromAvroField function. * rename test method. * apply spotless * Add test case with null array. Allow null value for union with a null type. - Co-authored-by: Ahmet Timucin --- .../AvroGenericRecordToStorageApiProto.java| 5 +- .../AvroGenericRecordToStorageApiProtoTest.java| 99 ++ 2 files changed, 86 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index 27e75176e57..0b7e17b8909 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -277,6 +277,7 @@ public class AvroGenericRecordToStorageApiProto { builder = builder .setType(unionFieldSchema.getType()) +.setMode(unionFieldSchema.getMode()) .addAllFields(unionFieldSchema.getFieldsList()); break; default: @@ -311,7 +312,9 @@ public class AvroGenericRecordToStorageApiProto { FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) { @Nullable Object value = record.get(name); if (value == null) { - if (fieldDescriptor.isOptional()) { + if 
(fieldDescriptor.isOptional() + || avroField.schema().getTypes().stream() + .anyMatch(t -> t.getType() == Schema.Type.NULL)) { return null; } else { throw new IllegalArgumentException( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index 6f31831db13..6a59afeed82 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -31,6 +31,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -268,24 +269,32 @@ public class AvroGenericRecordToStorageApiProtoTest { .noDefault() .endRecord(); - private static final Schema SCHEMA_WITH_MAP; + private static final Schema SCHEMA_WITH_MAP = + SchemaBuilder.record("TestMap") + .fields() + .name("nested") + .type() + .optional() + .type(BASE_SCHEMA) + .name("aMap") + .type() + .map() + .values() + .stringType() + .mapDefault(ImmutableMap.builder().put("key1", "value1").build()) + .endRecord(); - static { -SCHEMA_WITH_MAP = -SchemaBuilder.record("TestMap") -.fields() -.name("nested") -.type() -.optional() -.type(BASE_SCHEMA) -.name("aMap") -.type() -.map() -.values() -.stringType() -.mapDefault(ImmutableMap.builder().put("key1", "value1").build()) -.endRecord(); - } + private static final Schema SCHEMA_WITH_NULLABLE_ARRAY = + SchemaBuilder.record("TestNullableArray") + .fields() + .name("aNullableArray") + .type() + .nullable() + .array() + .items() + .stringType() + .noDefault() + 
.endRecord(); private static GenericRecord baseRecord; private static GenericRecord logicalTypesRecord; @@ -567,4 +576,60 @@ public
(beam) branch master updated: Replace getElementType with getValueType for MAP in AvroGenericRecord… (#31653)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0f2e1963987 Replace getElementType with getValueType for MAP in AvroGenericRecord… (#31653) 0f2e1963987 is described below commit 0f2e1963987f1fbb3329016d8c862639ed4fbe43 Author: codertimu AuthorDate: Sun Jun 23 04:44:13 2024 +0200 Replace getElementType with getValueType for MAP in AvroGenericRecord… (#31653) --- .../AvroGenericRecordToStorageApiProto.java| 10 ++-- .../AvroGenericRecordToStorageApiProtoTest.java| 62 ++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index 519f9391db6..27e75176e57 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -247,17 +247,15 @@ public class AvroGenericRecordToStorageApiProto { break; case MAP: Schema keyType = Schema.create(Schema.Type.STRING); -Schema valueType = TypeWithNullability.create(schema.getElementType()).getType(); +Schema valueType = Schema.create(schema.getValueType().getType()); if (valueType == null) { throw new RuntimeException("Unexpected null element type!"); } TableFieldSchema keyFieldSchema = -fieldDescriptorFromAvroField( -new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE)); +fieldDescriptorFromAvroField(new Schema.Field("key", keyType, "key of the map entry")); TableFieldSchema valueFieldSchema = fieldDescriptorFromAvroField( -new Schema.Field( 
-"value", valueType, "value of the map entry", Schema.Field.NULL_VALUE)); +new Schema.Field("value", valueType, "value of the map entry")); builder = builder .setType(TableFieldSchema.Type.STRUCT) @@ -346,7 +344,7 @@ public class AvroGenericRecordToStorageApiProto { return toProtoValue(fieldDescriptor, type.getType(), value); case MAP: Map map = (Map) value; -Schema valueType = TypeWithNullability.create(avroSchema.getElementType()).getType(); +Schema valueType = Schema.create(avroSchema.getValueType().getType()); if (valueType == null) { throw new RuntimeException("Unexpected null element type!"); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index 3a4dcb02ebd..6f31831db13 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -32,6 +32,8 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @@ -266,6 +268,25 @@ public class AvroGenericRecordToStorageApiProtoTest { .noDefault() .endRecord(); + private static final Schema SCHEMA_WITH_MAP; + + static { +SCHEMA_WITH_MAP = +SchemaBuilder.record("TestMap") +.fields() +.name("nested") +.type() +.optional() +.type(BASE_SCHEMA) +.name("aMap") +.type() +.map() +.values() +.stringType() +.mapDefault(ImmutableMap.builder().put("key1", "value1").build()) +.endRecord(); + } + private static GenericRecord baseRecord; private static GenericRecord 
logicalTypesRecord; private static Map baseProtoExpectedFields; @@ -505,4 +526,45 @@ public class AvroGenericRecordToStorageApiProtoTest { assertEquals(7, msg.getAllFields().size()); assertBaseRecord(msg, logicalTypesProtoExpectedFields); } + + @Test + public void testMessageFr
(beam) branch master updated (47736c36a45 -> e5a5ea93e10)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 47736c36a45 Migrate Gradle Enterprise Plugin to Develocity Plugin (#31577) add e5a5ea93e10 Update KafkaIO's docstring to match current implementation (#31496) No new revisions were added by this update. Summary of changes: .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 40 ++ 1 file changed, 26 insertions(+), 14 deletions(-)
(beam) branch master updated: Add missing import (#31533)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8c6e1a4654b Add missing import (#31533) 8c6e1a4654b is described below commit 8c6e1a4654bced7bf732e1a46092908580da561b Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Jun 6 09:53:45 2024 -0400 Add missing import (#31533) --- .../apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index f0e0fc9a3d0..19c336e1d24 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
(beam) branch release-2.57.0 updated: fix managed doc (#31521)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch release-2.57.0 in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/release-2.57.0 by this push: new 6cbf144f21b fix managed doc (#31521) 6cbf144f21b is described below commit 6cbf144f21b46a1b3d181ea340eba8ef2d3d21bc Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jun 5 19:34:05 2024 -0400 fix managed doc (#31521) --- .../managed/src/main/java/org/apache/beam/sdk/managed/Managed.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 6f95290e6ee..911e25cdda1 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -62,7 +62,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta * } * * Instead of specifying configuration arguments directly in the code, one can provide the - * location to a YAML file that contains this information. Say we have the following YAML file: + * location to a YAML file that contains this information. Say we have the following {@code + * config.yaml} file: * * {@code * foo: "abc" @@ -74,7 +75,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta * {@code * PCollection inputRows = pipeline.apply(Create.of(...)); * - * input.apply(Managed.write(ICEBERG).withConfigUrl()); + * inputRows.apply(Managed.write(ICEBERG).withConfigUrl("path/to/config.yaml")); * } */ public class Managed {
(beam) branch release-2.57.0 updated: Cherrypick (#31362) kafka schematransform translation (#31518)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch release-2.57.0 in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/release-2.57.0 by this push: new a36857f10a3 Cherrypick (#31362) kafka schematransform translation (#31518) a36857f10a3 is described below commit a36857f10a3314b7dac1445373a6d98dd3c85a31 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jun 5 16:37:24 2024 -0400 Cherrypick (#31362) kafka schematransform translation (#31518) * kafka schematransform translation and tests * cleanup * spotless * address failing tests * switch existing schematransform tests to use Managed API * fix nullness * add some more mappings * fix mapping * typo * more accurate test name * cleanup after merging snake_case PR * spotless --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- .../KafkaReadSchemaTransformConfiguration.java | 6 + .../io/kafka/KafkaReadSchemaTransformProvider.java | 260 +++-- .../io/kafka/KafkaSchemaTransformTranslation.java | 93 .../kafka/KafkaWriteSchemaTransformProvider.java | 16 ++ .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java| 45 ++-- .../KafkaReadSchemaTransformProviderTest.java | 49 ++-- .../kafka/KafkaSchemaTransformTranslationTest.java | 216 + 8 files changed, 522 insertions(+), 165 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 8f995a63a10..35aabbbfd97 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -2665,7 +2665,7 @@ public class KafkaIO { abstract Builder setProducerFactoryFn( @Nullable SerializableFunction, Producer> fn); - abstract Builder setKeySerializer(Class> serializer); + abstract Builder setKeySerializer(@Nullable 
Class> serializer); abstract Builder setValueSerializer(Class> serializer); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java index 13f5249a6c3..693c1371f78 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java @@ -149,6 +149,10 @@ public abstract class KafkaReadSchemaTransformConfiguration { /** Sets the topic from which to read. */ public abstract String getTopic(); + @SchemaFieldDescription("Upper bound of how long to read from Kafka.") + @Nullable + public abstract Integer getMaxReadTimeSeconds(); + @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") @Nullable public abstract ErrorHandling getErrorHandling(); @@ -179,6 +183,8 @@ public abstract class KafkaReadSchemaTransformConfiguration { /** Sets the topic from which to read. */ public abstract Builder setTopic(String value); +public abstract Builder setMaxReadTimeSeconds(Integer maxReadTimeSeconds); + public abstract Builder setErrorHandling(ErrorHandling errorHandling); /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. 
*/ diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 13240ea9dc4..b2eeb1a54d1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import com.google.auto.service.AutoService; import java.io.FileOutputStream; import java.io.IOException; @@ -38,7 +40,9 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.Convert; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.S
(beam) branch master updated: fix managed doc (#31517)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 4098fce785c fix managed doc (#31517) 4098fce785c is described below commit 4098fce785c3f3553f1a54ce593224ac3fd117b4 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jun 5 15:25:03 2024 -0400 fix managed doc (#31517) --- .../managed/src/main/java/org/apache/beam/sdk/managed/Managed.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 6f95290e6ee..911e25cdda1 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -62,7 +62,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta * } * * Instead of specifying configuration arguments directly in the code, one can provide the - * location to a YAML file that contains this information. Say we have the following YAML file: + * location to a YAML file that contains this information. Say we have the following {@code + * config.yaml} file: * * {@code * foo: "abc" @@ -74,7 +75,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta * {@code * PCollection inputRows = pipeline.apply(Create.of(...)); * - * input.apply(Managed.write(ICEBERG).withConfigUrl()); + * inputRows.apply(Managed.write(ICEBERG).withConfigUrl("path/to/config.yaml")); * } */ public class Managed {
(beam) branch master updated (9c9de4919b5 -> f2931d31246)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 9c9de4919b5 Update Go container build version to 1.21.11 (#31515) add f2931d31246 Kafka SchemaTransform translation (#31362) No new revisions were added by this update. Summary of changes: .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- .../KafkaReadSchemaTransformConfiguration.java | 6 + .../io/kafka/KafkaReadSchemaTransformProvider.java | 260 +++-- .../io/kafka/KafkaSchemaTransformTranslation.java} | 45 ++-- .../kafka/KafkaWriteSchemaTransformProvider.java | 16 ++ .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java| 45 ++-- .../KafkaReadSchemaTransformProviderTest.java | 48 ++-- .../kafka/KafkaSchemaTransformTranslationTest.java | 216 + 8 files changed, 453 insertions(+), 185 deletions(-) copy sdks/java/io/{iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java => kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java} (60%) create mode 100644 sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java
(beam) branch master updated: Simplify Managed API to avoid dealing with PCollectionRowTuple (#31470)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 7ea8cd2608e Simplify Managed API to avoid dealing with PCollectionRowTuple (#31470) 7ea8cd2608e is described below commit 7ea8cd2608e1b7550ffca44aff5596dd43fd4aa6 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Tue Jun 4 19:02:00 2024 -0400 Simplify Managed API to avoid dealing with PCollectionRowTuple (#31470) * Managed accepts PInput type * add unit test * spotless * spotless * rename to getSinglePCollection --- .../beam/sdk/values/PCollectionRowTuple.java | 17 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 10 ++--- .../IcebergReadSchemaTransformProviderTest.java| 4 +- .../IcebergWriteSchemaTransformProviderTest.java | 14 +++ .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- .../KafkaReadSchemaTransformProviderTest.java | 4 +- .../KafkaWriteSchemaTransformProviderTest.java | 7 +--- .../java/org/apache/beam/sdk/managed/Managed.java | 46 +- .../sdk/managed/ManagedTransformConstants.java | 3 ++ .../ManagedSchemaTransformTranslationTest.java | 3 +- .../org/apache/beam/sdk/managed/ManagedTest.java | 34 ++-- 11 files changed, 104 insertions(+), 40 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java index 0e7c52c4ae7..a2a3aa74e53 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; 
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; @@ -180,6 +181,22 @@ public class PCollectionRowTuple implements PInput, POutput { return pcollection; } + /** + * Like {@link #get(String)}, but is a convenience method to get a single PCollection without + * providing a tag for that output. Use only when there is a single collection in this tuple. + * + * Throws {@link IllegalStateException} if more than one output exists in the {@link + * PCollectionRowTuple}. + */ + public PCollection getSinglePCollection() { +Preconditions.checkState( +pcollectionMap.size() == 1, +"Expected exactly one output PCollection, but found %s. " ++ "Please try retrieving a specified output using get() instead.", +pcollectionMap.size()); +return get(pcollectionMap.entrySet().iterator().next().getKey()); + } + /** * Returns an immutable Map from tag to corresponding {@link PCollection}, for all the members of * this {@link PCollectionRowTuple}. 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 06a63909c12..467a2cbaf24 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; @@ -216,11 +215,10 @@ public class IcebergIOIT implements Serializable { .build()) .build(); -PCollectionRowTuple output = -PCollectionRowTuple.empty(readPipeline) -.apply(Managed.read(Managed.ICEBERG).withConfig(config)); +PCollection rows = + readPipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); -PAssert.that(output.get("output")).containsInAnyOrder(expectedRows); +PAssert.that(rows).containsInAnyOrder(expectedRows); readPipeline.run().waitUntilFinish(); } @@ -258,7 +256,7 @@ public class IcebergIOIT implements Serializable { .build(); PCollection input = writePipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); -PCollectionRowTuple.of("input"
(beam) branch master updated: Support unknown repeated STRUCTs (#31447)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 1fea5f797e9 Support unknown repeated STRUCTs (#31447) 1fea5f797e9 is described below commit 1fea5f797e99f2651fd9b2fe147ce93a25f8a60d Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Tue Jun 4 18:09:04 2024 -0400 Support unknown repeated STRUCTs (#31447) * cleanup TableRow of unkonwns for structs * set nested unknown value as an arraylist if it's repeated * spotless * add another test --- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 35 +++--- .../bigquery/TableRowToStorageApiProtoTest.java| 77 -- 2 files changed, 96 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 07457e72050..c1f452ba93f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.format.DateTimeParseException; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -484,11 +485,11 @@ public class TableRowToStorageApiProto { throws SchemaConversionException { DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); for (final Map.Entry entry : map.entrySet()) { - @Nullable - FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase()); + String 
key = entry.getKey().toLowerCase(); + @Nullable FieldDescriptor fieldDescriptor = descriptor.findFieldByName(key); if (fieldDescriptor == null) { if (unknownFields != null) { - unknownFields.set(entry.getKey().toLowerCase(), entry.getValue()); + unknownFields.set(key, entry.getValue()); } if (ignoreUnknownValues) { continue; @@ -505,12 +506,19 @@ public class TableRowToStorageApiProto { schemaInformation.getSchemaForField(entry.getKey()); try { Supplier<@Nullable TableRow> getNestedUnknown = -() -> -(unknownFields == null) -? null -: (TableRow) -unknownFields.computeIfAbsent( -entry.getKey().toLowerCase(), k -> new TableRow()); +() -> { + if (unknownFields == null) { +return null; + } + TableRow nestedUnknown = new TableRow(); + if (fieldDescriptor.isRepeated()) { +((List) +(unknownFields.computeIfAbsent(key, k -> new ArrayList( +.add(nestedUnknown); +return nestedUnknown; + } + return (TableRow) unknownFields.computeIfAbsent(key, k -> nestedUnknown); +}; @Nullable Object value = @@ -524,6 +532,15 @@ public class TableRowToStorageApiProto { if (value != null) { builder.setField(fieldDescriptor, value); } +// For STRUCT fields, we add a placeholder to unknownFields using the getNestedUnknown +// supplier (in case we encounter unknown nested fields). 
If the placeholder comes out +// to be empty, we should clean it up +if (fieldSchemaInformation.getType().equals(TableFieldSchema.Type.STRUCT) +&& unknownFields != null +&& unknownFields.get(key) instanceof Map +&& ((Map) unknownFields.get(key)).isEmpty()) { + unknownFields.remove(key); +} } catch (Exception e) { throw new SchemaDoesntMatchException( "Problem converting field " diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index 847ede9df36..6a98dad55a6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -40,6 +40,8 @@ impor
(beam) branch master updated (a7f5898f885 -> b4b1cc05b5d)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from a7f5898f885 Default SchemaTransform configs to snake_case (#31374) add b4b1cc05b5d Fix iceberg catalog validation (#31349) No new revisions were added by this update. Summary of changes: .../beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java| 2 +- .../src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java | 1 - .../beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java | 3 +-- .../beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java | 6 ++ .../sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java | 3 +-- 5 files changed, 5 insertions(+), 10 deletions(-)
(beam) branch master updated: Default SchemaTransform configs to snake_case (#31374)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a7f5898f885 Default SchemaTransform configs to snake_case (#31374) a7f5898f885 is described below commit a7f5898f8850e5d110ccb299830c12a4f0807a73 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Tue Jun 4 16:29:50 2024 -0400 Default SchemaTransform configs to snake_case (#31374) * default schematransform configs to snake_case * add to CHANGES.md * update Go's bigtable wrapper to export snake_case param names * make more yaml snake_case changes --- .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- CHANGES.md | 9 +++ sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go | 6 +- .../transforms/TypedSchemaTransformProvider.java | 46 --- .../TypedSchemaTransformProviderTest.java | 8 +- .../IcebergReadSchemaTransformProvider.java| 12 --- .../IcebergWriteSchemaTransformProvider.java | 11 --- .../KafkaReadSchemaTransformProviderTest.java | 16 ++-- .../managed/ManagedSchemaTransformProvider.java| 11 --- .../sdk/managed/ManagedTransformConstants.java | 21 +- .../ManagedSchemaTransformProviderTest.java| 12 +-- .../ManagedSchemaTransformTranslationTest.java | 6 +- .../org/apache/beam/sdk/managed/ManagedTest.java | 2 +- .../managed/src/test/resources/test_config.yaml| 4 +- sdks/python/apache_beam/io/gcp/bigquery.py | 14 ++-- sdks/python/apache_beam/io/gcp/bigtableio.py | 12 +-- .../transforms/external_transform_provider.py | 35 + .../external_transform_provider_it_test.py | 22 -- sdks/python/apache_beam/yaml/standard_io.yaml | 88 +++--- .../apache_beam/yaml/standard_providers.yaml | 8 +- sdks/python/apache_beam/yaml/yaml_provider.py | 2 +- sdks/python/gen_xlang_wrappers.py | 21 +- 22 files changed, 141 insertions(+), 227 deletions(-) diff --git 
a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index b268323..e3d6056a5de 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } diff --git a/CHANGES.md b/CHANGES.md index 91bdfef6916..1aee8283bcb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -88,6 +88,15 @@ This new implementation still supports all (immutable) List methods as before, but some of the random access methods like get() and size() will be slower. To use the old implementation one can use View.asList().withRandomAccess(). +* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a + configuration Schema with snake_case naming convention + ([#31374](https://github.com/apache/beam/pull/31374)). This will make the following + cases problematic: + * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform, +and vice versa: + * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform + * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381) +should be updated to use new snake_case parameter names. 
## Deprecations diff --git a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go index 5b6d7d91631..81df24223ca 100644 --- a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go +++ b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go @@ -62,9 +62,9 @@ import ( ) type bigtableConfig struct { - InstanceId string `beam:"instanceId"` - ProjectId string `beam:"projectId"` - TableIdstring `beam:"tableId"` + InstanceId string `beam:"instance_id"` + ProjectId string `beam:"project_id"` + TableIdstring `beam:"table_id"` } // Cell represents a single cell in a Bigtable row. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index d5c6c724c6f..d9b49dd3ca2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
(beam) branch master updated: [ManagedIO] pass underlying transform URN as an annotation (#31398)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b50ad0fe8fc [ManagedIO] pass underlying transform URN as an annotation (#31398) b50ad0fe8fc is described below commit b50ad0fe8fc168eaded62efb08f19cf2aea341e2 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu May 30 17:51:07 2024 -0400 [ManagedIO] pass underlying transform URN as an annotation (#31398) * pass underlying transform URN to annotation * move annotation keys to proto * address comments: add descriptions for annotation enums; fail when missing transform_identifier; add unit tests for annotations --- .../model/pipeline/v1/external_transforms.proto| 13 +++ .../beam/sdk/util/construction/BeamUrns.java | 5 +++ .../util/construction/PTransformTranslation.java | 37 ++-- .../sdk/util/construction/TransformUpgrader.java | 6 ++-- .../ManagedSchemaTransformTranslationTest.java | 40 -- 5 files changed, 86 insertions(+), 15 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index aa9e70c7a87..429371e1105 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -111,6 +111,19 @@ message BuilderMethod { bytes payload = 3; } +message Annotations { + enum Enum { +// The annotation key for the encoded configuration Row used to build a transform +CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row"]; +// The annotation key for the configuration Schema used to decode the configuration Row +CONFIG_ROW_SCHEMA_KEY = 1 [(org.apache.beam.model.pipeline.v1.beam_constant) 
= "config_row_schema"]; +// If ths transform is a SchemaTransform, this is the annotation key for the SchemaTransform's URN +SCHEMATRANSFORM_URN_KEY = 2 [(org.apache.beam.model.pipeline.v1.beam_constant) = "schematransform_urn"]; +// If the transform is a ManagedSchemaTransform, this is the annotation key for the underlying SchemaTransform's URN +MANAGED_UNDERLYING_TRANSFORM_URN_KEY = 3 [(org.apache.beam.model.pipeline.v1.beam_constant) = "managed_underlying_transform_urn"]; + } +} + // Payload for a Schema-aware PTransform. // This is a transform that is aware of its input and output PCollection schemas // and is configured using Beam Schema-compatible parameters. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java index 05bb2b0e0a0..f0493de3696 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java @@ -26,4 +26,9 @@ public class BeamUrns { public static String getUrn(ProtocolMessageEnum value) { return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn); } + + /** Returns the constant value of a given enum annotated with [(beam_constant)]. 
*/ + public static String getConstant(ProtocolMessageEnum value) { +return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index 5dc84897d38..e2b6d95057f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.util.construction; +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations; import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -43,6 +44,7 @@ import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam
(beam) branch master updated (93cc6a521ee -> 791ead6a05b)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 93cc6a521ee add PR trigger files (#31424) add 791ead6a05b add pull_request_target event (#31426) No new revisions were added by this update. Summary of changes: .github/workflows/IO_Iceberg_Integration_Tests.yml | 1 + .github/workflows/IO_Iceberg_Performance_Tests.yml | 1 + 2 files changed, 2 insertions(+)
(beam) branch master updated: add PR trigger files (#31424)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 93cc6a521ee add PR trigger files (#31424) 93cc6a521ee is described below commit 93cc6a521eeabadfb0cb06abf1dcdc41588d08cf Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Tue May 28 16:23:00 2024 -0400 add PR trigger files (#31424) --- .github/workflows/IO_Iceberg_Integration_Tests.yml | 2 ++ .github/workflows/IO_Iceberg_Performance_Tests.yml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml index ab055c3a609..006cd9d13be 100644 --- a/.github/workflows/IO_Iceberg_Integration_Tests.yml +++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml @@ -18,6 +18,8 @@ name: IcebergIO Integration Tests on: schedule: - cron: '15 4/6 * * *' + pull_request_target: +paths: [ 'release/trigger_all_tests.json', '.github/trigger_files/IO_Iceberg_Integration_Tests.json' ] workflow_dispatch: # Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event diff --git a/.github/workflows/IO_Iceberg_Performance_Tests.yml b/.github/workflows/IO_Iceberg_Performance_Tests.yml index 27d93b8a40d..e9920a5a138 100644 --- a/.github/workflows/IO_Iceberg_Performance_Tests.yml +++ b/.github/workflows/IO_Iceberg_Performance_Tests.yml @@ -18,6 +18,8 @@ name: IcebergIO Performance Tests on: schedule: - cron: '10 10/12 * * *' + pull_request_target: +paths: [ '.github/trigger_files/IO_Iceberg_Performance_Tests.json' ] workflow_dispatch: #Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
(beam) branch master updated (e764fc9c17d -> 37b8c8a87b8)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from e764fc9c17d remove unused field (#31293) add 37b8c8a87b8 Add Iceberg workflows (#31401) No new revisions were added by this update. Summary of changes: ...etrics.yml => IO_Iceberg_Integration_Tests.yml} | 36 -- ...etrics.yml => IO_Iceberg_Performance_Tests.yml} | 34 ++-- .github/workflows/IO_Iceberg_Unit_Tests.yml| 2 +- sdks/java/io/iceberg/build.gradle | 24 ++- .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 1 + 5 files changed, 62 insertions(+), 35 deletions(-) copy .github/workflows/{beam_Prober_CommunityMetrics.yml => IO_Iceberg_Integration_Tests.yml} (74%) copy .github/workflows/{beam_Prober_CommunityMetrics.yml => IO_Iceberg_Performance_Tests.yml} (78%)
(beam) branch iceberg_lt_fork updated (46c14b0123f -> 666f8fd11cc)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch iceberg_lt_fork in repository https://gitbox.apache.org/repos/asf/beam.git from 46c14b0123f ignore add 666f8fd11cc trigger integration test No new revisions were added by this update. Summary of changes: ..._Python_Xlang_IO_Dataflow.json => IO_Iceberg_Integration_Tests.json} | 0 .github/workflows/IO_Iceberg_Integration_Tests.yml | 2 ++ 2 files changed, 2 insertions(+) copy .github/trigger_files/{beam_PostCommit_Python_Xlang_IO_Dataflow.json => IO_Iceberg_Integration_Tests.json} (100%)
(beam) branch iceberg_lt_fork created (now 46c14b0123f)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch iceberg_lt_fork in repository https://gitbox.apache.org/repos/asf/beam.git at 46c14b0123f ignore No new revisions were added by this update.
(beam) branch master updated (084f23b2a8d -> 743e34e0098)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 084f23b2a8d Install Beam from wheels in Dependency Compat Test Suite. (#31308) add 743e34e0098 Fix iceberg unit tests (#31314) No new revisions were added by this update. Summary of changes: ...Commit_Python_Xlang_Gcp_Direct.json => IO_Iceberg_Unit_Tests.json} | 0 .github/workflows/IO_Iceberg_Unit_Tests.yml | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) copy .github/trigger_files/{beam_PostCommit_Python_Xlang_Gcp_Direct.json => IO_Iceberg_Unit_Tests.json} (100%)
(beam) branch master updated (2196758c20b -> 6cb30cc5c86)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 2196758c20b Merge pull request #31270: Reapply "Add Redistribute translation to Samza runner" add 6cb30cc5c86 setup GCP auth before running tests (#31306) No new revisions were added by this update. Summary of changes: .github/workflows/IO_Iceberg_Unit_Tests.yml | 6 ++ 1 file changed, 6 insertions(+)
(beam) branch master updated: Support Kafka Managed IO (#31172)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 365c2d92965 Support Kafka Managed IO (#31172) 365c2d92965 is described below commit 365c2d92965c5e23c23d6e1f3c7a1cd048c872d8 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu May 9 17:14:17 2024 -0400 Support Kafka Managed IO (#31172) * managed kafka read * managed kafka write --- .../apache/beam/sdk/schemas/utils/YamlUtils.java | 8 +++ sdks/java/io/kafka/build.gradle| 1 + .../io/kafka/KafkaReadSchemaTransformProvider.java | 7 ++- .../KafkaReadSchemaTransformProviderTest.java | 53 ++- .../KafkaWriteSchemaTransformProviderTest.java | 59 ++ sdks/java/managed/build.gradle | 1 + .../java/org/apache/beam/sdk/managed/Managed.java | 14 +++-- .../managed/ManagedSchemaTransformProvider.java| 30 ++- .../sdk/managed/ManagedTransformConstants.java | 52 ++- 9 files changed, 213 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 122f2d1963b..e631e166e8b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.schemas.utils; import static org.apache.beam.sdk.values.Row.toRow; import java.math.BigDecimal; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -181,4 +182,11 @@ public class YamlUtils { } return new Yaml().dumpAsMap(map); } + + public static Map yamlStringToMap(@Nullable String yaml) { +if (yaml == null || yaml.isEmpty()) { + return Collections.emptyMap(); +} +return new Yaml().load(yaml); + } } diff --git 
a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 269ddb3f5eb..3e095a2bacc 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -90,6 +90,7 @@ dependencies { provided library.java.everit_json_schema testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:io:synthetic") + testImplementation project(":sdks:java:managed") testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 2776c388f7c..13240ea9dc4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -151,11 +151,10 @@ public class KafkaReadSchemaTransformProvider } }; } - -if (format.equals("RAW")) { +if ("RAW".equals(format)) { beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); valueMapper = getRawBytesToRowFunction(beamSchema); -} else if (format.equals("PROTO")) { +} else if ("PROTO".equals(format)) { String fileDescriptorPath = configuration.getFileDescriptorPath(); String messageName = configuration.getMessageName(); if (fileDescriptorPath != null) { @@ -165,7 +164,7 @@ public class KafkaReadSchemaTransformProvider beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName); valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName); } -} else if (format.equals("JSON")) { +} 
else if ("JSON".equals(format)) { beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema); valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema); } else { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index f6e231c758a..d5962a737ba 100644 ---
(beam) branch master updated (13708eaedeb -> e0bc8e770a7)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 13708eaedeb Merge pull request #31229: Turn off abandoned node enforcement in some Reshuffle unit tests where it is not needed add e0bc8e770a7 Add IcebergIO integration tests (#31220) No new revisions were added by this update. Summary of changes: ...n => beam_PostCommit_Java_Hadoop_Versions.json} | 2 +- .../{IO_Iceberg.yml => IO_Iceberg_Unit_Tests.yml} | 16 +- build.gradle.kts | 1 + sdks/java/io/iceberg/build.gradle | 26 +- .../sdk/io/iceberg/SchemaAndRowConversions.java| 29 ++- .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 289 + .../io/iceberg/SchemaAndRowConversionsTest.java| 42 +++ 7 files changed, 386 insertions(+), 19 deletions(-) copy .github/trigger_files/{beam_PostCommit_Java_ValidatesRunner_Dataflow.json => beam_PostCommit_Java_Hadoop_Versions.json} (97%) rename .github/workflows/{IO_Iceberg.yml => IO_Iceberg_Unit_Tests.yml} (89%) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
(beam) branch master updated: Revert global snake_case convention for SchemaTransforms (#31109)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b33a8438ad3 Revert global snake_case convention for SchemaTransforms (#31109) b33a8438ad3 is described below commit b33a8438ad335b75feec0c5c97e9a728795fc6ff Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Apr 25 22:01:15 2024 -0400 Revert global snake_case convention for SchemaTransforms (#31109) * revert global snake_case convention and make it a special case for iceberg and managed * remove docs and comments too * cleanup * revert python and yaml changes too * fix test --- .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../transforms/TypedSchemaTransformProvider.java | 36 +- .../TypedSchemaTransformProviderTest.java | 8 ++--- .../IcebergReadSchemaTransformProvider.java| 12 .../IcebergWriteSchemaTransformProvider.java | 11 +++ .../KafkaReadSchemaTransformProviderTest.java | 16 +- .../managed/ManagedSchemaTransformProvider.java| 12 +++- .../ManagedSchemaTransformProviderTest.java| 12 .../ManagedSchemaTransformTranslationTest.java | 6 ++-- .../org/apache/beam/sdk/managed/ManagedTest.java | 2 +- .../managed/src/test/resources/test_config.yaml| 4 +-- sdks/python/apache_beam/io/gcp/bigquery.py | 14 - sdks/python/apache_beam/io/gcp/bigtableio.py | 12 .../transforms/external_transform_provider.py | 35 +++-- .../external_transform_provider_it_test.py | 22 + sdks/python/apache_beam/yaml/yaml_provider.py | 2 +- sdks/python/gen_xlang_wrappers.py | 21 - 17 files changed, 155 insertions(+), 72 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index e3d6056a5de..b268323 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index cfd298ae87e..d5c6c724c6f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.schemas.transforms; -import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.lang.reflect.ParameterizedType; import java.util.List; @@ -28,10 +26,8 @@ import java.util.Optional; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaProvider; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.values.Row; @@ -45,9 +41,6 @@ import org.apache.beam.sdk.values.Row; * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema. 
* - * NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the - * {@code snake_case} naming convention. - * * Internal only: This interface is actively being worked on and it will likely change as * we provide implementations for more standard Beam transforms. We provide no backwards * compatibility guarantees and it should not be implemented outside of the Beam repository. @@ -85,11 +78,10 @@ public abstract class TypedSchemaTransformProvider implements SchemaTra } @Override - public final Schema configurationSchema() { + public Schema configurationSchema() { try { // Sort the fields by name to ensure a consistent
(beam) branch master updated (485c5198384 -> 45e78572e8f)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 485c5198384 Merge pull request #31086: Simplify intermediate data in Iceberg sink; use manifest files add 45e78572e8f python sdk: fix several bugs regarding avro <-> beam schema conversion (#30770) No new revisions were added by this update. Summary of changes: sdks/python/apache_beam/io/avroio.py | 132 ++ sdks/python/apache_beam/io/avroio_test.py | 84 ++- 2 files changed, 198 insertions(+), 18 deletions(-)
(beam) branch master updated (a0dad088980 -> 37609ba70fa)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from a0dad088980 Bump golang.org/x/net from 0.22.0 to 0.24.0 in /sdks (#31065) add 37609ba70fa Managed Transform protos & translation; Iceberg SchemaTransforms & translation (#30910) No new revisions were added by this update. Summary of changes: .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 3 +- .../beam_PostCommit_Python_Xlang_IO_Dataflow.json | 3 +- .../beam_PreCommit_Xlang_Generated_Transforms.yml | 2 +- .../model/pipeline/v1/external_transforms.proto| 4 + .../java/org/apache/beam/sdk/schemas/Schema.java | 54 - .../apache/beam/sdk/schemas/SchemaRegistry.java| 23 ++ .../sdk/schemas/annotations/DefaultSchema.java | 18 ++ .../transforms/SchemaTransformTranslation.java | 79 +++ .../transforms/TypedSchemaTransformProvider.java | 35 ++- .../apache/beam/sdk/schemas/utils/YamlUtils.java | 21 +- .../util/construction/PTransformTranslation.java | 11 + .../main/java/org/apache/beam/sdk/values/Row.java | 49 .../beam/sdk/schemas/SchemaRegistryTest.java | 49 .../org/apache/beam/sdk/schemas/SchemaTest.java| 100 + .../TypedSchemaTransformProviderTest.java | 24 +- .../org/apache/beam/sdk/util/YamlUtilsTest.java| 33 +++ .../java/org/apache/beam/sdk/values/RowTest.java | 90 .../sdk/expansion/service/ExpansionService.java| 41 +++- sdks/java/io/expansion-service/build.gradle| 2 + .../FileWriteSchemaTransformProviderTest.java | 32 +-- ...ueryExportReadSchemaTransformConfiguration.java | 14 -- ...FileLoadsWriteSchemaTransformConfiguration.java | 14 -- ...QueryExportReadSchemaTransformProviderTest.java | 25 +-- ...yFileLoadsWriteSchemaTransformProviderTest.java | 12 +- .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 2 +- sdks/java/io/iceberg/build.gradle | 1 + .../java/org/apache/beam/io/iceberg/IcebergIO.java | 93 .../{ => sdk}/io/iceberg/AppendFilesToTables.java | 17 +- .../{ => 
sdk}/io/iceberg/AssignDestinations.java | 2 +- .../{ => sdk}/io/iceberg/DynamicDestinations.java | 2 +- .../beam/{ => sdk}/io/iceberg/FileWriteResult.java | 12 +- .../{ => sdk}/io/iceberg/IcebergCatalogConfig.java | 2 +- .../{ => sdk}/io/iceberg/IcebergDestination.java | 2 +- .../org/apache/beam/sdk/io/iceberg/IcebergIO.java | 136 +++ .../IcebergReadSchemaTransformProvider.java| 134 +++ .../{ => sdk}/io/iceberg/IcebergScanConfig.java| 2 +- .../IcebergSchemaTransformCatalogConfig.java | 107 + .../iceberg/IcebergSchemaTransformTranslation.java | 88 .../io/iceberg/IcebergTableCreateConfig.java | 2 +- .../{ => sdk}/io/iceberg/IcebergWriteResult.java | 13 +- .../IcebergWriteSchemaTransformProvider.java | 179 +++ .../io/iceberg/OneTableDynamicDestinations.java| 31 ++- .../beam/{ => sdk}/io/iceberg/PropertyBuilder.java | 2 +- .../beam/{ => sdk}/io/iceberg/RecordWriter.java| 4 +- .../beam/{ => sdk}/io/iceberg/ScanSource.java | 2 +- .../beam/{ => sdk}/io/iceberg/ScanTaskReader.java | 2 +- .../beam/{ => sdk}/io/iceberg/ScanTaskSource.java | 2 +- .../io/iceberg/SchemaAndRowConversions.java| 2 +- .../apache/beam/sdk/io/iceberg/SnapshotInfo.java | 118 ++ .../io/iceberg/WriteGroupedRowsToFiles.java| 2 +- .../{ => sdk}/io/iceberg/WriteToDestinations.java | 5 +- .../io/iceberg/WriteUngroupedRowsToFiles.java | 2 +- .../beam/{ => sdk}/io/iceberg/package-info.java| 2 +- .../{ => sdk}/io/iceberg/FileWriteResultTest.java | 17 +- .../{ => sdk}/io/iceberg/IcebergIOReadTest.java| 4 +- .../{ => sdk}/io/iceberg/IcebergIOWriteTest.java | 16 +- .../IcebergReadSchemaTransformProviderTest.java| 183 +++ .../IcebergSchemaTransformTranslationTest.java | 248 + .../IcebergWriteSchemaTransformProviderTest.java | 175 +++ .../beam/{ => sdk}/io/iceberg/ScanSourceTest.java | 2 +- .../io/iceberg/SchemaAndRowConversionsTest.java| 2 +- .../{ => sdk}/io/iceberg/TestDataWarehouse.java| 2 +- .../beam/{ => sdk}/io/iceberg/TestFixtures.java| 2 +- .../KafkaReadSchemaTransformProviderTest.java | 16 +- 
sdks/java/managed/build.gradle | 1 - .../java/org/apache/beam/sdk/managed/Managed.java | 39 ++-- .../managed/ManagedSchemaTransformProvider.java| 119 ++ .../managed/ManagedSchemaTransformTranslation.java | 59 + .../sdk/managed/ManagedTransformConstants.java}| 9 +- .../testing}/TestSchemaTransformProvider.java | 8 +- .../beam/sdk/managed/testing}/package-info.jav
(beam) branch release-2.56.0 updated: Cherry picking (#30460) BQ clustering valueprovider (#31039)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch release-2.56.0 in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/release-2.56.0 by this push: new 3c7a01360df Cherry picking (#30460) BQ clustering valueprovider (#31039) 3c7a01360df is described below commit 3c7a01360df3635ae56b7073dfa36e75e7492f61 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Apr 18 11:47:41 2024 -0400 Cherry picking (#30460) BQ clustering valueprovider (#31039) * Support clustering with value provider * remove * add some documentation * fix * address comments * update test * spotless * use serializable json clustering; fix translation * fall back on super's clustering and time partitioning when needed * fork based on version 2.56.0 * fix test --- .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 22 +++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 31 ++ .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 - .../gcp/bigquery/DynamicDestinationsHelpers.java | 8 ++ .../sdk/io/gcp/bigquery/BigQueryClusteringIT.java | 5 +++- .../sdk/io/gcp/bigquery/BigQueryHelpersTest.java | 11 .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +-- .../BigQueryTimePartitioningClusteringIT.java | 1 + 8 files changed, 85 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index c4ad09ce6ea..8c600cf780a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobReference; @@ -31,6 +33,8 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; @@ -40,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.regex.Matcher; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions; @@ -704,6 +709,23 @@ public class BigQueryHelpers { } } + static Clustering clusteringFromJsonFields(String jsonStringClustering) { +JsonElement jsonClustering = JsonParser.parseString(jsonStringClustering); + +checkArgument( +jsonClustering.isJsonArray(), +"Received an invalid Clustering json string: %s." 
++ "Please provide a serialized json array like so: [\"column1\", \"column2\"]", +jsonStringClustering); + +List fields = +jsonClustering.getAsJsonArray().asList().stream() +.map(JsonElement::getAsString) +.collect(Collectors.toList()); + +return new Clustering().setFields(fields); + } + static String resolveTempLocation( String tempLocationDir, String bigQueryOperationName, String stepUuid) { return FileSystems.matchNewResource(tempLocationDir, true) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index a646e1e6247..fce8f1c5d40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2350,7 +2350,7 @@ public class BigQueryIO { abstract @Nullable ValueProvider getJsonTimePartitioning(); -
(beam) branch master updated (2eb1a756258 -> 04ff4bdd7fc)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 2eb1a756258 [Python] Clean doc related to write data in bigquery.py (#30887) add 04ff4bdd7fc Support BQ clustering with value provider (#30460) No new revisions were added by this update. Summary of changes: .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 22 +++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 31 ++ .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 - .../gcp/bigquery/DynamicDestinationsHelpers.java | 8 ++ .../sdk/io/gcp/bigquery/BigQueryClusteringIT.java | 5 +++- .../sdk/io/gcp/bigquery/BigQueryHelpersTest.java | 11 .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +-- .../BigQueryTimePartitioningClusteringIT.java | 1 + 8 files changed, 85 insertions(+), 21 deletions(-)
(beam) branch master updated: Add config validation to kafka read schema transform (#30625)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new e33dec69c7c Add config validation to kafka read schema transform (#30625) e33dec69c7c is described below commit e33dec69c7cfd01c0b827538e1dad8567e3ff95e Author: Jeff Kinard AuthorDate: Thu Apr 11 19:23:26 2024 -0400 Add config validation to kafka read schema transform (#30625) * Add config validation to kafka read schema transform Signed-off-by: Jeffrey Kinard --- .../KafkaReadSchemaTransformConfiguration.java | 36 +- .../io/kafka/KafkaReadSchemaTransformProvider.java | 2 ++ .../KafkaReadSchemaTransformProviderTest.java | 4 +-- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java index d95c49894a2..13f5249a6c3 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + import com.google.auto.value.AutoValue; import java.util.Map; import java.util.Set; @@ -46,11 +49,13 @@ public abstract class KafkaReadSchemaTransformConfiguration { public void validate() { final String startOffset = this.getAutoOffsetResetConfig(); -assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset) -: "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES; 
+checkArgument( +startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset), +"Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES); final String dataFormat = this.getFormat(); -assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat) -: "Valid data formats are " + VALID_DATA_FORMATS; +checkArgument( +dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat), +"Valid data formats are " + VALID_DATA_FORMATS); final String inputSchema = this.getSchema(); final String messageName = this.getMessageName(); @@ -59,20 +64,23 @@ public abstract class KafkaReadSchemaTransformConfiguration { final String confluentSchemaRegSubject = this.getConfluentSchemaRegistrySubject(); if (confluentSchemaRegUrl != null) { - assert confluentSchemaRegSubject != null - : "To read from Kafka, a schema must be provided directly or through Confluent " - + "Schema Registry. Make sure you are providing one of these parameters."; + checkNotNull( + confluentSchemaRegSubject, + "To read from Kafka, a schema must be provided directly or through Confluent " + + "Schema Registry. 
Make sure you are providing one of these parameters."); } else if (dataFormat != null && dataFormat.equals("RAW")) { - assert inputSchema == null : "To read from Kafka in RAW format, you can't provide a schema."; + checkArgument( + inputSchema == null, "To read from Kafka in RAW format, you can't provide a schema."); } else if (dataFormat != null && dataFormat.equals("JSON")) { - assert inputSchema != null : "To read from Kafka in JSON format, you must provide a schema."; + checkNotNull(inputSchema, "To read from Kafka in JSON format, you must provide a schema."); } else if (dataFormat != null && dataFormat.equals("PROTO")) { - assert messageName != null - : "To read from Kafka in PROTO format, messageName must be provided."; - assert fileDescriptorPath != null || inputSchema != null - : "To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided."; + checkNotNull( + messageName, "To read from Kafka in PROTO format, messageName must be provided."); + checkArgument( + fileDescriptorPath != null || inputSchema != null, + "To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided."); } else { - assert inputSchema != null : "To read from Kafka in AVRO format, you must provide a schema."; + checkNotNull(inputSchema, "T
(beam) branch master updated: [Java] ManagedIO (#30808)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 54673996c9b [Java] ManagedIO (#30808) 54673996c9b is described below commit 54673996c9bf2ee076b04833bbae2729d6cebbaf Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Apr 8 06:55:17 2024 -0400 [Java] ManagedIO (#30808) * managed api for java * yaml utils --- build.gradle.kts | 1 + sdks/java/core/build.gradle| 1 + .../apache/beam/sdk/schemas/utils/YamlUtils.java | 171 .../org/apache/beam/sdk/util/YamlUtilsTest.java| 228 + sdks/java/managed/build.gradle | 37 .../java/org/apache/beam/sdk/managed/Managed.java | 195 ++ .../managed/ManagedSchemaTransformProvider.java| 183 + .../org/apache/beam/sdk/managed/package-info.java | 20 ++ .../ManagedSchemaTransformProviderTest.java| 103 ++ .../org/apache/beam/sdk/managed/ManagedTest.java | 114 +++ .../sdk/managed/TestSchemaTransformProvider.java | 98 + .../managed/src/test/resources/test_config.yaml| 21 ++ settings.gradle.kts| 2 + 13 files changed, 1174 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index ded692677b5..9c42ffdc8ce 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -303,6 +303,7 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:io:synthetic:build") dependsOn(":sdks:java:io:xml:build") dependsOn(":sdks:java:javadoc:allJavadoc") + dependsOn(":sdks:java:managed:build") dependsOn(":sdks:java:testing:expansion-service:build") dependsOn(":sdks:java:testing:jpms-tests:build") dependsOn(":sdks:java:testing:load-tests:build") diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 438a3fb1806..5a47cb5237e 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,6 +98,7 @@ dependencies { permitUnusedDeclared 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation "org.yaml:snakeyaml:2.0" shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java new file mode 100644 index 000..5c05b2bed39 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.utils; + +import static org.apache.beam.sdk.values.Row.toRow; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.yaml.snakeyaml.Yaml; + +public class YamlUtils { + private static final Map> YAML_VALUE_PARSERS = + ImmutableMap + .> + builder() + .put(Schema.TypeName.BYTE, Byte::valueOf) + .put(Schema.TypeName.
(beam) branch master updated: Rename tests to remove them from dataflow test suite (#30623)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new e648fd08726 Rename tests to remove them from dataflow test suite (#30623) e648fd08726 is described below commit e648fd08726d1d0ccde62603776fa0f886907f0a Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Mar 13 11:42:17 2024 -0400 Rename tests to remove them from dataflow test suite (#30623) --- ...oviderIT.java => BigQueryDirectReadSchemaTransformProviderTest.java} | 2 +- ...rIT.java => BigQueryStorageWriteApiSchemaTransformProviderTest.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java similarity index 99% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java index 958409eb5e3..2363a870bbd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java @@ -74,7 +74,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class BigQueryDirectReadSchemaTransformProviderIT { +public class BigQueryDirectReadSchemaTransformProviderTest { 
private static PipelineOptions testOptions = TestPipeline.testingPipelineOptions(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java similarity index 99% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 8b8a3b75949..64ea0b11d1b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -61,7 +61,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class BigQueryStorageWriteApiSchemaTransformProviderIT { +public class BigQueryStorageWriteApiSchemaTransformProviderTest { private FakeDatasetService fakeDatasetService = new FakeDatasetService(); private FakeJobService fakeJobService = new FakeJobService();
(beam) branch master updated: Make defaults for optional SchemaTransformProvider methods (#30560)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new d22a7e783ab Make defaults for optional SchemaTransformProvider methods (#30560) d22a7e783ab is described below commit d22a7e783abf445ae7bef2ce26075f9d93b409a7 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Mar 11 17:26:54 2024 -0400 Make defaults for optional SchemaTransformProvider methods (#30560) * simplify schematransformprovider --- .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 1 + .../transforms/SchemaTransformProvider.java| 9 ++-- .../transforms/TypedSchemaTransformProvider.java | 20 +- .../TypedSchemaTransformProviderTest.java | 24 -- ...ueryStorageWriteApiSchemaTransformProvider.java | 5 - .../BigtableReadSchemaTransformProvider.java | 10 - .../BigtableWriteSchemaTransformProvider.java | 10 - ...gQueryDirectReadSchemaTransformProviderIT.java} | 2 +- ...yStorageWriteApiSchemaTransformProviderIT.java} | 2 +- 9 files changed, 51 insertions(+), 32 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index e69de29bb2d..8b137891791 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -0,0 +1 @@ + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java index c76d7a25e69..9d0ad61b7a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java @@ -17,6 +17,7 @@ */ 
package org.apache.beam.sdk.schemas.transforms; +import java.util.Collections; import java.util.List; import java.util.Optional; import org.apache.beam.sdk.annotations.Internal; @@ -58,10 +59,14 @@ public interface SchemaTransformProvider { SchemaTransform from(Row configuration); /** Returns the input collection names of this transform. */ - List inputCollectionNames(); + default List inputCollectionNames() { +return Collections.emptyList(); + } /** Returns the output collection names of this transform. */ - List outputCollectionNames(); + default List outputCollectionNames() { +return Collections.emptyList(); + } /** * List the dependencies needed for this transform. Jars from classpath are used by default when diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 1117f59a748..e75fa27d2d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -17,8 +17,13 @@ */ package org.apache.beam.sdk.schemas.transforms; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.lang.reflect.ParameterizedType; import java.util.List; import java.util.Optional; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; @@ -39,7 +44,20 @@ import org.apache.beam.sdk.values.Row; @Internal public abstract class TypedSchemaTransformProvider implements SchemaTransformProvider { - protected abstract Class configurationClass(); + @SuppressWarnings("unchecked") + protected Class 
configurationClass() { +@Nullable +ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass(); +checkStateNotNull( +parameterizedType, "Could not get the TypedSchemaTransformProvider's parameterized type."); +checkArgument( +parameterizedType.getActualTypeArguments().length == 1, +String.format( +"Expected one parameterized type, but got %s.", +parameterizedType.getActualTypeArguments().length)); + +return (Class) parameterizedType.getActualTypeArguments()[0]; + } /** * Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a diff --git a/sdks/java/core/src/test/java/or
(beam) branch master updated: fix: support reading arrays of structs from bigquery with schemas (#30448)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b6301b52058 fix: support reading arrays of structs from bigquery with schemas (#30448) b6301b52058 is described below commit b6301b5205800ce3604a751964211d6061b09f02 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Mar 6 09:37:37 2024 -0500 fix: support reading arrays of structs from bigquery with schemas (#30448) --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 11 --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java| 11 ++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index fa5ffae0909..e3ace73ee96 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -69,6 +69,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; @@ -710,14 +711,18 @@ public class BigQueryUtils { + fieldType + "' because the BigQuery type is a List, while the 
output type is not a collection."); } - boolean innerTypeIsMap = - fieldType.getCollectionElementType().getTypeName().equals(TypeName.MAP); + + boolean innerTypeIsMap = fieldType.getCollectionElementType().getTypeName().isMapType(); return ((List) jsonBQValue) .stream() + // Old BigQuery client returns arrays as lists of maps {"v": }. + // If this is the case, unwrap the value first .map( v -> - (!innerTypeIsMap && v instanceof Map) + (!innerTypeIsMap + && v instanceof Map + && ((Map) v).keySet().equals(Sets.newHashSet("v"))) ? ((Map) v).get("v") : v) .map(v -> toBeamValue(field.withType(fieldType.getCollectionElementType()), v)) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index d73ff5e2b71..d8db71b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -441,10 +441,13 @@ public class BigQueryUtilsTest { private static final Row ARRAY_ROW_ROW = Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build(); - private static final TableRow BQ_ARRAY_ROW_ROW = + private static final TableRow BQ_ARRAY_ROW_ROW_V = new TableRow() .set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW))); + private static final TableRow BQ_ARRAY_ROW_ROW = + new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW)); + private static final TableSchema BQ_FLAT_TYPE = new TableSchema() .setFields( @@ -943,6 +946,12 @@ public class BigQueryUtilsTest { assertEquals(ROW_ROW, beamRow); } + @Test + public void testToBeamRow_array_row_v() { +Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW_V); +assertEquals(ARRAY_ROW_ROW, beamRow); + } + @Test public void 
testToBeamRow_array_row() { Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
(beam) branch master updated (b776d705997 -> 6a03f9ba062)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from b776d705997 Duet AI ML data processing prompts (no links) (#30421) add 6a03f9ba062 Update triggering frequency doc (#30457) No new revisions were added by this update. Summary of changes: .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 10 +- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 2 +- .../content/en/documentation/io/built-in/google-bigquery.md| 1 + 3 files changed, 7 insertions(+), 6 deletions(-)
(beam) branch master updated: Generate external transform wrappers using a script (#29834)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 11f9bce485c Generate external transform wrappers using a script (#29834) 11f9bce485c is described below commit 11f9bce485c4f6fe466ff4bf5073d2414e43678c Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Feb 22 16:13:55 2024 -0500 Generate external transform wrappers using a script (#29834) --- ...=> beam_PostCommit_Python_Examples_Direct.json} | 0 .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 1 + ...> beam_PostCommit_Python_Xlang_Gcp_Direct.json} | 0 ... beam_PostCommit_Python_Xlang_IO_Dataflow.json} | 0 .github/workflows/README.md| 1 + .../beam_PreCommit_Xlang_Generated_Transforms.yml | 114 ++ .gitignore | 1 + CHANGES.md | 1 + build.gradle.kts | 5 + .../org/apache/beam/gradle/BeamModulePlugin.groovy | 100 ++--- scripts/ci/release/test/resources/mass_comment.txt | 2 + .../GenerateSequenceSchemaTransformProvider.java | 2 +- sdks/python/MANIFEST.in| 1 + sdks/python/apache_beam/io/__init__.py | 1 + .../io/external/xlang_bigqueryio_it_test.py| 23 +- .../apache_beam/io/gcp/bigtableio_it_test.py | 22 +- .../transforms/external_transform_provider.py | 8 +- .../external_transform_provider_it_test.py | 413 +++ .../transforms/external_transform_provider_test.py | 140 --- .../transforms/xlang/__init__.py} | 15 +- sdks/python/build.gradle | 19 + sdks/python/gen_xlang_wrappers.py | 447 + sdks/python/pyproject.toml | 6 + sdks/python/pytest.ini | 1 + sdks/python/python_xlang_wrapper.template | 36 ++ sdks/python/setup.py | 49 ++- sdks/python/test-suites/dataflow/common.gradle | 5 +- sdks/python/test-suites/direct/build.gradle| 7 + sdks/python/test-suites/direct/common.gradle | 8 +- sdks/python/test-suites/xlang/build.gradle | 56 +-- sdks/python/tox.ini| 4 +- 
sdks/standard_expansion_services.yaml | 77 sdks/standard_external_transforms.yaml | 52 +++ 33 files changed, 1330 insertions(+), 287 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json similarity index 100% copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json copy to .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index e69de29bb2d..8b137891791 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -0,0 +1 @@ + diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json similarity index 100% copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json copy to .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json similarity index 100% copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json copy to .github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json diff --git a/.github/workflows/README.md b/.github/workflows/README.md index f882553ef9b..42efdac 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -271,6 +271,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Website ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) | N/A |`Run Website PreCommit`| 
[![.github/workflows/beam_PreCommit_Website.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule) | | [ PreCommit Website Stage GCS ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml) | N/A |`Run Website_Stage_GCS PreCommit`| [![.github/workflows/beam_PreCommit_Website_Stage_GCS.yml](https://github.com/apache/beam/actio
(beam) branch master updated: Fix Python postcommits (#30333)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 1bbe07e1653 Fix Python postcommits (#30333) 1bbe07e1653 is described below commit 1bbe07e1653bc35366dea0c8ebe2399490d3ad50 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Feb 15 20:31:04 2024 -0500 Fix Python postcommits (#30333) --- .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json | 0 sdks/java/extensions/python/build.gradle | 2 ++ 2 files changed, 2 insertions(+) diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json new file mode 100644 index 000..e69de29bb2d diff --git a/sdks/java/extensions/python/build.gradle b/sdks/java/extensions/python/build.gradle index 7c04259b05e..c27251fa445 100644 --- a/sdks/java/extensions/python/build.gradle +++ b/sdks/java/extensions/python/build.gradle @@ -21,6 +21,8 @@ applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.python') description = "Apache Beam :: SDKs :: Java :: Extensions :: Python" +evaluationDependsOn(":sdks:java:core") + dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation library.java.vendored_grpc_1_60_1
(beam) branch master updated (cce70cdd95c -> 9452a29f489)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from cce70cdd95c Remove python 3.7 from programming guide (#30302) add 9452a29f489 fix: temporarily double idle timeout to work around a bug in watchdog (#30299) No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-)
(beam) branch master updated: Make ReadFromBigQueryRequest id more randomized (#30156)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 41dee464db4 Make ReadFromBigQueryRequest id more randomized (#30156) 41dee464db4 is described below commit 41dee464db458fa72eeab7ddc902b242ebc894eb Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Jan 31 13:07:02 2024 -0500 Make ReadFromBigQueryRequest id more randomized (#30156) * make ReadFromBigQueryRequest id more randomized --- sdks/python/apache_beam/io/gcp/bigquery.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index bba8b8a4af7..7648ab4064d 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -361,6 +361,7 @@ import itertools import json import logging import random +import secrets import time import uuid import warnings @@ -2925,8 +2926,9 @@ class ReadFromBigQueryRequest: self.table = table self.validate() -# We use this internal object ID to generate BigQuery export directories. -self.obj_id = random.randint(0, 10) +# We use this internal object ID to generate BigQuery export directories +# and to create BigQuery job names +self.obj_id = '%d_%s' % (int(time.time()), secrets.token_hex(3)) def validate(self): if self.table is not None and self.query is not None:
(beam) branch master updated: Fix failing python BQ test (#30099)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 3ae851876e2 Fix failing python BQ test (#30099) 3ae851876e2 is described below commit 3ae851876e22be8ed09c4434c73dd654874e7ae4 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Jan 25 14:03:37 2024 -0500 Fix failing python BQ test (#30099) --- .github/trigger_files/beam_PostCommit_Python.json | 0 sdks/python/apache_beam/io/gcp/bigquery.py| 5 + sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 1 + .../python/apache_beam/io/gcp/bigquery_file_loads_test.py | 15 +++ sdks/python/test-suites/dataflow/common.gradle| 2 +- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json new file mode 100644 index 000..e69de29bb2d diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 4643c8ddf0a..bba8b8a4af7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1854,6 +1854,7 @@ class WriteToBigQuery(PTransform): kms_key=None, batch_size=None, max_file_size=None, + max_partition_size=None, max_files_per_bundle=None, test_client=None, custom_gcs_temp_location=None, @@ -1934,6 +1935,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, max_file_size (int): The maximum size for a file to be written and then loaded into BigQuery. The default value is 4TB, which is 80% of the limit of 5TB for BigQuery to load any file. + max_partition_size (int): Maximum byte size for each load job to +BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only. max_files_per_bundle(int): The maximum number of files to be concurrently written by a worker. 
The default here is 20. Larger values will allow writing to multiple destinations without having to reshard - but they @@ -2059,6 +2062,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, # TODO(pabloem): Consider handling ValueProvider for this location. self.custom_gcs_temp_location = custom_gcs_temp_location self.max_file_size = max_file_size +self.max_partition_size = max_partition_size self.max_files_per_bundle = max_files_per_bundle self.method = method or WriteToBigQuery.Method.DEFAULT self.triggering_frequency = triggering_frequency @@ -2202,6 +2206,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, with_auto_sharding=self.with_auto_sharding, temp_file_format=self._temp_file_format, max_file_size=self.max_file_size, + max_partition_size=self.max_partition_size, max_files_per_bundle=self.max_files_per_bundle, custom_gcs_temp_location=self.custom_gcs_temp_location, test_client=self.test_client, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 48f2ab4b36b..e1a4af31f1c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -747,6 +747,7 @@ class TriggerLoadJobs(beam.DoFn): ) if not self.bq_io_metadata: self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) + job_reference = self.bq_wrapper.perform_load_job( destination=table_reference, source_uris=files, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 345c8e70500..0605206714e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -925,9 +925,6 @@ class BigQueryFileLoadsIT(unittest.TestCase): create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND') -# reduce load job size to induce copy jobs 
-bqfl._DEFAULT_MAX_FILE_SIZE = 10 -bqfl._MAXIMUM_LOAD_SIZE = 20 verifiers = [ BigqueryFullResultMatcher( project=self.project, @@ -949,8 +946,7 @@ class BigQueryFileLoadsIT(unittest.TestCase): dest += "_2" return dest -args = self.test_pipeline.get_full_options_as_args( -on_success_matcher=all_of(verifiers)) +args = self.test_pipeline.get_full_options_as_args() with beam.Pipeline(argv=args) as p: # 0...4 going to table 1 @@ -961,7 +957,10 @@ class BigQueryFileLoadsIT(unittest.TestCase): p | beam.Create(items) | bigquery
(beam) branch release-2.54.0 updated: rename ExternalSchemaTransform to ExternalTransform (#30114)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch release-2.54.0 in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/release-2.54.0 by this push: new 22ef257784f rename ExternalSchemaTransform to ExternalTransform (#30114) 22ef257784f is described below commit 22ef257784f1259f1c38448f4eb502f9c9233f04 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Jan 25 12:07:00 2024 -0500 rename ExternalSchemaTransform to ExternalTransform (#30114) --- ..._provider.py => external_transform_provider.py} | 34 +++--- ...test.py => external_transform_provider_test.py} | 18 ++-- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/transforms/external_schematransform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py similarity index 90% rename from sdks/python/apache_beam/transforms/external_schematransform_provider.py rename to sdks/python/apache_beam/transforms/external_transform_provider.py index fd650087893..26cc31471e6 100644 --- a/sdks/python/apache_beam/transforms/external_schematransform_provider.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider.py @@ -29,7 +29,7 @@ from apache_beam.transforms.external import SchemaTransformsConfig from apache_beam.typehints.schemas import named_tuple_to_schema from apache_beam.typehints.schemas import typing_from_runner_api -__all__ = ['ExternalSchemaTransform', 'ExternalSchemaTransformProvider'] +__all__ = ['ExternalTransform', 'ExternalTransformProvider'] def snake_case_to_upper_camel_case(string): @@ -84,7 +84,7 @@ def get_config_with_descriptions( return fields_with_descriptions -class ExternalSchemaTransform(PTransform): +class ExternalTransform(PTransform): """Template for a wrapper class of an external SchemaTransform This is a superclass for dynamically generated SchemaTransform wrappers and @@ 
-93,7 +93,7 @@ class ExternalSchemaTransform(PTransform): Experimental; no backwards compatibility guarantees.""" # These attributes need to be set when - # creating an ExternalSchemaTransform type + # creating an ExternalTransform type default_expansion_service = None description: str = "" identifier: str = "" @@ -138,21 +138,21 @@ def infer_name_from_identifier(identifier: str, pattern: str): return ''.join(components) -class ExternalSchemaTransformProvider: +class ExternalTransformProvider: """Dynamically discovers Schema-aware external transforms from a given list of expansion services and provides them as ready PTransforms. - A :class:`ExternalSchemaTransform` subclass is generated for each external + A :class:`ExternalTransform` subclass is generated for each external transform, and is named based on what can be inferred from the URN (see :param urn_pattern). - These classes are generated when :class:`ExternalSchemaTransformProvider` is + These classes are generated when :class:`ExternalTransformProvider` is initialized. We need to give it one or more expansion service addresses that are already up and running: - >>> provider = ExternalSchemaTransformProvider(["localhost:12345", + >>> provider = ExternalTransformProvider(["localhost:12345", ... "localhost:12121"]) We can also give it the gradle target of a standard Beam expansion service: - >>> provider = ExternalSchemaTransform(BeamJarExpansionService( + >>> provider = ExternalTransform(BeamJarExpansionService( ... "sdks:java:io:google-cloud-platform:expansion-service:shadowJar")) Let's take a look at the output of :func:`get_available()` to know the available transforms in the expansion service(s) we provided: @@ -162,7 +162,7 @@ class ExternalSchemaTransformProvider: ...] Then retrieve a transform by :func:`get()`, :func:`get_urn()`, or by directly - accessing it as an attribute of :class:`ExternalSchemaTransformProvider`. + accessing it as an attribute of :class:`ExternalTransformProvider`. 
All of the following commands do the same thing: >>> provider.get('BigqueryStorageRead') >>> provider.get_urn( @@ -194,7 +194,7 @@ class ExternalSchemaTransformProvider: Experimental; no backwards compatibility guarantees. """ def __init__(self, expansion_services, urn_pattern=STANDARD_URN_PATTERN): -f"""Initialize an ExternalSchemaTransformProvider +f"""Initialize an ExternalTransformProvider :param expansion_services: A list of expansion services to discover transforms from. @@ -207,7 +207,7 @@ class ExternalSchemaTransformProvider: By d
(beam) branch master updated (d62ae0144cc -> 61a62e19c4a)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from d62ae0144cc Add section for 2.55.0, cleanup 2.54.0 due to cut. (#30101) add 61a62e19c4a rename ExternalSchemaTransform to ExternalTransform (#30102) No new revisions were added by this update. Summary of changes: ..._provider.py => external_transform_provider.py} | 34 +++--- ...test.py => external_transform_provider_test.py} | 18 ++-- 2 files changed, 26 insertions(+), 26 deletions(-) rename sdks/python/apache_beam/transforms/{external_schematransform_provider.py => external_transform_provider.py} (90%) rename sdks/python/apache_beam/transforms/{external_schematransform_provider_test.py => external_transform_provider_test.py} (87%)
(beam) branch master updated (e85d070a32b -> 27214149d34)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from e85d070a32b Ignore DicomMetadataRead tests (#30096) add 27214149d34 Support dynamic destinations with Python Storage API (#30045) No new revisions were added by this update. Summary of changes: .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 0 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 1 + CHANGES.md | 1 + ...ueryStorageWriteApiSchemaTransformProvider.java | 59 - ...StorageWriteApiSchemaTransformProviderTest.java | 54 +++- .../io/external/xlang_bigqueryio_it_test.py| 77 +- sdks/python/apache_beam/io/gcp/bigquery.py | 293 - 7 files changed, 334 insertions(+), 151 deletions(-) copy learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/myfile.txt => .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json (100%) create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
(beam) branch master updated: [Python BQ] Substitute final destination schema when no input schema is specified (#30015)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b4be68e2404 [Python BQ] Substitute final destination schema when no input schema is specified (#30015) b4be68e2404 is described below commit b4be68e2404e8f87c545f81800a7e83cd9c77df7 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Jan 19 16:25:27 2024 -0500 [Python BQ] Substitute final destination schema when no input schema is specified (#30015) * substitute final destination's schema; tests * use cache and only fetch when necessary --- .../apache_beam/io/gcp/bigquery_file_loads.py | 24 + .../apache_beam/io/gcp/bigquery_file_loads_test.py | 62 ++ 2 files changed, 86 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 453cd27dfda..48f2ab4b36b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -656,6 +656,7 @@ class TriggerLoadJobs(beam.DoFn): if not self.bq_io_metadata: self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) self.pending_jobs = [] +self.schema_cache = {} def process( self, @@ -703,6 +704,29 @@ class TriggerLoadJobs(beam.DoFn): create_disposition = self.create_disposition if self.temporary_tables: + # we need to create temp tables, so we need a schema. 
+ # if there is no input schema, fetch the destination table's schema + if schema is None: +hashed_dest = bigquery_tools.get_hashable_destination(table_reference) +if hashed_dest in self.schema_cache: + schema = self.schema_cache[hashed_dest] +else: + try: +schema = bigquery_tools.table_schema_to_dict( +bigquery_tools.BigQueryWrapper().get_table( +project_id=table_reference.projectId, +dataset_id=table_reference.datasetId, +table_id=table_reference.tableId).schema) +self.schema_cache[hashed_dest] = schema + except Exception as e: +_LOGGER.warning( +"Input schema is absent and could not fetch the final " +"destination table's schema [%s]. Creating temp table [%s] " +"will likely fail: %s", +hashed_dest, +job_name, +e) + # If we are using temporary tables, then we must always create the # temporary tables, so we replace the create_disposition. create_disposition = 'CREATE_IF_NEEDED' diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index edd92f21e73..345c8e70500 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -903,6 +903,68 @@ class BigQueryFileLoadsIT(unittest.TestCase): _LOGGER.info( "Created dataset %s in project %s", self.dataset_id, self.project) + @pytest.mark.it_postcommit + def test_batch_copy_jobs_with_no_input_schema(self): +schema_1 = "col_1:INTEGER" +schema_2 = "col_2:INTEGER" + +# create two tables with different schemas +# test to make sure this works with dynamic destinations too +self.bigquery_client.get_or_create_table( +project_id=self.project, +dataset_id=self.dataset_id, +table_id="output_table_1", +schema=bigquery_tools.get_table_schema_from_string(schema_1), +create_disposition='CREATE_IF_NEEDED', +write_disposition='WRITE_APPEND') +self.bigquery_client.get_or_create_table( +project_id=self.project, +dataset_id=self.dataset_id, +table_id="output_table_2", 
+schema=bigquery_tools.get_table_schema_from_string(schema_2), +create_disposition='CREATE_IF_NEEDED', +write_disposition='WRITE_APPEND') + +# reduce load job size to induce copy jobs +bqfl._DEFAULT_MAX_FILE_SIZE = 10 +bqfl._MAXIMUM_LOAD_SIZE = 20 +verifiers = [ +BigqueryFullResultMatcher( +project=self.project, +query="SELECT * FROM %s" % (self.output_table + "_1"), +data=[(i, ) for i in range(5)]), +BigqueryFullResultMatcher( +project=self.project, +query="SELECT * FROM %s" % (self.output_table + "_2"), +data=[(i, ) for i in range(5, 10)]) +] + +output = self.output_table + +def callable_table(el: dict): + dest = output
(beam) branch master updated: [Python BQ] Retry get_table for quota errors (#28820)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c9e036e40e4 [Python BQ] Retry get_table for quota errors (#28820) c9e036e40e4 is described below commit c9e036e40e4f1d21f33ce6829fdf919c934da7de Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Jan 8 15:52:31 2024 -0500 [Python BQ] Retry get_table for quota errors (#28820) * retry get_table on quota errors * add tests * only retry on transient reasons --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 201 ++- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 2 +- sdks/python/apache_beam/utils/retry.py | 33 +++- 3 files changed, 229 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 035edffc03f..e53204a5ebc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -43,6 +43,7 @@ import apache_beam as beam from apache_beam.internal import pickler from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp +from apache_beam.io.filesystems import FileSystems from apache_beam.io.gcp import bigquery as beam_bq from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery import ReadFromBigQuery @@ -82,11 +83,13 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher try: from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client from apitools.base.py.exceptions import HttpError + from apitools.base.py.exceptions import HttpForbiddenError from google.cloud import bigquery as gcp_bigquery from google.api_core import exceptions except ImportError: gcp_bigquery = None HttpError = None + HttpForbiddenError = 
None exceptions = None # pylint: enable=wrong-import-order, wrong-import-position @@ -323,7 +326,9 @@ class TestJsonToDictCoder(unittest.TestCase): self.assertEqual(expected_row, actual) -@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +@unittest.skipIf( +HttpError is None or HttpForbiddenError is None, +'GCP dependencies are not installed') class TestReadFromBigQuery(unittest.TestCase): @classmethod def setUpClass(cls): @@ -454,6 +459,200 @@ class TestReadFromBigQuery(unittest.TestCase): mock_insert.assert_called() self.assertIn(error_message, exc.exception.args[0]) + @parameterized.expand([ + # first attempt returns a Http 500 blank error and retries + # second attempt returns a Http 408 blank error and retries, + # third attempt passes + param( + responses=[ + HttpForbiddenError( + response={'status': 500}, content="something", url="") + if HttpForbiddenError else None, + HttpForbiddenError( + response={'status': 408}, content="blank", url="") + if HttpForbiddenError else None + ], + expected_retries=2), + # first attempts returns a 403 rateLimitExceeded error + # second attempt returns a 429 blank error + # third attempt returns a Http 403 rateLimitExceeded error + # fourth attempt passes + param( + responses=[ + exceptions.Forbidden( + "some message", + errors=({ + "message": "transient", "reason": "rateLimitExceeded" + }, )) if exceptions else None, + exceptions.ResourceExhausted("some message") + if exceptions else None, + HttpForbiddenError( + response={'status': 403}, + content={ + "error": { + "errors": [{ + "message": "transient", + "reason": "rateLimitExceeded" + }] + } + }, + url="") if HttpForbiddenError else None, + ], + expected_retries=3), + ]) + def test_get_table_transient_exception(self, responses, expected_retries): +class DummyTable: + class DummySchema: +fields = [] + + numBytes = 5 + schema = DummySchema() + +with mock.patch('time.sleep'), \ +mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, + 'Get') as 
mock_get_table, \ +mock.patch.
(beam) branch master updated: update container tag (#29872)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 43e79080b44 update container tag (#29872) 43e79080b44 is described below commit 43e79080b445c8fc227edc20cbea06a296964d6f Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Dec 27 19:15:46 2023 +0300 update container tag (#29872) --- sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 1b7795916f0..8d3dc94a3ce 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -34,6 +34,6 @@ SERIALIZED_SOURCE_KEY = 'serialized_source' # Unreleased sdks use container image tag specified below. # Update this tag whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. -BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20231215' +BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20231227' DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'
(beam) branch master updated: Allow large timestamp skew for at-least-once streaming (#29858)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 989a2198a31 Allow large timestamp skew for at-least-once streaming (#29858) 989a2198a31 is described below commit 989a2198a3174a66cd733834383f3603de70d1fa Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Dec 27 12:16:53 2023 +0300 Allow large timestamp skew for at-least-once streaming (#29858) * large skew * test * use AppendSerializationError everywhere --- .../bigquery/StorageApiWriteUnshardedRecords.java | 11 ++- .../bigquery/StorageApiWritesShardedRecords.java | 6 +- .../sdk/io/gcp/testing/FakeDatasetService.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 109 + .../io/gcp/bigquery/BigQuerySinkMetricsTest.java | 4 +- 5 files changed, 123 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8f24ebc8ad9..3c6c73dd021 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -670,10 +670,10 @@ public class StorageApiWriteUnshardedRecords BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); if (failedContext.getError() != null - && failedContext.getError() instanceof Exceptions.AppendSerializtionError) { -Exceptions.AppendSerializtionError error = + && failedContext.getError() instanceof Exceptions.AppendSerializationError) { +Exceptions.AppendSerializationError error = Preconditions.checkStateNotNull( 
-(Exceptions.AppendSerializtionError) failedContext.getError()); +(Exceptions.AppendSerializationError) failedContext.getError()); Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { @@ -1164,5 +1164,10 @@ public class StorageApiWriteUnshardedRecords throw new RuntimeException(e); } } + +@Override +public Duration getAllowedTimestampSkew() { + return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); +} } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 8cf8ad0ee02..0f9b07d0c40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -662,10 +662,10 @@ public class StorageApiWritesShardedRecords failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 6a50127acd8..1e746d7f96b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -680,7 +680,7 @@ public class FakeDatasetService implements DatasetService, WriteStreamService, S } if (!rowIndexToErrorMessage.isEmpty()) { return ApiFutures.immediateFailedFuture( -new Exceptions.AppendSerializtionError( +new Exceptions.AppendSerializationError( 
Code.INVALID_ARGUMENT.getNumber(), "Append serialization failed for writer: " + streamName, stream.streamName, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 720419f2227..55269342155 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest
(beam) branch master updated: Dynamic SchemaTransform wrapper provider (#29561)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 921e40a12f4 Dynamic SchemaTransform wrapper provider (#29561) 921e40a12f4 is described below commit 921e40a12f467c51161bf33a0144fb8a1d4ca334 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Dec 14 05:58:29 2023 +0300 Dynamic SchemaTransform wrapper provider (#29561) * wrapper provider * add typehints * add GenerateSequence schematransform and expansion service for java core * address comments; add description property * add description test * add experimental note --- .../GenerateSequenceSchemaTransformProvider.java | 201 +++ .../apache/beam/sdk/providers/package-info.java| 23 ++ ...enerateSequenceSchemaTransformProviderTest.java | 61 + .../io/external/xlang_kafkaio_it_test.py | 4 +- sdks/python/apache_beam/transforms/external.py | 1 + .../external_schematransform_provider.py | 277 + .../external_schematransform_provider_test.py | 140 +++ sdks/python/apache_beam/typehints/schemas.py | 3 + sdks/python/pytest.ini | 2 +- sdks/python/test-suites/dataflow/build.gradle | 4 +- sdks/python/test-suites/direct/build.gradle| 8 +- sdks/python/test-suites/gradle.properties | 4 +- sdks/python/test-suites/xlang/build.gradle | 9 +- 13 files changed, 723 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java new file mode 100644 index 000..f4cada661b0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.providers; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.providers.GenerateSequenceSchemaTransformProvider.GenerateSequenceConfiguration; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import 
org.joda.time.Duration; + +@AutoService(SchemaTransformProvider.class) +public class GenerateSequenceSchemaTransformProvider +extends TypedSchemaTransformProvider { + public static final String OUTPUT_ROWS_TAG = "output"; + public static final Schema OUTPUT_SCHEMA = Schema.builder().addInt64Field("value").build(); + + @Override + public String identifier() { +return "beam:schematransform:org.apache.beam:generate_sequence:v1"; + } + + @Override + public List inputCollectionNames() { +return Collections.emptyList(); + } + + @Override + public List outputCollectionNames() { +return Collections.singletonList(OUTPUT_ROWS_TAG); + } + + @Override + public String desc
(beam) branch master updated: Pass Java SchemaTransform descriptions to Python SDK (#29606)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a96fa74f0cc Pass Java SchemaTransform descriptions to Python SDK (#29606) a96fa74f0cc is described below commit a96fa74f0ccb8d0c5490ba2872bfd52675a15558 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Dec 11 00:35:32 2023 +0300 Pass Java SchemaTransform descriptions to Python SDK (#29606) * pipe thru schematransform descriptions --- .../beam/sdk/schemas/transforms/SchemaTransformProvider.java | 6 +- .../schemas/transforms/TypedSchemaTransformProviderTest.java | 6 ++ .../BigQueryStorageWriteApiSchemaTransformProvider.java| 10 ++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java index e73ec5d870c..c76d7a25e69 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java @@ -36,7 +36,11 @@ public interface SchemaTransformProvider { /** Returns an id that uniquely represents this transform. */ String identifier(); - /** Returns a description of this transform to be used for documentation. */ + /** + * Returns a description regarding the {@link SchemaTransform} represented by the {@link + * SchemaTransformProvider}. Please keep the language generic (i.e. not specific to any + * programming language). The description may be markdown formatted. 
+ */ default String description() { return ""; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index db7b1436a12..2b698f4f67b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -61,6 +61,11 @@ public class TypedSchemaTransformProviderTest { return "fake:v1"; } +@Override +public String description() { + return "Description of fake provider"; +} + @Override protected Class configurationClass() { return Configuration.class; @@ -115,6 +120,7 @@ public class TypedSchemaTransformProviderTest { Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config; assertEquals("field1", outputConfig.getField1()); assertEquals(13, outputConfig.getField2().intValue()); +assertEquals("Description of fake provider", provider.description()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 98cc246ce0d..8c4edd2244b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -98,6 +98,16 @@ public class BigQueryStorageWriteApiSchemaTransformProvider return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2"); } + @Override + public String description() { 
+return String.format( +"Writes data to BigQuery using the Storage Write API (https://cloud.google.com/bigquery/docs/write-api)." ++ "\n\nThis expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQ) that " ++ "contain failed rows. The first DLQ has tag [%s] and contains the failed rows. The second DLQ has " ++ "tag [%s] and contains failed rows and along with their respective errors.", +FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG); + } + @Override public List inputCollectionNames() { return Collections.singletonList(INPUT_ROWS_TAG);
(beam) branch master updated (ebc4a5585e0 -> 39e1615f945)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from ebc4a5585e0 Documents the transform upgrade feature (#29581) add 39e1615f945 fix typos (#29588) No new revisions were added by this update. Summary of changes: .../site/content/en/documentation/programming-guide.md | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-)
(beam) branch master updated: Revert "revert protobuf version (#29523)" (#29530)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5a6329492f7 Revert "revert protobuf version (#29523)" (#29530) 5a6329492f7 is described below commit 5a6329492f713750ac141d90607653978eccabc2 Author: Shunping Huang AuthorDate: Fri Nov 24 14:14:23 2023 -0500 Revert "revert protobuf version (#29523)" (#29530) --- buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 1664f5fc670..359aeea55a2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin { def postgres_version = "42.2.16" def powermock_version = "2.0.9" // Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom -def protobuf_version = "3.23.2" +def protobuf_version = "3.24.4" def qpid_jms_client_version = "0.61.0" def quickcheck_version = "1.0" def sbe_tool_version = "1.25.1"
(beam) branch master updated: revert protobuf version (#29523)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 183141d2eab revert protobuf version (#29523) 183141d2eab is described below commit 183141d2eab4f8b1f4ffd67200f5618c3abd48af Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Nov 22 22:51:27 2023 -0500 revert protobuf version (#29523) --- buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 359aeea55a2..1664f5fc670 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin { def postgres_version = "42.2.16" def powermock_version = "2.0.9" // Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom -def protobuf_version = "3.24.4" +def protobuf_version = "3.23.2" def qpid_jms_client_version = "0.61.0" def quickcheck_version = "1.0" def sbe_tool_version = "1.25.1"
(beam) branch master updated: Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on numStreams (#29434)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new bb4b633b019 Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on numStreams (#29434) bb4b633b019 is described below commit bb4b633b01970d46359637d58a296b532269b18e Author: Jeff Kinard <35542536+pol...@users.noreply.github.com> AuthorDate: Tue Nov 21 10:38:36 2023 -0500 Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on numStreams (#29434) * Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on numStreams Signed-off-by: Jeffrey Kinard --- .../BigQueryStorageWriteApiSchemaTransformProvider.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 1b9eb309ec4..39e6fd7c809 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -177,7 +177,9 @@ public class BigQueryStorageWriteApiSchemaTransformProvider invalidConfigMessage + "Output must not be empty if error handling specified."); } - if (this.getAutoSharding() != null && this.getAutoSharding()) { + if (this.getAutoSharding() != null + && this.getAutoSharding() + && this.getNumStreams() != null) { checkArgument( this.getNumStreams() == 0, invalidConfigMessage @@ -338,7 +340,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider Boolean autoSharding = 
configuration.getAutoSharding(); Integer numStreams = configuration.getNumStreams(); // Triggering frequency is only applicable for exactly-once -if (!configuration.getUseAtLeastOnceSemantics()) { +if (configuration.getUseAtLeastOnceSemantics() == null +|| !configuration.getUseAtLeastOnceSemantics()) { write = write.withTriggeringFrequency( (triggeringFrequency == null || triggeringFrequency <= 0) @@ -346,7 +349,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider : Duration.standardSeconds(triggeringFrequency)); } // set num streams if specified, otherwise default to autoSharding -if (numStreams > 0) { +if (numStreams != null && numStreams > 0) { write = write.withNumStorageWriteApiStreams(numStreams); } else if (autoSharding == null || autoSharding) { write = write.withAutoSharding();
(beam) branch master updated: Add ability to run performance regression checks on Beam IO Load tests. (#29226)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ea8596f2df0 Add ability to run performance regression checks on Beam IO Load tests. (#29226) ea8596f2df0 is described below commit ea8596f2df0e3e4b9da9f215ae6745c2ddfb6612 Author: Pranav Bhandari AuthorDate: Wed Nov 1 12:50:01 2023 -0400 Add ability to run performance regression checks on Beam IO Load tests. (#29226) --- .../testing/analyzers/io_tests_config.yaml | 256 + .../testing/analyzers/load_test_perf_analysis.py | 98 .../testing/analyzers/perf_analysis_utils.py | 11 +- 3 files changed, 364 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/analyzers/io_tests_config.yaml b/sdks/python/apache_beam/testing/analyzers/io_tests_config.yaml new file mode 100644 index 000..2a33ae31797 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/io_tests_config.yaml @@ -0,0 +1,256 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spanner_io_read: + test_description: | +SpannerIO Read test 100 GB. 
+ project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testSpannerWriteAndRead,read-spanner + metric_name: +- RunTime +- EstimatedCost + +spanner_io_read_runnerV2: + test_description: | +SpannerIO RunnerV2 Read test 100 GB. + project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testSpannerWriteAndRead,read_spanner_v2 + metric_name: +- RunTime +- EstimatedCost + +spanner_io_write: + test_description: | +SpannerIO write test 100 GB. + project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testSpannerWriteAndRead,write-spanner + metric_name: +- RunTime +- EstimatedCost + +spanner_io_write_runnerV2: + test_description: | +SpannerIO RunnerV2 write test 100 GB. + project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testSpannerWriteAndRead,write_spanner_v2 + metric_name: +- RunTime +- EstimatedCost + +bigquery_io_storage_api_read: + test_description: | +BigQueryIO Storage write API read test 100 GB. + project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testStorageAPIWriteThenRead,read-bigquery + metric_name: +- RunTime +- EstimatedCost + +bigquery_io_storage_api_read_runnerV2: + test_description: | +BigQueryIO RunnerV2 Storage write API read test 100 GB. 
+ project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testStorageAPIWriteThenRead,read_bigquery_v2 + metric_name: +- RunTime +- EstimatedCost + +bigquery_io_storage_api_write: + test_description: | +BigQueryIO Storage write API write test 100 GB. + project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testStorageAPIWriteThenRead,write-bigquery + metric_name: +- RunTime +- EstimatedCost + +bigquery_io_storage_api_write_runnerV2: + test_description: | +BigQueryIO Storage write API write test 100 GB. + project: apache-beam-testing + metrics_dataset: performance_tests + metrics_table: io_performance_metrics + # test_name is in the format testName,pipelineName + test_name: testStorageAPIWriteThenRead,write_bigquery_v2 + metric_name: +- RunTime +- EstimatedCost + +bigquery_io_avro_file_loads_read:
[beam] branch master updated (2bccee14cf9 -> a859863d392)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 2bccee14cf9 Bump google.golang.org/api from 0.144.0 to 0.146.0 in /sdks (#28911) add a859863d392 update dataflow containers (#28904) No new revisions were added by this update. Summary of changes: sdks/python/apache_beam/runners/dataflow/internal/names.py| 2 +- website/www/site/content/en/documentation/runtime/environments.md | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-)
[beam] branch master updated (604629798a3 -> d17063412ea)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 604629798a3 Add dataclasses to perf alert tool and refactor code. (#28889) add d17063412ea Update BigQueryIO Documentation (#28591) No new revisions were added by this update. Summary of changes: .../en/documentation/io/built-in/google-bigquery.md| 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-)
[beam] branch master updated (31e1c7a8aef -> 0304cae8cc8)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 31e1c7a8aef Bump pillow (#28810) add 0304cae8cc8 add bigtable reviewers (#28834) No new revisions were added by this update. Summary of changes: .github/REVIEWERS.yml | 4 .github/autolabeler.yml | 1 + 2 files changed, 5 insertions(+)
[beam] branch master updated (c01b41f9758 -> be805372238)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from c01b41f9758 Bump urllib3 from 1.26.16 to 1.26.17 in /sdks/python/container/py310 (#28785) add be805372238 BigQuery testing suite that runs against BQ's day 0 region (#28397) No new revisions were added by this update. Summary of changes: ... beam_PostCommit_Java_BigQueryEarlyRollout.yml} | 27 - sdks/java/io/google-cloud-platform/build.gradle| 44 +- .../sdk/io/gcp/bigquery/TestBigQueryOptions.java | 7 .../gcp/common/GcpIoPipelineOptionsRegistrar.java | 2 + .../beam/sdk/io/gcp/testing/BigqueryClient.java| 37 -- .../io/gcp/bigquery/BigQueryIOStorageQueryIT.java | 10 - .../io/gcp/bigquery/BigQueryIOStorageReadIT.java | 9 - .../bigquery/BigQueryIOStorageReadTableRowIT.java | 43 - .../io/gcp/bigquery/BigQueryIOStorageWriteIT.java | 31 +++ .../bigquery/BigQuerySchemaUpdateOptionsIT.java| 6 ++- .../BigQueryTimePartitioningClusteringIT.java | 42 - .../sdk/io/gcp/bigquery/BigQueryToTableIT.java | 9 +++-- .../sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 9 - .../bigquery/StorageApiDirectWriteProtosIT.java| 9 - .../gcp/bigquery/StorageApiSinkFailedRowsIT.java | 13 ++- .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 9 - .../gcp/bigquery/StorageApiSinkSchemaUpdateIT.java | 17 ++--- .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 11 -- 18 files changed, 261 insertions(+), 74 deletions(-) copy .github/workflows/{beam_PostCommit_Java.yml => beam_PostCommit_Java_BigQueryEarlyRollout.yml} (75%)
[beam] branch master updated (c7d7896533b -> 2e0521162f6)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from c7d7896533b Github Workflow Replacement for Jenkins Jobs, beam_LoadTests_Java_GBK_Dataflow_Batch_* (#28738) add 2e0521162f6 [Java BQ] Storage API streaming load test (#28264) No new revisions were added by this update. Summary of changes: .../beam_PostCommit_Java_IO_Performance_Tests.yml | 2 +- it/build.gradle| 4 + it/google-cloud-platform/build.gradle | 5 +- .../beam/it/gcp/bigquery/BigQueryStreamingLT.java | 643 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 6 +- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 6 +- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 29 +- .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 37 +- 8 files changed, 723 insertions(+), 9 deletions(-) create mode 100644 it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java
[beam] branch master updated: Add Bigtable xlang bug to release notes for versions 2.49.0 and 2.50.0 (#28633)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 72b6645ec55 Add Bigtable xlang bug to release notes for versions 2.49.0 and 2.50.0 (#28633) 72b6645ec55 is described below commit 72b6645ec5533dcedeadcec9fad764acc1be46d0 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Sep 25 15:17:44 2023 -0400 Add Bigtable xlang bug to release notes for versions 2.49.0 and 2.50.0 (#28633) --- CHANGES.md | 3 +++ website/www/site/content/en/blog/beam-2.49.0.md | 1 + website/www/site/content/en/blog/beam-2.50.0.md | 1 + 3 files changed, 5 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index a990a5fd730..650b33c1240 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -85,6 +85,7 @@ * Fixed exception chaining issue in GCS connector (Python) ([#26769](https://github.com/apache/beam/issues/26769#issuecomment-1700422615)). * Fixed streaming inserts exception handling, GoogleAPICallErrors are now retried according to retry strategy and routed to failed rows where appropriate rather than causing a pipeline error (Python) ([#21080](https://github.com/apache/beam/issues/21080)). +* Fixed a bug in Python SDK's cross-language Bigtable sink that mishandled records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). ## Security Fixes @@ -153,6 +154,7 @@ * Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246). * Python Pipelines using BigQuery IO or `orjson` dependency might experience segmentation faults or get stuck: [#28318](https://github.com/apache/beam/issues/28318). 
* Beam Python containers rely on a version of Debian/aom that has several security vulnerabilities: [CVE-2021-30474](https://nvd.nist.gov/vuln/detail/CVE-2021-30474), [CVE-2021-30475](https://nvd.nist.gov/vuln/detail/CVE-2021-30475), [CVE-2021-30473](https://nvd.nist.gov/vuln/detail/CVE-2021-30473), [CVE-2020-36133](https://nvd.nist.gov/vuln/detail/CVE-2020-36133), [CVE-2020-36131](https://nvd.nist.gov/vuln/detail/CVE-2020-36131), [CVE-2020-36130](https://nvd.nist.gov/vuln/detail/CVE-202 [...] +* Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable. # [2.49.0] - 2023-07-17 @@ -225,6 +227,7 @@ * PubsubIO writes will throw *SizeLimitExceededException* for any message above 100 bytes, when used in batch (bounded) mode. (Java) ([#27000](https://github.com/apache/beam/issues/27000)). * Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246). +* Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable. # [2.47.0] - 2023-05-10 diff --git a/website/www/site/content/en/blog/beam-2.49.0.md b/website/www/site/content/en/blog/beam-2.49.0.md index 595b8e71253..a2e7af0e18f 100644 --- a/website/www/site/content/en/blog/beam-2.49.0.md +++ b/website/www/site/content/en/blog/beam-2.49.0.md @@ -51,6 +51,7 @@ For more information on changes in 2.49.0, check out the [detailed release notes ### Known Issues * Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246). 
+* Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable. ## List of Contributors diff --git a/website/www/site/content/en/blog/beam-2.50.0.md b/website/www/site/content/en/blog/beam-2.50.0.md index 4cfddd6167a..43bdecdc3ba 100644 --- a/website/www/site/content/en/blog/beam-2.50.0.md +++ b/website/www/site/content/en/blog/beam-2.50.0.md @@ -83,6 +83,7 @@ For more information on changes in 2.50.0, check out the [detailed release notes * Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246). * Python Pipelines using BigQuery IO or `orjson` dependency might experience segmentation faults or get stuck: [#28318](https://github.com/apache/beam/issues/28318). +* Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable. #
[beam] branch master updated: [Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 (#28624)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 68cf802626a [Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 (#28624) 68cf802626a is described below commit 68cf802626a1ef7d41aadca13d009f0d1b609b33 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Sat Sep 23 03:56:19 2023 + [Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 (#28624) * add test for unset timestamp * pass thru all timestamps * default to -1 in schematransform --- .../BigtableWriteSchemaTransformProvider.java | 13 +++-- .../BigtableWriteSchemaTransformProviderIT.java | 19 +++ sdks/python/apache_beam/io/gcp/bigtableio.py | 7 +++ sdks/python/apache_beam/io/gcp/bigtableio_it_test.py | 18 ++ 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index d38bdae2f09..b99b69621a8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -179,12 +179,13 @@ public class BigtableWriteSchemaTransformProvider .setColumnQualifier( ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get())) .setFamilyNameBytes( - ByteString.copyFrom(ofNullable(mutation.get("family_name")).get())); -if (mutation.containsKey("timestamp_micros")) { - setMutation = - setMutation.setTimestampMicros( - Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get())); -} + 
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get())) +// Use timestamp if provided, else default to -1 (current Bigtable server time) +.setTimestampMicros( +mutation.containsKey("timestamp_micros") +? Longs.fromByteArray( + ofNullable(mutation.get("timestamp_micros")).get()) +: -1); bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build(); break; case "DeleteFromColumn": diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java index 14bb04b0315..1a60fe661b5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java @@ -154,8 +154,8 @@ public class BigtableWriteSchemaTransformProviderIT { public void testSetMutationsExistingColumn() { RowMutation rowMutation = RowMutation.create(tableId, "key-1") -.setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a") -.setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c"); +.setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a") +.setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c"); dataClient.mutateRow(rowMutation); List> mutations = new ArrayList<>(); @@ -165,13 +165,15 @@ public class BigtableWriteSchemaTransformProviderIT { "type", "SetCell".getBytes(StandardCharsets.UTF_8), "value", "new-val-1-a".getBytes(StandardCharsets.UTF_8), "column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8), -"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8))); +"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8), +"timestamp_micros", Longs.toByteArray(2000))); mutations.add( ImmutableMap.of( "type", "SetCell".getBytes(StandardCharsets.UTF_8), &quo
[beam] branch master updated: [Python BQ] Follow up of #28592: Allow setting a fixed number of Storage API streams (#28618)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 1b6de5d407b [Python BQ] Follow up of #28592: Allow setting a fixed number of Storage API streams (#28618) 1b6de5d407b is described below commit 1b6de5d407ba4122710ec870c880a1ee443b6d2b Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Sep 22 19:49:41 2023 + [Python BQ] Follow up of #28592: Allow setting a fixed number of Storage API streams (#28618) * expose num streams option; fix some streaming tests * add test phrase in description * lint fix --- ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy | 2 +- ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy | 2 +- ...ueryStorageWriteApiSchemaTransformProvider.java | 37 ++ .../io/external/xlang_bigqueryio_it_test.py| 44 ++ sdks/python/apache_beam/io/gcp/bigquery.py | 9 + .../documentation/io/built-in/google-bigquery.md | 2 +- 6 files changed, 69 insertions(+), 27 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy index d1ee27088c7..1280fcb4e23 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy @@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow', 'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) { - description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.') + description('Runs end-to-end 
cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"') // Set common parameters. diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy index 438b735fba7..e4bf771be1a 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy @@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct', 'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) { - description('Runs end-to-end cross language GCP IO tests on the Direct runner.') + description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"') // Set common parameters. 
commonJobProperties.setTopLevelMainJobProperties(delegate) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index e4461793011..1b9eb309ec4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -176,6 +176,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()), invalidConfigMessage + "Output must not be empty if error handling specified."); } + + if (this.getAutoSharding() != null && this.getAutoSharding()) { +checkArgument( +this.getNumStreams() == 0, +invalidConfigMessage ++ "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options."); + } } /** @@ -218,11 +225,17 @@ public class BigQueryStorageWriteApiSchemaTransformProvider public abstract Boolean getUseAtLeastOnceSemantics(); @SchemaFieldDescription( -"This option enables using a dynamically determined number of shards to write to " +"This op
[beam] branch master updated (04a26da777f -> 426dbd3955e)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 04a26da777f [Python BQ] Allow setting a fixed number of Storage API streams (#28592) add 426dbd3955e Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)" (#28613) No new revisions were added by this update. Summary of changes: ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy | 2 +- ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy | 2 +- ...ueryStorageWriteApiSchemaTransformProvider.java | 37 + .../io/external/xlang_bigqueryio_it_test.py| 38 -- sdks/python/apache_beam/io/gcp/bigquery.py | 9 - .../documentation/io/built-in/google-bigquery.md | 2 +- 6 files changed, 24 insertions(+), 66 deletions(-)
[beam] 01/01: Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)"
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch revert-28592-expose_numshards in repository https://gitbox.apache.org/repos/asf/beam.git commit 65bf0e2616e78ba6ffe43748d7cd5a1614c79653 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Fri Sep 22 09:13:13 2023 -0400 Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)" This reverts commit 04a26da777ff4c0ed9112f07bf0f41a39bc7260d. --- ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy | 2 +- ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy | 2 +- ...ueryStorageWriteApiSchemaTransformProvider.java | 37 + .../io/external/xlang_bigqueryio_it_test.py| 38 -- sdks/python/apache_beam/io/gcp/bigquery.py | 9 - .../documentation/io/built-in/google-bigquery.md | 2 +- 6 files changed, 24 insertions(+), 66 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy index 1280fcb4e23..d1ee27088c7 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy @@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow', 'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) { - description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"') + description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.') // Set common parameters. 
diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy index e4bf771be1a..438b735fba7 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy @@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct', 'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) { - description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"') + description('Runs end-to-end cross language GCP IO tests on the Direct runner.') // Set common parameters. commonJobProperties.setTopLevelMainJobProperties(delegate) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index c3eed246723..e4461793011 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -176,13 +176,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()), invalidConfigMessage + "Output must not be empty if error handling specified."); } - - if (this.getAutoSharding() != null && this.getAutoSharding()) { -checkArgument( 
-this.getNumStreams() == 0, -invalidConfigMessage -+ "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options."); - } } /** @@ -225,17 +218,11 @@ public class BigQueryStorageWriteApiSchemaTransformProvider public abstract Boolean getUseAtLeastOnceSemantics(); @SchemaFieldDescription( -"This option enables using a dynamically determined number of Storage Write API streams to write to " +"This option enables using a dynamically determined number of shards to write to " + "BigQuery. Only applicable to unbounded data.") @Nullable public abstract Boolean getAutoSharding(); -@SchemaFieldDescription(
[beam] branch revert-28592-expose_numshards created (now 65bf0e2616e)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch revert-28592-expose_numshards in repository https://gitbox.apache.org/repos/asf/beam.git at 65bf0e2616e Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)" This branch includes the following new commits: new 65bf0e2616e Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)" The 1 revision listed above as "new" is entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[beam] branch master updated (0146a8389b4 -> 04a26da777f)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 0146a8389b4 cleanup jobs (#28528) add 04a26da777f [Python BQ] Allow setting a fixed number of Storage API streams (#28592) No new revisions were added by this update. Summary of changes: ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy | 2 +- ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy | 2 +- ...ueryStorageWriteApiSchemaTransformProvider.java | 37 - .../io/external/xlang_bigqueryio_it_test.py| 38 ++ sdks/python/apache_beam/io/gcp/bigquery.py | 9 + .../documentation/io/built-in/google-bigquery.md | 2 +- 6 files changed, 66 insertions(+), 24 deletions(-)
[beam] branch master updated (f676d93030c -> ef0d8d4041c)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from f676d93030c When comparing Series, sort the values in Dataframe tests (#28557) add ef0d8d4041c Label Python external SchemaTransform with its URN (#28540) No new revisions were added by this update. Summary of changes: sdks/python/apache_beam/transforms/external.py | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-)
[beam] branch master updated: Return errors for insert_rows_json exceptions (#21080) (#28091)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0f727927c01 Return errors for insert_rows_json exceptions (#21080) (#28091) 0f727927c01 is described below commit 0f727927c0136c63e8127ab5e9d3279fd5748dc4 Author: Adam Whitmore <61937566+ajdub...@users.noreply.github.com> AuthorDate: Tue Sep 19 15:05:46 2023 -0400 Return errors for insert_rows_json exceptions (#21080) (#28091) --- CHANGES.md | 1 + sdks/python/apache_beam/io/gcp/bigquery_test.py| 853 +++-- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 22 +- .../apache_beam/io/gcp/bigquery_write_it_test.py | 51 +- 4 files changed, 681 insertions(+), 246 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 47c35fe3491..cdf93909cb6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -82,6 +82,7 @@ ## Bugfixes * Fixed exception chaining issue in GCS connector (Python) ([#26769](https://github.com/apache/beam/issues/26769#issuecomment-1700422615)). +* Fixed streaming inserts exception handling, GoogleAPICallErrors are now retried according to retry strategy and routed to failed rows where appropriate rather than causing a pipeline error (Python) ([#21080](https://github.com/apache/beam/issues/21080)). ## Security Fixes diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 3a3033dfcaf..7e9c1e63474 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -77,7 +77,6 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -from apache_beam.utils import retry # Protect against environments where bigquery library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position @@ -931,77 +930,269 @@ class TestWriteToBigQuery(unittest.TestCase): 'GCP dependencies are not installed') class BigQueryStreamingInsertsErrorHandling(unittest.TestCase): - # Using https://cloud.google.com/bigquery/docs/error-messages and - # https://googleapis.dev/python/google-api-core/latest/_modules/google - #/api_core/exceptions.html - # to determine error types and messages to try for retriables. + # Running tests with a variety of exceptions from https://googleapis.dev + # /python/google-api-core/latest/_modules/google/api_core/exceptions.html. + # Choosing some exceptions that produce reasons included in + # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not @parameterized.expand([ + # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt + # transient error retried and succeeds on second attempt, 0 rows sent to + # failed rows param( - exception_type=exceptions.Forbidden if exceptions else None, - error_reason='rateLimitExceeded'), + insert_response=[ +exceptions.TooManyRequests if exceptions else None, +None], + error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS + failed_rows=[]), + # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to + # failed rows after hitting max_retries + param( + insert_response=[ +exceptions.InternalServerError if exceptions else None, +exceptions.InternalServerError if exceptions else None], + error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS + failed_rows=['value1', 'value3', 'value5']), + # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to + # failed_rows after hitting max_retries + param( + insert_response=[ +exceptions.Forbidden if exceptions else None, +exceptions.Forbidden if exceptions else None], + error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS + failed_rows=['value1', 'value3', 'value5']), + ]) + def test_insert_rows_json_exception_retry_always( + self, 
insert_response, error_reason, failed_rows): +# In this test, a pipeline will always retry all caught exception types +# since RetryStrategy is not set and defaults to RETRY_ALWAYS +with mock.patch('time.sleep'): + call_counter = 0 + mock_response = mock.Mock() + mock_response.reason = error_reason + + def store_callback(table, **kwargs): +nonlocal call_counter +# raise exception if insert_response element is an exception +if insert_response[call_counter]: + exception_type = insert_response[call_counter] + call_counter += 1 + raise exception_type('some exc
[beam] branch master updated: Updating Storage API Autosharding documentation to include that it doesn't work on Runner V2 (#28233)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new d6068ad66b0 Updating Storage API Autosharding documentation to include that it doesn't work on Runner V2 (#28233) d6068ad66b0 is described below commit d6068ad66b0a28a6dd628bb8ef48f1e2182acb4b Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Sep 18 18:27:08 2023 + Updating Storage API Autosharding documentation to include that it doesn't work on Runner V2 (#28233) * add documentation * doc for python too --- .../www/site/content/en/documentation/io/built-in/google-bigquery.md | 4 1 file changed, 4 insertions(+) diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md index 24314dc1180..eae98b84d2c 100644 --- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md +++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md @@ -788,6 +788,8 @@ BigQuery Storage Write API for Python SDK currently has some limitations on supp {{< paragraph class="language-py" >}} **Note:** If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build` to build the expansion-service jar. If you are running from a released Beam SDK, the jar will already be included. +**Note:** Auto sharding is not currently supported for Python's Storage Write API. + {{< /paragraph >}} Exactly-once semantics @@ -877,6 +879,8 @@ explicitly enable this using [`withAutoSharding`](https://beam.apache.org/releas ***Note:*** `STORAGE_WRITE_API` will default to dynamic sharding when `numStorageWriteApiStreams` is set to 0 or is unspecified. 
+ +***Note:*** Auto sharding with `STORAGE_WRITE_API` is supported on Dataflow's legacy runner, but **not** on Runner V2 {{< /paragraph >}} When using `STORAGE_WRITE_API`, the PCollection returned by
[beam] branch master updated: Add file loads streaming integration tests (#28312)
This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 103253348ec Add file loads streaming integration tests (#28312) 103253348ec is described below commit 103253348ec3f1e193bb124e52dffcc603c929d9 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Sep 13 15:34:59 2023 + Add file loads streaming integration tests (#28312) * file loads streaming integration tests * fix dynamic destinations copy jobs * disable for runnerV2 until pane index is fixed --- runners/google-cloud-dataflow-java/build.gradle| 3 + .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 11 +- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 4 + .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 27 +- .../sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 497 + 5 files changed, 532 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index f6e2b9b147c..2acc30455e2 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) { exclude '**/FhirIOLROIT.class' exclude '**/FhirIOSearchIT.class' exclude '**/FhirIOPatientEverythingIT.class' + // failing due to pane index not incrementing after Reshuffle: + // https://github.com/apache/beam/issues/28219 + exclude '**/FileLoadsStreamingIT.class' maxParallelForks 4 classpath = configurations.googleCloudPlatformIntegrationTest diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index f5f193aecb7..32ee29738bf 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -399,10 +398,12 @@ class BatchLoads "Window Into Global Windows", Window.>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1 -.apply("Add Void Key", WithKeys.of((Void) null)) -.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) -.apply("GroupByKey", GroupByKey.create()) -.apply("Extract Values", Values.create()) +// We use this and the following GBK to aggregate by final destination. 
+// This way, each destination has its own pane sequence +.apply("AddDestinationKeys", WithKeys.of(result -> result.getKey())) +.setCoder(KvCoder.of(destinationCoder, tempTables.getCoder())) +.apply("GroupTempTablesByFinalDestination", GroupByKey.create()) +.apply("ExtractTempTables", Values.create()) .apply( ParDo.of( new UpdateSchemaDestination( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 0063952d8b1..00ee815c3c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -689,6 +689,10 @@ public class BigQueryUtils { } } +if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == TypeName.BYTES) { + return jsonBQValue; +} + if (jsonBQValue instanceof List) { if (fieldType.getCollectionElementType() == null) { throw new IllegalArgumentException( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 9bff77a1658..f4074cc1a55 100644 --- a/sdks/java/io/google-cloud-pla