(beam) branch master updated: Improve IcebergIO utils (#31958)

2024-07-31 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 2aefd5b9c0c Improve IcebergIO utils (#31958)
2aefd5b9c0c is described below

commit 2aefd5b9c0c8a4dcb577d367d859455e4a93ec14
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jul 31 14:25:14 2024 -0400

Improve IcebergIO utils (#31958)

* Improve Iceberg utils

* add documentation; clarify variable name

* fix kinks, add type tests
---
 .../org/apache/beam/sdk/io/iceberg/IcebergIO.java  |   2 +-
 ...emaAndRowConversions.java => IcebergUtils.java} | 217 +--
 .../apache/beam/sdk/io/iceberg/RecordWriter.java   |   4 +-
 .../org/apache/beam/sdk/io/iceberg/ScanSource.java |   2 +-
 .../apache/beam/sdk/io/iceberg/ScanTaskReader.java |   4 +-
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java|   5 +-
 .../beam/sdk/io/iceberg/IcebergIOReadTest.java |   8 +-
 .../beam/sdk/io/iceberg/IcebergIOWriteTest.java|  12 +-
 .../IcebergReadSchemaTransformProviderTest.java|   8 +-
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java  | 676 +
 .../IcebergWriteSchemaTransformProviderTest.java   |   5 +-
 .../apache/beam/sdk/io/iceberg/ScanSourceTest.java |   6 +-
 .../io/iceberg/SchemaAndRowConversionsTest.java| 268 
 .../apache/beam/sdk/io/iceberg/TestFixtures.java   |   2 +-
 14 files changed, 865 insertions(+), 354 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 50e0ea8b63d..c3c1da7c788 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -137,7 +137,7 @@ public class IcebergIO {
   .setCatalogConfig(getCatalogConfig())
   .setScanType(IcebergScanConfig.ScanType.TABLE)
   .setTableIdentifier(tableId)
-  .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
+  .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
   .build();
 }
   }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
similarity index 50%
rename from sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
rename to sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index e1a8685614f..a2f84e6475c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -20,36 +20,42 @@ package org.apache.beam.sdk.io.iceberg;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-class SchemaAndRowConversions {
+/** Utilities for converting between Beam and Iceberg types. */
+public class IcebergUtils {
+  // This is made public for users' convenience, as many may have more experience working with
+  // Iceberg types.
 
-  private SchemaAndRowConversions() {}
+  private IcebergUtils() {}
 
-  static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
-  ImmutableMap.<Schema.FieldType, Type>builder()
-  .put(Schema.FieldType.BOOLEAN, Types.BooleanType.get())
-  .put(Schema.FieldType.INT32, Types.IntegerType.get())
-  .put(Schema.FieldType.INT64, Types.LongType.get())
-  .put(Schema.FieldType.FLOAT, Types.FloatType.get())
-  .put(Schema.FieldType.DOUBLE, Types.DoubleType.get())
-  .put(Schema.FieldType.STRING, Types.StringType.get())
-  .put(Schema.FieldType.BYTES, Types.BinaryType.get())
+  private static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
+  ImmutableMap.<Schema.FieldType, Type>builder()
+ 

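The diff above renames SchemaAndRowConversions to the now-public IcebergUtils. A minimal sketch of the conversion entry point it exposes, assuming only the icebergSchemaToBeamSchema method visible in the diff; the table-loading helper method is illustrative:

    import org.apache.beam.sdk.io.iceberg.IcebergUtils;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.iceberg.Table;

    class IcebergSchemaConversionExample {
      // Derive the Beam schema for a scan from an already-loaded Iceberg table,
      // mirroring the setSchema(...) call in the IcebergIO diff above.
      static Schema beamSchemaFor(Table table) {
        return IcebergUtils.icebergSchemaToBeamSchema(table.schema());
      }
    }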
(beam) branch master updated (835630b12e2 -> b61ef7591fe)

2024-07-31 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 835630b12e2 Replace Class with TypeDescriptor in SchemaProvider implementations (#31785)
 add b61ef7591fe update document in AwsOptions (#32036)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)



(beam) branch master updated: Update Dataflow containers (#31944)

2024-07-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 53a804e25b5 Update Dataflow containers (#31944)
53a804e25b5 is described below

commit 53a804e25b5baf901cd66f605f3cf2724a91c8c8
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Jul 22 13:31:03 2024 -0400

Update Dataflow containers (#31944)
---
 runners/google-cloud-dataflow-java/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 053d44f0074..177f8acf804 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -52,7 +52,7 @@ evaluationDependsOn(":sdks:java:container:java11")
 ext.dataflowLegacyEnvironmentMajorVersion = '8'
 ext.dataflowFnapiEnvironmentMajorVersion = '8'
 ext.dataflowLegacyContainerVersion = 'beam-master-20240718'
-ext.dataflowFnapiContainerVersion = 'beam-master-20240716'
+ext.dataflowFnapiContainerVersion = 'beam-master-20240718'
 ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3'
 
 processResources {



(beam) branch master updated: Use default auth for Iceberg integration tests (#31940)

2024-07-19 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 12ad2afd2d6 Use default auth for Iceberg integration tests (#31940)
12ad2afd2d6 is described below

commit 12ad2afd2d667d6897680b01fe555477cde3742d
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Jul 19 15:32:59 2024 -0400

Use default auth for Iceberg integration tests (#31940)
---
 .github/trigger_files/IO_Iceberg_Integration_Tests.json| 2 +-
 .../src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java  | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 1efc8e9e440..3f63c0c9975 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
 "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-"modification": 1
+"modification": 2
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 0420e2f5779..1c5686bfde9 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -108,8 +108,7 @@ public class IcebergIOIT implements Serializable {
 
 catalogHadoopConf = new Configuration();
 catalogHadoopConf.set("fs.gs.project.id", options.getProject());
-catalogHadoopConf.set(
-"fs.gs.auth.service.account.json.keyfile", System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
+catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
   }
 
   @Before



(beam) branch master updated: Update Dataflow containers (#31936)

2024-07-18 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 8760a677e81 Update Dataflow containers (#31936)
8760a677e81 is described below

commit 8760a677e810b69832e16a9e14fd88dd9c779907
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Jul 18 16:17:19 2024 -0400

Update Dataflow containers (#31936)
---
 runners/google-cloud-dataflow-java/build.gradle | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index c9fa85e4165..d01fca437b9 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -51,8 +51,8 @@ evaluationDependsOn(":sdks:java:container:java11")
 
 ext.dataflowLegacyEnvironmentMajorVersion = '8'
 ext.dataflowFnapiEnvironmentMajorVersion = '8'
-ext.dataflowLegacyContainerVersion = 'beam-master-20240306'
-ext.dataflowFnapiContainerVersion = 'beam-master-20240306'
+ext.dataflowLegacyContainerVersion = 'beam-master-20240718'
+ext.dataflowFnapiContainerVersion = 'beam-master-20240716'
 ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3'
 
 processResources {



(beam) branch master updated (2d6d55b98ce -> f3e6c66c0a5)

2024-07-17 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 2d6d55b98ce Fix generateYamlDocs gradle task (#31909)
 add f3e6c66c0a5 Improve performance of BigQueryIO connector when withPropagateSuccessfulStorageApiWrites(true) is used (#31840)

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |  1 +
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 39 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  2 +-
 3 files changed, 26 insertions(+), 16 deletions(-)



(beam) branch master updated: [CsvIO] Create CsvIOStringToCsvRecord Class (#31857)

2024-07-15 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ea761115998 [CsvIO] Create CsvIOStringToCsvRecord Class (#31857)
ea761115998 is described below

commit ea7611159989c64e2190604f5c50e9457c4ca2d8
Author: lahariguduru <108150650+laharigud...@users.noreply.github.com>
AuthorDate: Mon Jul 15 21:50:53 2024 +

[CsvIO] Create CsvIOStringToCsvRecord Class (#31857)

* Create CsvIOStringToCsvRecord class

* Create CsvIOStringToCsvRecord Class

* Create CsvIOStringToCsvRecord Class

* Create CsvIOStringToCsvRecord Class

* Fixed BadRecord Output

* Make class final

-

Co-authored-by: Lahari Guduru 
---
 .../beam/sdk/io/csv/CsvIOStringToCsvRecord.java|  61 +
 .../sdk/io/csv/CsvIOStringToCsvRecordTest.java | 143 +
 2 files changed, 204 insertions(+)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
new file mode 100644
index 000..995052bf7f7
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * {@link CsvIOStringToCsvRecord} is a class that takes a {@link PCollection<String>} input and
+ * outputs a {@link PCollection<CSVRecord>} with potential {@link PCollection<BadRecord>} for
+ * targeted error detection.
+ */
+final class CsvIOStringToCsvRecord
+    extends PTransform<PCollection<String>, PCollection<CSVRecord>> {
+  private final CSVFormat csvFormat;
+
+  CsvIOStringToCsvRecord(CSVFormat csvFormat) {
+this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Creates {@link PCollection<CSVRecord>} from {@link PCollection<String>} for future processing
+   * to Row or custom type.
+   */
+  @Override
+  public PCollection<CSVRecord> expand(PCollection<String> input) {
+return input.apply(ParDo.of(new ProcessLineToRecordFn()));
+  }
+
+  /** Processes each line in order to convert it to a {@link CSVRecord}. */
+  private class ProcessLineToRecordFn extends DoFn<String, CSVRecord> {
+    @ProcessElement
+    public void process(@Element String line, OutputReceiver<CSVRecord> receiver)
+        throws IOException {
+  for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) {
+receiver.output(record);
+  }
+}
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
new file mode 100644
index 000..44db791cbee
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package or

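The email above is truncated, but the new transform's main class is complete. A minimal pipeline sketch of applying it, assuming the code lives in the same org.apache.beam.sdk.io.csv package (the class and its constructor are package-private):

    package org.apache.beam.sdk.io.csv;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVRecord;

    class CsvIOStringToCsvRecordExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> lines = pipeline.apply(Create.of("a,1", "b,2"));
        // Each input line is parsed into one or more CSVRecords by the
        // ProcessLineToRecordFn shown in the diff above.
        PCollection<CSVRecord> records =
            lines.apply(new CsvIOStringToCsvRecord(CSVFormat.DEFAULT));
        pipeline.run().waitUntilFinish();
      }
    }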
(beam) branch master updated: Increase retry backoff for Storage API batch to survive AppendRows quota refill (#31837)

2024-07-15 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0b61035f36f Increase retry backoff for Storage API batch to survive AppendRows quota refill (#31837)
0b61035f36f is described below

commit 0b61035f36fb099a8dcd39978c71779a2e81f957
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Jul 15 15:23:04 2024 -0400

Increase retry backoff for Storage API batch to survive AppendRows quota refill (#31837)

* Increase retry backoff for Storage API batch

* longer waits for quota error only

* cleanup

* add to CHANGES.md

* no need for quota backoff. just increase allowed retries

* cleanup
---
 CHANGES.md | 3 ++-
 .../beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index fc94877a2bb..243596e6f2e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 
 * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
 * Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))
+* [BigQueryIO] Better handling for batch Storage Write API when it hits AppendRows throughput quota ([#31837](https://github.com/apache/beam/pull/31837))
 * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
 * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])
@@ -83,7 +84,7 @@
 
 ## Bugfixes
 
-* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
+* [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Security Fixes
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 21c1d961e84..f0c4a56ed3d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -771,7 +771,7 @@ public class StorageApiWriteUnshardedRecords
 invalidateWriteStream();
 allowedRetry = 5;
   } else {
-allowedRetry = 10;
+allowedRetry = 35;
   }
 
   // Maximum number of times we retry before we fail the work item.

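The change itself is a constant bump (10 to 35 allowed retries) on the non-quota error path. For readers unfamiliar with the mechanism the counter feeds into, a generic, self-contained sketch of the bounded-retry pattern; this is an illustration, not Beam's actual code:

    import java.io.IOException;

    class BoundedRetryExample {
      interface Op {
        void run() throws IOException;
      }

      // Retry an operation up to allowedRetry times, backing off between
      // attempts; once the budget is exhausted the failure propagates and
      // the work item fails.
      static void runWithRetries(Op op, int allowedRetry) throws Exception {
        for (int attempt = 1; ; attempt++) {
          try {
            op.run();
            return;
          } catch (IOException e) {
            if (attempt >= allowedRetry) {
              throw e; // retry budget exhausted: fail the work item
            }
            Thread.sleep(1000L * attempt); // simple linear backoff
          }
        }
      }
    }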


(beam) branch master updated: Add options to control number of Storage API connections when using multiplexing (#31721)

2024-07-12 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4d429dde47b Add options to control number of Storage API connections when using multiplexing (#31721)
4d429dde47b is described below

commit 4d429dde47b570ccabba6b86173eb2546f76a5f2
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Jul 12 14:46:37 2024 -0400

Add options to control number of Storage API connections when using multiplexing (#31721)

* add options to set min and max connections to connection management pool; rename counter to be more accurate

* add multiplexing description

* add to CHANGES.md

* clarify documentation and address comments

* adjust description

* add details
---
 CHANGES.md |  2 ++
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  8 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  | 27 ++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  9 
 4 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 96c436d89ec..fc94877a2bb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
 ## New Features / Improvements
 
 * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
+* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))
 * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
 * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])
@@ -82,6 +83,7 @@
 
 ## Bugfixes
 
+* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Security Fixes
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 5a12e81ea79..7505f77fb5b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.metrics.Metrics;
  */
 @AutoValue
 abstract class AppendClientInfo {
-  private final Counter activeConnections =
-  Metrics.counter(AppendClientInfo.class, "activeConnections");
+  private final Counter activeStreamAppendClients =
+  Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");
 
   abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
 
@@ -123,7 +123,7 @@ abstract class AppendClientInfo {
   writeStreamService.getStreamAppendClient(
   streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);
 
-  activeConnections.inc();
+  activeStreamAppendClients.inc();
 
   return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
 }
@@ -133,7 +133,7 @@ abstract class AppendClientInfo {
 BigQueryServices.StreamAppendClient client = getStreamAppendClient();
 if (client != null) {
   getCloseAppendClient().accept(client);
-  activeConnections.dec();
+  activeStreamAppendClients.dec();
 }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index cd1fc6d3842..ba76f483f77 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -109,6 +109,28 @@ public interface BigQueryOptions
 
   void setNumStorageWriteApiStreamAppendClients(Integer value);
 
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+          + "this option sets the minimum number of connections each pool creates before any connections are shared. This is "
+

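The diff cuts off before the new min/max pool options, so they are omitted from this sketch. A short example of enabling multiplexing with the two BigQueryOptions setters that do appear in this thread (setUseStorageApiConnectionPool is the existing toggle referenced in the description string above; treat both names as assumptions):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class MultiplexingOptionsExample {
      public static void main(String[] args) {
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
        // Share Storage Write API connections across streams (multiplexing)...
        options.setUseStorageApiConnectionPool(true);
        // ...and cap how many stream-append clients are created per destination.
        options.setNumStorageWriteApiStreamAppendClients(2);
      }
    }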
(beam) branch release-2.58.0 updated: add doc warning against using icebergio directly (#31834)

2024-07-10 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.58.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.58.0 by this push:
 new 6e6f28027bc add doc warning against using icebergio directly (#31834)
6e6f28027bc is described below

commit 6e6f28027bcd875733f20ee1be303815d0227eb7
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jul 10 14:33:25 2024 -0400

add doc warning against using icebergio directly (#31834)
---
 .../src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java   | 8 
 1 file changed, 8 insertions(+)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 75a35e6f8a3..50e0ea8b63d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
@@ -33,6 +34,13 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+/**
+ * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not
+ * intended to be used directly.
+ *
+ * <p>For internal use only; no backwards compatibility guarantees
+ */
+@Internal
 public class IcebergIO {
 
   public static WriteRows writeRows(IcebergCatalogConfig catalog) {



(beam) branch master updated: add doc warning against using icebergio directly (#31833)

2024-07-10 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new dd0912460c4 add doc warning against using icebergio directly (#31833)
dd0912460c4 is described below

commit dd0912460c4e106cc27ef4547d00ace07235431e
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jul 10 14:01:39 2024 -0400

add doc warning against using icebergio directly (#31833)
---
 .../src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java   | 8 
 1 file changed, 8 insertions(+)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 75a35e6f8a3..50e0ea8b63d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
@@ -33,6 +34,13 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+/**
+ * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not
+ * intended to be used directly.
+ *
+ * <p>For internal use only; no backwards compatibility guarantees
+ */
+@Internal
 public class IcebergIO {
 
   public static WriteRows writeRows(IcebergCatalogConfig catalog) {

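With IcebergIO now marked @Internal, the supported entry point is the Managed transform. A minimal write sketch based on the Managed javadoc quoted later in this thread; the YAML path is a placeholder:

    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    class ManagedIcebergWriteExample {
      static void write(PCollection<Row> inputRows) {
        // Go through Managed rather than IcebergIO directly; the YAML file
        // (placeholder path) carries the Iceberg sink configuration.
        inputRows.apply(Managed.write(Managed.ICEBERG).withConfigUrl("path/to/config.yaml"));
      }
    }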


(beam) branch master updated: Pass-through IcebergIO catalog properties (#31726)

2024-07-08 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ac423af5699 Pass-through IcebergIO catalog properties (#31726)
ac423af5699 is described below

commit ac423af5699682c5519ae8d1a3af035ba7a5eab7
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Jul 8 13:15:03 2024 -0400

Pass-through IcebergIO catalog properties (#31726)

* Pass-through iceberg catalog properties

* add to CHANGES.md

* trigger integration test

* remove custom configuration; pass catalog name
---
 .../IO_Iceberg_Integration_Tests.json  |   3 +-
 CHANGES.md |   2 +
 .../beam/sdk/io/iceberg/IcebergCatalogConfig.java  | 197 +
 .../IcebergReadSchemaTransformProvider.java|  36 ++--
 .../IcebergSchemaTransformCatalogConfig.java   | 107 ---
 .../IcebergWriteSchemaTransformProvider.java   |  42 ++---
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java|  16 +-
 .../beam/sdk/io/iceberg/IcebergIOReadTest.java |  11 +-
 .../beam/sdk/io/iceberg/IcebergIOWriteTest.java|  31 ++--
 .../IcebergReadSchemaTransformProviderTest.java|  34 ++--
 .../IcebergSchemaTransformTranslationTest.java |  49 +++--
 .../IcebergWriteSchemaTransformProviderTest.java   |  34 ++--
 .../apache/beam/sdk/io/iceberg/ScanSourceTest.java |  28 ++-
 13 files changed, 152 insertions(+), 438 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index a03c067d2c4..1efc8e9e440 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,3 +1,4 @@
 {
-"comment": "Modify this file in a trivial way to cause this test suite to 
run"
+"comment": "Modify this file in a trivial way to cause this test suite to 
run",
+"modification": 1
 }
diff --git a/CHANGES.md b/CHANGES.md
index 0a620038f11..85f1be48cfb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,12 +67,14 @@
 ## New Features / Improvements
 
 * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
+* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
 * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])
 
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-store fashion ([#31726](https://github.com/apache/beam/pull/31726))
 
 ## Deprecations
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index fefef4aa491..2956d75a266 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,214 +19,35 @@ package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import java.util.Properties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.checkerframework.dataflow.qual.Pure;
 
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
-
-  @Pure
-  public abstract String getName();
-
-  /* Core Properties */
-  @Pure
-  public abstract @Nullable String getIcebergCatalogType();
-
-  @Pure
-  public abstract @Nullable String getCatalogImplementation();
-
-  @Pure
-  public abstract @Nullable String getFileIOImplementation();
-
-  @Pure
-  public abstract @Nullable String getWarehouseLocation();
-
-  @Pure
-  public abstract @Nullable String getMetricsReporterImplementation();
-
-  /* Caching */
-  @Pure
-  public abstract boolean getCacheEnabled();
-
-  @Pure
-  public abstract boolean getCacheCaseSensitive();
-
-  @Pure
-  public abstract long getCacheExpirationIntervalMillis();
-
-  @Pure
-  public 

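The diff is truncated, but the commit replaces the long list of typed getters with a pass-through property map handed to Iceberg's CatalogUtil. A hedged sketch of the new style; the builder method names (setCatalogName, setCatalogProperties) are assumptions inferred from the replaced fields, not confirmed by the truncated diff:

    import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    class CatalogConfigExample {
      static IcebergCatalogConfig hadoopCatalog(String warehouse) {
        // Properties are passed through verbatim to the Iceberg catalog.
        return IcebergCatalogConfig.builder()
            .setCatalogName("my_catalog") // hypothetical catalog name
            .setCatalogProperties(ImmutableMap.of("type", "hadoop", "warehouse", warehouse))
            .build();
      }
    }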
(beam) branch master updated: Properly close Storage API batch connections (#31710)

2024-06-28 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 20aa916931f Properly close Storage API batch connections (#31710)
20aa916931f is described below

commit 20aa916931f70d3bc75c5a22bc70bed0099c549e
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Jun 28 15:52:06 2024 -0400

Properly close Storage API batch connections (#31710)

* properly close connections; add active connection counter

* only invalidate stream at teardown for PENDING type

* cleanup
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java| 19 +--
 .../gcp/bigquery/StorageApiWriteUnshardedRecords.java |  5 +
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 211027c12b0..5a12e81ea79 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -30,6 +30,8 @@ import com.google.protobuf.Message;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 
 /**
  * Container class used by {@link StorageApiWritesShardedRecords} and {@link
@@ -38,6 +40,9 @@ import javax.annotation.Nullable;
  */
 @AutoValue
 abstract class AppendClientInfo {
+  private final Counter activeConnections =
+  Metrics.counter(AppendClientInfo.class, "activeConnections");
+
   abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
 
   abstract TableSchema getTableSchema();
@@ -114,12 +119,13 @@ abstract class AppendClientInfo {
   return this;
 } else {
   String streamName = getStreamName.get();
-  return toBuilder()
-  .setStreamName(streamName)
-  .setStreamAppendClient(
-  writeStreamService.getStreamAppendClient(
-  streamName, getDescriptor(), useConnectionPool, missingValueInterpretation))
-  .build();
+  BigQueryServices.StreamAppendClient client =
+  writeStreamService.getStreamAppendClient(
+  streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);
+
+  activeConnections.inc();
+
+  return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
 }
   }
 
@@ -127,6 +133,7 @@ abstract class AppendClientInfo {
 BigQueryServices.StreamAppendClient client = getStreamAppendClient();
 if (client != null) {
   getCloseAppendClient().accept(client);
+  activeConnections.dec();
 }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8af88f8f7dc..ce5e7b4854e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -334,6 +334,11 @@ public class StorageApiWriteUnshardedRecords
   if (client != null) {
 runAsyncIgnoreFailure(closeWriterExecutor, client::unpin);
   }
+  // if this is a PENDING stream, we won't be using it again after cleaning up this
+  // destination state, so clear it from the cache
+  if (!useDefaultStream) {
+APPEND_CLIENTS.invalidate(streamName);
+  }
   appendClientInfo = null;
 }
   }

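The activeConnections counter above uses Beam's standard user-metrics API; a self-contained sketch of the same pattern inside a DoFn:

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    class CountingFn extends DoFn<String, String> {
      // A named counter, reported per transform just like activeConnections above.
      private final Counter elementsSeen = Metrics.counter(CountingFn.class, "elementsSeen");

      @ProcessElement
      public void process(@Element String element, OutputReceiver<String> out) {
        elementsSeen.inc(); // incremented per element; dec() works the same way
        out.output(element);
      }
    }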


(beam) branch master updated: Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675)

2024-06-25 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 911c525348b Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675)
911c525348b is described below

commit 911c525348b81afd48a414018ba579c3b78e3262
Author: codertimu 
AuthorDate: Tue Jun 25 22:30:18 2024 +0200

Fix nullable array issue 31674 in AvroGenericRecordToStorageApiProto (#31675)

* Handle nullable array in fieldDescriptorFromAvroField function.

* rename test method.

* apply spotless

* Add test case with null array. Allow null value for union with a null type.

-

Co-authored-by: Ahmet Timucin 
---
 .../AvroGenericRecordToStorageApiProto.java|  5 +-
 .../AvroGenericRecordToStorageApiProtoTest.java| 99 ++
 2 files changed, 86 insertions(+), 18 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 27e75176e57..0b7e17b8909 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -277,6 +277,7 @@ public class AvroGenericRecordToStorageApiProto {
 builder =
 builder
 .setType(unionFieldSchema.getType())
+.setMode(unionFieldSchema.getMode())
 .addAllFields(unionFieldSchema.getFieldsList());
 break;
   default:
@@ -311,7 +312,9 @@ public class AvroGenericRecordToStorageApiProto {
   FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) {
 @Nullable Object value = record.get(name);
 if (value == null) {
-  if (fieldDescriptor.isOptional()) {
+  if (fieldDescriptor.isOptional()
+  || avroField.schema().getTypes().stream()
+  .anyMatch(t -> t.getType() == Schema.Type.NULL)) {
 return null;
   } else {
 throw new IllegalArgumentException(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index 6f31831db13..6a59afeed82 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -31,6 +31,7 @@ import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -268,24 +269,32 @@ public class AvroGenericRecordToStorageApiProtoTest {
   .noDefault()
   .endRecord();
 
-  private static final Schema SCHEMA_WITH_MAP;
+  private static final Schema SCHEMA_WITH_MAP =
+  SchemaBuilder.record("TestMap")
+  .fields()
+  .name("nested")
+  .type()
+  .optional()
+  .type(BASE_SCHEMA)
+  .name("aMap")
+  .type()
+  .map()
+  .values()
+  .stringType()
+  .mapDefault(ImmutableMap.<String, String>builder().put("key1", "value1").build())
+  .endRecord();
 
-  static {
-  SCHEMA_WITH_MAP =
-  SchemaBuilder.record("TestMap")
-  .fields()
-  .name("nested")
-  .type()
-  .optional()
-  .type(BASE_SCHEMA)
-  .name("aMap")
-  .type()
-  .map()
-  .values()
-  .stringType()
-  .mapDefault(ImmutableMap.<String, String>builder().put("key1", "value1").build())
-  .endRecord();
-  }
+  private static final Schema SCHEMA_WITH_NULLABLE_ARRAY =
+  SchemaBuilder.record("TestNullableArray")
+  .fields()
+  .name("aNullableArray")
+  .type()
+  .nullable()
+  .array()
+  .items()
+  .stringType()
+  .noDefault()
+  .endRecord();
 
   private static GenericRecord baseRecord;
   private static GenericRecord logicalTypesRecord;
@@ -567,4 +576,60 @@ public 

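For reference, the shape the fix handles is a field whose schema is a union of an array type and null. A standalone Avro sketch of building that union and a record with a null array (standard Avro APIs, separate from the Beam test above):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    class NullableArrayExample {
      public static void main(String[] args) {
        // Field type is the union [{"type": "array", "items": "string"}, "null"].
        Schema schema =
            SchemaBuilder.record("TestNullableArray")
                .fields()
                .name("aNullableArray")
                .type().nullable().array().items().stringType()
                .noDefault()
                .endRecord();
        GenericRecord record = new GenericData.Record(schema);
        record.put("aNullableArray", null); // legal: the union contains NULL
        System.out.println(record);
      }
    }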
(beam) branch master updated: Replace getElementType with getValueType for MAP in AvroGenericRecord… (#31653)

2024-06-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0f2e1963987 Replace getElementType with getValueType for MAP in AvroGenericRecord… (#31653)
0f2e1963987 is described below

commit 0f2e1963987f1fbb3329016d8c862639ed4fbe43
Author: codertimu 
AuthorDate: Sun Jun 23 04:44:13 2024 +0200

Replace getElementType with getValueType for MAP in AvroGenericRecord… (#31653)
---
 .../AvroGenericRecordToStorageApiProto.java| 10 ++--
 .../AvroGenericRecordToStorageApiProtoTest.java| 62 ++
 2 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 519f9391db6..27e75176e57 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -247,17 +247,15 @@ public class AvroGenericRecordToStorageApiProto {
 break;
   case MAP:
 Schema keyType = Schema.create(Schema.Type.STRING);
-Schema valueType = TypeWithNullability.create(schema.getElementType()).getType();
+Schema valueType = Schema.create(schema.getValueType().getType());
 if (valueType == null) {
   throw new RuntimeException("Unexpected null element type!");
 }
 TableFieldSchema keyFieldSchema =
-fieldDescriptorFromAvroField(
-new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE));
+fieldDescriptorFromAvroField(new Schema.Field("key", keyType, "key of the map entry"));
 TableFieldSchema valueFieldSchema =
 fieldDescriptorFromAvroField(
-new Schema.Field(
-"value", valueType, "value of the map entry", Schema.Field.NULL_VALUE));
+new Schema.Field("value", valueType, "value of the map entry"));
 builder =
 builder
 .setType(TableFieldSchema.Type.STRUCT)
@@ -346,7 +344,7 @@ public class AvroGenericRecordToStorageApiProto {
 return toProtoValue(fieldDescriptor, type.getType(), value);
   case MAP:
 Map map = (Map) value;
-Schema valueType = TypeWithNullability.create(avroSchema.getElementType()).getType();
+Schema valueType = Schema.create(avroSchema.getValueType().getType());
 if (valueType == null) {
   throw new RuntimeException("Unexpected null element type!");
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index 3a4dcb02ebd..6f31831db13 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -32,6 +32,8 @@ import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -266,6 +268,25 @@ public class AvroGenericRecordToStorageApiProtoTest {
   .noDefault()
   .endRecord();
 
+  private static final Schema SCHEMA_WITH_MAP;
+
+  static {
+SCHEMA_WITH_MAP =
+SchemaBuilder.record("TestMap")
+.fields()
+.name("nested")
+.type()
+.optional()
+.type(BASE_SCHEMA)
+.name("aMap")
+.type()
+.map()
+.values()
+.stringType()
+.mapDefault(ImmutableMap.<String, String>builder().put("key1", "value1").build())
+.endRecord();
+  }
+
   private static GenericRecord baseRecord;
   private static GenericRecord logicalTypesRecord;
   private static Map baseProtoExpectedFields;
@@ -505,4 +526,45 @@ public class AvroGenericRecordToStorageApiProtoTest {
 assertEquals(7, msg.getAllFields().size());
 assertBaseRecord(msg, logicalTypesProtoExpectedFields);
   }
+
+  @Test
+  public void testMessageFr

(beam) branch master updated (47736c36a45 -> e5a5ea93e10)

2024-06-14 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 47736c36a45 Migrate Gradle Enterprise Plugin to Develocity Plugin (#31577)
 add e5a5ea93e10 Update KafkaIO's docstring to match current implementation (#31496)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 40 ++
 1 file changed, 26 insertions(+), 14 deletions(-)



(beam) branch master updated: Add missing import (#31533)

2024-06-06 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 8c6e1a4654b Add missing import (#31533)
8c6e1a4654b is described below

commit 8c6e1a4654bced7bf732e1a46092908580da561b
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Jun 6 09:53:45 2024 -0400

Add missing import (#31533)
---
 .../apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index f0e0fc9a3d0..19c336e1d24 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.utils.YamlUtils;
 import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;



(beam) branch release-2.57.0 updated: fix managed doc (#31521)

2024-06-05 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.57.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.57.0 by this push:
 new 6cbf144f21b fix managed doc (#31521)
6cbf144f21b is described below

commit 6cbf144f21b46a1b3d181ea340eba8ef2d3d21bc
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jun 5 19:34:05 2024 -0400

fix managed doc (#31521)
---
 .../managed/src/main/java/org/apache/beam/sdk/managed/Managed.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 6f95290e6ee..911e25cdda1 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -62,7 +62,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
  * }
  *
  * <p>Instead of specifying configuration arguments directly in the code, one can provide the
- * location to a YAML file that contains this information. Say we have the following YAML file:
+ * location to a YAML file that contains this information. Say we have the following {@code
+ * config.yaml} file:
  *
  * {@code
  * foo: "abc"
@@ -74,7 +75,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
  * {@code
  * PCollection<Row> inputRows = pipeline.apply(Create.of(...));
  *
- * input.apply(Managed.write(ICEBERG).withConfigUrl());
+ * inputRows.apply(Managed.write(ICEBERG).withConfigUrl("path/to/config.yaml"));
  * }
  */
 public class Managed {



(beam) branch release-2.57.0 updated: Cherrypick (#31362) kafka schematransform translation (#31518)

2024-06-05 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.57.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.57.0 by this push:
 new a36857f10a3 Cherrypick (#31362) kafka schematransform translation (#31518)
a36857f10a3 is described below

commit a36857f10a3314b7dac1445373a6d98dd3c85a31
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jun 5 16:37:24 2024 -0400

Cherrypick (#31362) kafka schematransform translation  (#31518)

* kafka schematransform translation and tests

* cleanup

* spotless

* address failing tests

* switch existing schematransform tests to use Managed API

* fix nullness

* add some more mappings

* fix mapping

* typo

* more accurate test name

* cleanup after merging snake_case PR

* spotless
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |   2 +-
 .../KafkaReadSchemaTransformConfiguration.java |   6 +
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 260 +++--
 .../io/kafka/KafkaSchemaTransformTranslation.java  |  93 
 .../kafka/KafkaWriteSchemaTransformProvider.java   |  16 ++
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java|  45 ++--
 .../KafkaReadSchemaTransformProviderTest.java  |  49 ++--
 .../kafka/KafkaSchemaTransformTranslationTest.java | 216 +
 8 files changed, 522 insertions(+), 165 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8f995a63a10..35aabbbfd97 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -2665,7 +2665,7 @@ public class KafkaIO {
   abstract Builder<K, V> setProducerFactoryFn(
       @Nullable SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
 
-  abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
+  abstract Builder<K, V> setKeySerializer(@Nullable Class<? extends Serializer<K>> serializer);
 
   abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index 13f5249a6c3..693c1371f78 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -149,6 +149,10 @@ public abstract class KafkaReadSchemaTransformConfiguration {
   /** Sets the topic from which to read. */
   public abstract String getTopic();
 
+  @SchemaFieldDescription("Upper bound of how long to read from Kafka.")
+  @Nullable
+  public abstract Integer getMaxReadTimeSeconds();
+
   @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
   @Nullable
   public abstract ErrorHandling getErrorHandling();
@@ -179,6 +183,8 @@ public abstract class KafkaReadSchemaTransformConfiguration {
 /** Sets the topic from which to read. */
 public abstract Builder setTopic(String value);
 
+public abstract Builder setMaxReadTimeSeconds(Integer maxReadTimeSeconds);
+
 public abstract Builder setErrorHandling(ErrorHandling errorHandling);
 
 /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 13240ea9dc4..b2eeb1a54d1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
 import com.google.auto.service.AutoService;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -38,7 +40,9 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.S

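A hedged sketch of the new bounded-read knob via the configuration builder shown in the diff; setTopic and setMaxReadTimeSeconds appear above, while builder() and setBootstrapServers are assumed from the existing configuration class:

    import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformConfiguration;

    class KafkaReadConfigExample {
      static KafkaReadSchemaTransformConfiguration boundedConfig() {
        return KafkaReadSchemaTransformConfiguration.builder()
            .setBootstrapServers("localhost:9092") // assumed existing setter
            .setTopic("my_topic")
            .setMaxReadTimeSeconds(60) // new: upper bound on how long to read from Kafka
            .build();
      }
    }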
(beam) branch master updated: fix managed doc (#31517)

2024-06-05 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4098fce785c fix managed doc (#31517)
4098fce785c is described below

commit 4098fce785c3f3553f1a54ce593224ac3fd117b4
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jun 5 15:25:03 2024 -0400

fix managed doc (#31517)
---
 .../managed/src/main/java/org/apache/beam/sdk/managed/Managed.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 6f95290e6ee..911e25cdda1 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -62,7 +62,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
  * }
  *
  * Instead of specifying configuration arguments directly in the code, one 
can provide the
- * location to a YAML file that contains this information. Say we have the 
following YAML file:
+ * location to a YAML file that contains this information. Say we have the 
following {@code
+ * config.yaml} file:
  *
  * {@code
  * foo: "abc"
@@ -74,7 +75,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
  * {@code
 * PCollection<Row> inputRows = pipeline.apply(Create.of(...));
  *
- * input.apply(Managed.write(ICEBERG).withConfigUrl());
+ * 
inputRows.apply(Managed.write(ICEBERG).withConfigUrl("path/to/config.yaml"));
  * }
  */
 public class Managed {
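
Putting the corrected javadoc together, a minimal end-to-end sketch (the YAML keys and schema here are placeholders, not verified IcebergIO options):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

Pipeline pipeline = Pipeline.create();
Schema schema = Schema.builder().addStringField("name").build();
PCollection<Row> inputRows =
    pipeline
        .apply(Create.of(Row.withSchema(schema).addValue("alice").build()))
        .setRowSchema(schema);
// config.yaml holds the transform's configuration (e.g. foo: "abc", bar: 123)
inputRows.apply(Managed.write(Managed.ICEBERG).withConfigUrl("path/to/config.yaml"));
pipeline.run();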



(beam) branch master updated (9c9de4919b5 -> f2931d31246)

2024-06-05 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 9c9de4919b5 Update Go container build version to 1.21.11 (#31515)
 add f2931d31246 Kafka SchemaTransform translation  (#31362)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |   2 +-
 .../KafkaReadSchemaTransformConfiguration.java |   6 +
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 260 +++--
 .../io/kafka/KafkaSchemaTransformTranslation.java} |  45 ++--
 .../kafka/KafkaWriteSchemaTransformProvider.java   |  16 ++
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java|  45 ++--
 .../KafkaReadSchemaTransformProviderTest.java  |  48 ++--
 .../kafka/KafkaSchemaTransformTranslationTest.java | 216 +
 8 files changed, 453 insertions(+), 185 deletions(-)
 copy 
sdks/java/io/{iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java
 => 
kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java}
 (60%)
 create mode 100644 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java



(beam) branch master updated: Simplify Managed API to avoid dealing with PCollectionRowTuple (#31470)

2024-06-04 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7ea8cd2608e Simplify Managed API to avoid dealing with 
PCollectionRowTuple (#31470)
7ea8cd2608e is described below

commit 7ea8cd2608e1b7550ffca44aff5596dd43fd4aa6
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Tue Jun 4 19:02:00 2024 -0400

Simplify Managed API to avoid dealing with PCollectionRowTuple (#31470)

* Managed accepts PInput type

* add unit test

* spotless

* spotless

* rename to getSinglePCollection
---
 .../beam/sdk/values/PCollectionRowTuple.java   | 17 
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 10 ++---
 .../IcebergReadSchemaTransformProviderTest.java|  4 +-
 .../IcebergWriteSchemaTransformProviderTest.java   | 14 +++
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  2 +-
 .../KafkaReadSchemaTransformProviderTest.java  |  4 +-
 .../KafkaWriteSchemaTransformProviderTest.java |  7 +---
 .../java/org/apache/beam/sdk/managed/Managed.java  | 46 +-
 .../sdk/managed/ManagedTransformConstants.java |  3 ++
 .../ManagedSchemaTransformTranslationTest.java |  3 +-
 .../org/apache/beam/sdk/managed/ManagedTest.java   | 34 ++--
 11 files changed, 104 insertions(+), 40 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java
index 0e7c52c4ae7..a2a3aa74e53 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionRowTuple.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -180,6 +181,22 @@ public class PCollectionRowTuple implements PInput, 
POutput {
 return pcollection;
   }
 
+  /**
+   * Like {@link #get(String)}, but is a convenience method to get a single 
PCollection without
+   * providing a tag for that output. Use only when there is a single 
collection in this tuple.
+   *
+   * Throws {@link IllegalStateException} if more than one output exists in 
the {@link
+   * PCollectionRowTuple}.
+   */
+  public PCollection<Row> getSinglePCollection() {
+Preconditions.checkState(
+pcollectionMap.size() == 1,
+"Expected exactly one output PCollection, but found %s. "
++ "Please try retrieving a specified output using get() 
instead.",
+pcollectionMap.size());
+return get(pcollectionMap.entrySet().iterator().next().getKey());
+  }
+
   /**
* Returns an immutable Map from tag to corresponding {@link PCollection}, 
for all the members of
* this {@link PCollectionRowTuple}.
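The size check above makes the convenience method safe only for single-output transforms; a short sketch of both sides (the tuple tags and PCollections are placeholders):

PCollectionRowTuple single = PCollectionRowTuple.of("output", rows);
PCollection<Row> out = single.getSinglePCollection();   // OK

PCollectionRowTuple pair = PCollectionRowTuple.of("a", rowsA).and("b", rowsB);
pair.getSinglePCollection();   // throws IllegalStateException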
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 06a63909c12..467a2cbaf24 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
@@ -216,11 +215,10 @@ public class IcebergIOIT implements Serializable {
 .build())
 .build();
 
-PCollectionRowTuple output =
-PCollectionRowTuple.empty(readPipeline)
-.apply(Managed.read(Managed.ICEBERG).withConfig(config));
+PCollection<Row> rows =
+
readPipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
 
-PAssert.that(output.get("output")).containsInAnyOrder(expectedRows);
+PAssert.that(rows).containsInAnyOrder(expectedRows);
 readPipeline.run().waitUntilFinish();
   }
 
@@ -258,7 +256,7 @@ public class IcebergIOIT implements Serializable {
 .build();
 
 PCollection<Row> input = 
writePipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
-PCollectionRowTuple.of("input"

(beam) branch master updated: Support unknown repeated STRUCTs (#31447)

2024-06-04 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 1fea5f797e9 Support unknown repeated STRUCTs (#31447)
1fea5f797e9 is described below

commit 1fea5f797e99f2651fd9b2fe147ce93a25f8a60d
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Tue Jun 4 18:09:04 2024 -0400

Support unknown repeated STRUCTs (#31447)

* cleanup TableRow of unknowns for structs

* set nested unknown value as an arraylist if it's repeated

* spotless

* add another test
---
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 35 +++---
 .../bigquery/TableRowToStorageApiProtoTest.java| 77 --
 2 files changed, 96 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 07457e72050..c1f452ba93f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.DateTimeParseException;
 import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -484,11 +485,11 @@ public class TableRowToStorageApiProto {
   throws SchemaConversionException {
 DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
 for (final Map.Entry<String, Object> entry : map.entrySet()) {
-  @Nullable
-  FieldDescriptor fieldDescriptor = 
descriptor.findFieldByName(entry.getKey().toLowerCase());
+  String key = entry.getKey().toLowerCase();
+  @Nullable FieldDescriptor fieldDescriptor = 
descriptor.findFieldByName(key);
   if (fieldDescriptor == null) {
 if (unknownFields != null) {
-  unknownFields.set(entry.getKey().toLowerCase(), entry.getValue());
+  unknownFields.set(key, entry.getValue());
 }
 if (ignoreUnknownValues) {
   continue;
@@ -505,12 +506,19 @@ public class TableRowToStorageApiProto {
   schemaInformation.getSchemaForField(entry.getKey());
   try {
 Supplier<@Nullable TableRow> getNestedUnknown =
-() ->
-(unknownFields == null)
-? null
-: (TableRow)
-unknownFields.computeIfAbsent(
-entry.getKey().toLowerCase(), k -> new TableRow());
+() -> {
+  if (unknownFields == null) {
+return null;
+  }
+  TableRow nestedUnknown = new TableRow();
+  if (fieldDescriptor.isRepeated()) {
+((List<TableRow>)
+(unknownFields.computeIfAbsent(key, k -> new ArrayList<TableRow>())))
+.add(nestedUnknown);
+return nestedUnknown;
+  }
+  return (TableRow) unknownFields.computeIfAbsent(key, k -> 
nestedUnknown);
+};
 
 @Nullable
 Object value =
@@ -524,6 +532,15 @@ public class TableRowToStorageApiProto {
 if (value != null) {
   builder.setField(fieldDescriptor, value);
 }
+// For STRUCT fields, we add a placeholder to unknownFields using the 
getNestedUnknown
+// supplier (in case we encounter unknown nested fields). If the 
placeholder comes out
+// to be empty, we should clean it up
+if 
(fieldSchemaInformation.getType().equals(TableFieldSchema.Type.STRUCT)
+&& unknownFields != null
+&& unknownFields.get(key) instanceof Map
+&& ((Map) unknownFields.get(key)).isEmpty()) {
+  unknownFields.remove(key);
+}
   } catch (Exception e) {
 throw new SchemaDoesntMatchException(
 "Problem converting field "
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 847ede9df36..6a98dad55a6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -40,6 +40,8 @@ impor
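
To make the new behavior concrete, an illustrative sketch of the expected unknown-fields shape (the schema and field names are invented for this example; this is not code from the commit):

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;

// Suppose the known schema has a repeated STRUCT field "items" whose only
// known sub-field is "a"; incoming rows also carry an unknown sub-field "b".
TableRow input =
    new TableRow()
        .set("items",
            Arrays.asList(
                new TableRow().set("a", 1).set("b", "x"),
                new TableRow().set("a", 2).set("b", "y")));

// With ignoreUnknownValues and an unknownFields collector supplied, the
// collector should now accumulate one placeholder TableRow per list element:
//   {"items": [{"b": "x"}, {"b": "y"}]}
// and empty placeholders are removed by the cleanup step added above.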

(beam) branch master updated (a7f5898f885 -> b4b1cc05b5d)

2024-06-04 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from a7f5898f885 Default SchemaTransform configs to snake_case (#31374)
 add b4b1cc05b5d Fix iceberg catalog validation (#31349)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java| 2 +-
 .../src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java   | 1 -
 .../beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java | 3 +--
 .../beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java  | 6 ++
 .../sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java | 3 +--
 5 files changed, 5 insertions(+), 10 deletions(-)



(beam) branch master updated: Default SchemaTransform configs to snake_case (#31374)

2024-06-04 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new a7f5898f885 Default SchemaTransform configs to snake_case (#31374)
a7f5898f885 is described below

commit a7f5898f8850e5d110ccb299830c12a4f0807a73
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Tue Jun 4 16:29:50 2024 -0400

Default SchemaTransform configs to snake_case (#31374)

* default schematransform configs to snake_case

* add to CHANGES.md

* update Go's bigtable wrapper to export snake_case param names

* make more yaml snake_case changes
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  2 +-
 CHANGES.md |  9 +++
 sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go   |  6 +-
 .../transforms/TypedSchemaTransformProvider.java   | 46 ---
 .../TypedSchemaTransformProviderTest.java  |  8 +-
 .../IcebergReadSchemaTransformProvider.java| 12 ---
 .../IcebergWriteSchemaTransformProvider.java   | 11 ---
 .../KafkaReadSchemaTransformProviderTest.java  | 16 ++--
 .../managed/ManagedSchemaTransformProvider.java| 11 ---
 .../sdk/managed/ManagedTransformConstants.java | 21 +-
 .../ManagedSchemaTransformProviderTest.java| 12 +--
 .../ManagedSchemaTransformTranslationTest.java |  6 +-
 .../org/apache/beam/sdk/managed/ManagedTest.java   |  2 +-
 .../managed/src/test/resources/test_config.yaml|  4 +-
 sdks/python/apache_beam/io/gcp/bigquery.py | 14 ++--
 sdks/python/apache_beam/io/gcp/bigtableio.py   | 12 +--
 .../transforms/external_transform_provider.py  | 35 +
 .../external_transform_provider_it_test.py | 22 --
 sdks/python/apache_beam/yaml/standard_io.yaml  | 88 +++---
 .../apache_beam/yaml/standard_providers.yaml   |  8 +-
 sdks/python/apache_beam/yaml/yaml_provider.py  |  2 +-
 sdks/python/gen_xlang_wrappers.py  | 21 +-
 22 files changed, 141 insertions(+), 227 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index b268323..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 2
+  "modification": 1
 }
diff --git a/CHANGES.md b/CHANGES.md
index 91bdfef6916..1aee8283bcb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,6 +88,15 @@
   This new implementation still supports all (immutable) List methods as 
before,
   but some of the random access methods like get() and size() will be slower.
   To use the old implementation one can use View.asList().withRandomAccess().
+* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a
+  configuration Schema with snake_case naming convention
+  ([#31374](https://github.com/apache/beam/pull/31374)). This will make the 
following
+  cases problematic:
+  * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java 
SchemaTransform,
+and vice versa:
+  * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java 
SchemaTransform
+  * All direct uses of Python's 
[SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
+should be updated to use new snake_case parameter names.
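+
+Illustratively, the renaming follows the usual camelCase-to-snake_case mapping; the
+vendored Guava CaseFormat (used elsewhere in the SDK) expresses the same conversion,
+though whether the provider uses it internally is not shown in this diff:
+
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+
+String snake =
+    CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, "maxReadTimeSeconds");
+// snake -> "max_read_time_seconds"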
 
 ## Deprecations
 
diff --git a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go 
b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
index 5b6d7d91631..81df24223ca 100644
--- a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
+++ b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
@@ -62,9 +62,9 @@ import (
 )
 
 type bigtableConfig struct {
-   InstanceId string `beam:"instanceId"`
-   ProjectId  string `beam:"projectId"`
-   TableIdstring `beam:"tableId"`
+   InstanceId string `beam:"instance_id"`
+   ProjectId  string `beam:"project_id"`
+   TableIdstring `beam:"table_id"`
 }
 
 // Cell represents a single cell in a Bigtable row.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index d5c6c724c6f..d9b49dd3ca2 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms

(beam) branch master updated: [ManagedIO] pass underlying transform URN as an annotation (#31398)

2024-05-30 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b50ad0fe8fc [ManagedIO] pass underlying transform URN as an annotation 
(#31398)
b50ad0fe8fc is described below

commit b50ad0fe8fc168eaded62efb08f19cf2aea341e2
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu May 30 17:51:07 2024 -0400

[ManagedIO] pass underlying transform URN as an annotation (#31398)

* pass underlying transform URN to annotation

* move annotation keys to proto

* address comments: add descriptions for annotation enums; fail when 
missing transform_identifier; add unit tests for annotations
---
 .../model/pipeline/v1/external_transforms.proto| 13 +++
 .../beam/sdk/util/construction/BeamUrns.java   |  5 +++
 .../util/construction/PTransformTranslation.java   | 37 ++--
 .../sdk/util/construction/TransformUpgrader.java   |  6 ++--
 .../ManagedSchemaTransformTranslationTest.java | 40 --
 5 files changed, 86 insertions(+), 15 deletions(-)

diff --git 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index aa9e70c7a87..429371e1105 100644
--- 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -111,6 +111,19 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message Annotations {
+  enum Enum {
+// The annotation key for the encoded configuration Row used to build a 
transform
+CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) = 
"config_row"];
+// The annotation key for the configuration Schema used to decode the 
configuration Row
+CONFIG_ROW_SCHEMA_KEY = 1 
[(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row_schema"];
+// If this transform is a SchemaTransform, this is the annotation key for 
the SchemaTransform's URN
+SCHEMATRANSFORM_URN_KEY = 2 
[(org.apache.beam.model.pipeline.v1.beam_constant) = "schematransform_urn"];
+// If the transform is a ManagedSchemaTransform, this is the annotation 
key for the underlying SchemaTransform's URN
+MANAGED_UNDERLYING_TRANSFORM_URN_KEY = 3 
[(org.apache.beam.model.pipeline.v1.beam_constant) = 
"managed_underlying_transform_urn"];
+  }
+}
+
 // Payload for a Schema-aware PTransform.
 // This is a transform that is aware of its input and output PCollection 
schemas
 // and is configured using Beam Schema-compatible parameters.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
index 05bb2b0e0a0..f0493de3696 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
@@ -26,4 +26,9 @@ public class BeamUrns {
   public static String getUrn(ProtocolMessageEnum value) {
 return 
value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
   }
+
+  /** Returns the constant value of a given enum annotated with 
[(beam_constant)]. */
+  public static String getConstant(ProtocolMessageEnum value) {
+return 
value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant);
+  }
 }
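For illustration, resolving one of the new annotation keys through the helper added above (both names come from this diff):

import org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations;
import org.apache.beam.sdk.util.construction.BeamUrns;

String key =
    BeamUrns.getConstant(Annotations.Enum.MANAGED_UNDERLYING_TRANSFORM_URN_KEY);
// key -> "managed_underlying_transform_urn"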
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
index 5dc84897d38..e2b6d95057f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.util.construction;
 
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations;
 import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
 import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
@@ -43,6 +44,7 @@ import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam

(beam) branch master updated (93cc6a521ee -> 791ead6a05b)

2024-05-28 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 93cc6a521ee add PR trigger files (#31424)
 add 791ead6a05b add pull_request_target event (#31426)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/IO_Iceberg_Integration_Tests.yml | 1 +
 .github/workflows/IO_Iceberg_Performance_Tests.yml | 1 +
 2 files changed, 2 insertions(+)



(beam) branch master updated: add PR trigger files (#31424)

2024-05-28 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 93cc6a521ee add PR trigger files (#31424)
93cc6a521ee is described below

commit 93cc6a521eeabadfb0cb06abf1dcdc41588d08cf
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Tue May 28 16:23:00 2024 -0400

add PR trigger files (#31424)
---
 .github/workflows/IO_Iceberg_Integration_Tests.yml | 2 ++
 .github/workflows/IO_Iceberg_Performance_Tests.yml | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml 
b/.github/workflows/IO_Iceberg_Integration_Tests.yml
index ab055c3a609..006cd9d13be 100644
--- a/.github/workflows/IO_Iceberg_Integration_Tests.yml
+++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml
@@ -18,6 +18,8 @@ name: IcebergIO Integration Tests
 on:
   schedule:
 - cron: '15 4/6 * * *'
+  pull_request_target:
+paths: [ 'release/trigger_all_tests.json', 
'.github/trigger_files/IO_Iceberg_Integration_Tests.json' ]
   workflow_dispatch:
 
 # Setting explicit permissions for the action to avoid the default permissions 
which are `write-all` in case of pull_request_target event
diff --git a/.github/workflows/IO_Iceberg_Performance_Tests.yml 
b/.github/workflows/IO_Iceberg_Performance_Tests.yml
index 27d93b8a40d..e9920a5a138 100644
--- a/.github/workflows/IO_Iceberg_Performance_Tests.yml
+++ b/.github/workflows/IO_Iceberg_Performance_Tests.yml
@@ -18,6 +18,8 @@ name: IcebergIO Performance Tests
 on:
   schedule:
 - cron: '10 10/12 * * *'
+  pull_request_target:
+paths: [ '.github/trigger_files/IO_Iceberg_Performance_Tests.json' ]
   workflow_dispatch:
 
# Setting explicit permissions for the action to avoid the default permissions 
which are `write-all` in case of pull_request_target event



(beam) branch master updated (e764fc9c17d -> 37b8c8a87b8)

2024-05-28 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e764fc9c17d remove unused field (#31293)
 add 37b8c8a87b8 Add Iceberg workflows (#31401)

No new revisions were added by this update.

Summary of changes:
 ...etrics.yml => IO_Iceberg_Integration_Tests.yml} | 36 --
 ...etrics.yml => IO_Iceberg_Performance_Tests.yml} | 34 ++--
 .github/workflows/IO_Iceberg_Unit_Tests.yml|  2 +-
 sdks/java/io/iceberg/build.gradle  | 24 ++-
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java|  1 +
 5 files changed, 62 insertions(+), 35 deletions(-)
 copy .github/workflows/{beam_Prober_CommunityMetrics.yml => 
IO_Iceberg_Integration_Tests.yml} (74%)
 copy .github/workflows/{beam_Prober_CommunityMetrics.yml => 
IO_Iceberg_Performance_Tests.yml} (78%)



(beam) branch iceberg_lt_fork updated (46c14b0123f -> 666f8fd11cc)

2024-05-24 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch iceberg_lt_fork
in repository https://gitbox.apache.org/repos/asf/beam.git


from 46c14b0123f ignore
 add 666f8fd11cc trigger integration test

No new revisions were added by this update.

Summary of changes:
 ..._Python_Xlang_IO_Dataflow.json => IO_Iceberg_Integration_Tests.json} | 0
 .github/workflows/IO_Iceberg_Integration_Tests.yml  | 2 ++
 2 files changed, 2 insertions(+)
 copy .github/trigger_files/{beam_PostCommit_Python_Xlang_IO_Dataflow.json => 
IO_Iceberg_Integration_Tests.json} (100%)



(beam) branch iceberg_lt_fork created (now 46c14b0123f)

2024-05-24 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch iceberg_lt_fork
in repository https://gitbox.apache.org/repos/asf/beam.git


  at 46c14b0123f ignore

No new revisions were added by this update.



(beam) branch master updated (084f23b2a8d -> 743e34e0098)

2024-05-16 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 084f23b2a8d Install Beam from wheels in Dependency Compat Test Suite. 
(#31308)
 add 743e34e0098 Fix iceberg unit tests (#31314)

No new revisions were added by this update.

Summary of changes:
 ...Commit_Python_Xlang_Gcp_Direct.json => IO_Iceberg_Unit_Tests.json} | 0
 .github/workflows/IO_Iceberg_Unit_Tests.yml   | 4 ++--
 2 files changed, 2 insertions(+), 2 deletions(-)
 copy .github/trigger_files/{beam_PostCommit_Python_Xlang_Gcp_Direct.json => 
IO_Iceberg_Unit_Tests.json} (100%)



(beam) branch master updated (2196758c20b -> 6cb30cc5c86)

2024-05-15 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 2196758c20b Merge pull request #31270: Reapply "Add Redistribute 
translation to Samza runner"
 add 6cb30cc5c86 setup GCP auth before running tests (#31306)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/IO_Iceberg_Unit_Tests.yml | 6 ++
 1 file changed, 6 insertions(+)



(beam) branch master updated: Support Kafka Managed IO (#31172)

2024-05-09 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 365c2d92965 Support Kafka Managed IO (#31172)
365c2d92965 is described below

commit 365c2d92965c5e23c23d6e1f3c7a1cd048c872d8
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu May 9 17:14:17 2024 -0400

Support Kafka Managed IO (#31172)

* managed kafka read

* managed kafka write
---
 .../apache/beam/sdk/schemas/utils/YamlUtils.java   |  8 +++
 sdks/java/io/kafka/build.gradle|  1 +
 .../io/kafka/KafkaReadSchemaTransformProvider.java |  7 ++-
 .../KafkaReadSchemaTransformProviderTest.java  | 53 ++-
 .../KafkaWriteSchemaTransformProviderTest.java | 59 ++
 sdks/java/managed/build.gradle |  1 +
 .../java/org/apache/beam/sdk/managed/Managed.java  | 14 +++--
 .../managed/ManagedSchemaTransformProvider.java| 30 ++-
 .../sdk/managed/ManagedTransformConstants.java | 52 ++-
 9 files changed, 213 insertions(+), 12 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
index 122f2d1963b..e631e166e8b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.schemas.utils;
 import static org.apache.beam.sdk.values.Row.toRow;
 
 import java.math.BigDecimal;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -181,4 +182,11 @@ public class YamlUtils {
 }
 return new Yaml().dumpAsMap(map);
   }
+
+  public static Map<String, Object> yamlStringToMap(@Nullable String yaml) {
+if (yaml == null || yaml.isEmpty()) {
+  return Collections.emptyMap();
+}
+return new Yaml().load(yaml);
+  }
 }
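A minimal sketch of the new helper in use (the YAML content is a placeholder):

import java.util.Map;
import org.apache.beam.sdk.schemas.utils.YamlUtils;

Map<String, Object> config =
    YamlUtils.yamlStringToMap("topic: my-topic\nbootstrap_servers: localhost:9092");
// config.get("topic") -> "my-topic"; a null or empty input yields an empty map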
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 269ddb3f5eb..3e095a2bacc 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -90,6 +90,7 @@ dependencies {
   provided library.java.everit_json_schema
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
   testImplementation project(":sdks:java:io:synthetic")
+  testImplementation project(":sdks:java:managed")
   testImplementation project(path: ":sdks:java:extensions:avro", 
configuration: "testRuntimeMigration")
   testImplementation project(path: ":sdks:java:extensions:protobuf", 
configuration: "testRuntimeMigration")
   testImplementation project(path: ":sdks:java:io:common", configuration: 
"testRuntimeMigration")
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 2776c388f7c..13240ea9dc4 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -151,11 +151,10 @@ public class KafkaReadSchemaTransformProvider
 }
   };
 }
-
-if (format.equals("RAW")) {
+if ("RAW".equals(format)) {
   beamSchema = Schema.builder().addField("payload", 
Schema.FieldType.BYTES).build();
   valueMapper = getRawBytesToRowFunction(beamSchema);
-} else if (format.equals("PROTO")) {
+} else if ("PROTO".equals(format)) {
   String fileDescriptorPath = configuration.getFileDescriptorPath();
   String messageName = configuration.getMessageName();
   if (fileDescriptorPath != null) {
@@ -165,7 +164,7 @@ public class KafkaReadSchemaTransformProvider
 beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, 
messageName);
 valueMapper = 
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
   }
-} else if (format.equals("JSON")) {
+} else if ("JSON".equals(format)) {
   beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema);
   valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema);
 } else {
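The switch from format.equals("RAW") to "RAW".equals(format) is a null-safety refactor: the constant-first form returns false instead of throwing when the configured format is null. A one-line illustration:

String format = null;
// format.equals("RAW")  -> NullPointerException
// "RAW".equals(format)  -> false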
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index f6e231c758a..d5962a737ba 100644
---

(beam) branch master updated (13708eaedeb -> e0bc8e770a7)

2024-05-09 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 13708eaedeb Merge pull request #31229: Turn off abandoned node 
enforcement in some Reshuffle unit tests where it is not needed
 add e0bc8e770a7 Add IcebergIO integration tests (#31220)

No new revisions were added by this update.

Summary of changes:
 ...n => beam_PostCommit_Java_Hadoop_Versions.json} |   2 +-
 .../{IO_Iceberg.yml => IO_Iceberg_Unit_Tests.yml}  |  16 +-
 build.gradle.kts   |   1 +
 sdks/java/io/iceberg/build.gradle  |  26 +-
 .../sdk/io/iceberg/SchemaAndRowConversions.java|  29 ++-
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java| 289 +
 .../io/iceberg/SchemaAndRowConversionsTest.java|  42 +++
 7 files changed, 386 insertions(+), 19 deletions(-)
 copy .github/trigger_files/{beam_PostCommit_Java_ValidatesRunner_Dataflow.json 
=> beam_PostCommit_Java_Hadoop_Versions.json} (97%)
 rename .github/workflows/{IO_Iceberg.yml => IO_Iceberg_Unit_Tests.yml} (89%)
 create mode 100644 
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java



(beam) branch master updated: Revert global snake_case convention for SchemaTransforms (#31109)

2024-04-25 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b33a8438ad3 Revert global snake_case convention for SchemaTransforms 
(#31109)
b33a8438ad3 is described below

commit b33a8438ad335b75feec0c5c97e9a728795fc6ff
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Apr 25 22:01:15 2024 -0400

Revert global snake_case convention for SchemaTransforms (#31109)

* revert global snake_case convention and make it a special case for 
iceberg and managed

* remove docs and comments too

* cleanup

* revert python and yaml changes too

* fix test
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  2 +-
 .../transforms/TypedSchemaTransformProvider.java   | 36 +-
 .../TypedSchemaTransformProviderTest.java  |  8 ++---
 .../IcebergReadSchemaTransformProvider.java| 12 
 .../IcebergWriteSchemaTransformProvider.java   | 11 +++
 .../KafkaReadSchemaTransformProviderTest.java  | 16 +-
 .../managed/ManagedSchemaTransformProvider.java| 12 +++-
 .../ManagedSchemaTransformProviderTest.java| 12 
 .../ManagedSchemaTransformTranslationTest.java |  6 ++--
 .../org/apache/beam/sdk/managed/ManagedTest.java   |  2 +-
 .../managed/src/test/resources/test_config.yaml|  4 +--
 sdks/python/apache_beam/io/gcp/bigquery.py | 14 -
 sdks/python/apache_beam/io/gcp/bigtableio.py   | 12 
 .../transforms/external_transform_provider.py  | 35 +++--
 .../external_transform_provider_it_test.py | 22 +
 sdks/python/apache_beam/yaml/yaml_provider.py  |  2 +-
 sdks/python/gen_xlang_wrappers.py  | 21 -
 17 files changed, 155 insertions(+), 72 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056a5de..b268323 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 1
+  "modification": 2
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index cfd298ae87e..d5c6c724c6f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
-import static 
org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.ParameterizedType;
 import java.util.List;
@@ -28,10 +26,8 @@ import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaProvider;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.values.Row;
 
@@ -45,9 +41,6 @@ import org.apache.beam.sdk.values.Row;
  * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be 
used to produce a {@link
  * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the 
configuration Schema.
  *
- * NOTE: The inferred field names in the configuration {@link Schema} and 
{@link Row} follow the
- * {@code snake_case} naming convention.
- *
  * Internal only: This interface is actively being worked on and it 
will likely change as
  * we provide implementations for more standard Beam transforms. We provide no 
backwards
  * compatibility guarantees and it should not be implemented outside of the 
Beam repository.
@@ -85,11 +78,10 @@ public abstract class TypedSchemaTransformProvider 
implements SchemaTra
   }
 
   @Override
-  public final Schema configurationSchema() {
+  public Schema configurationSchema() {
 try {
   // Sort the fields by name to ensure a consistent 

(beam) branch master updated (485c5198384 -> 45e78572e8f)

2024-04-24 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 485c5198384 Merge pull request #31086: Simplify intermediate data in 
Iceberg sink; use manifest files
 add 45e78572e8f python sdk: fix several bugs regarding avro <-> beam 
schema conversion (#30770)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/avroio.py  | 132 ++
 sdks/python/apache_beam/io/avroio_test.py |  84 ++-
 2 files changed, 198 insertions(+), 18 deletions(-)



(beam) branch master updated (a0dad088980 -> 37609ba70fa)

2024-04-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from a0dad088980 Bump golang.org/x/net from 0.22.0 to 0.24.0 in /sdks 
(#31065)
 add 37609ba70fa Managed Transform protos & translation; Iceberg 
SchemaTransforms & translation (#30910)

No new revisions were added by this update.

Summary of changes:
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |   3 +-
 .../beam_PostCommit_Python_Xlang_IO_Dataflow.json  |   3 +-
 .../beam_PreCommit_Xlang_Generated_Transforms.yml  |   2 +-
 .../model/pipeline/v1/external_transforms.proto|   4 +
 .../java/org/apache/beam/sdk/schemas/Schema.java   |  54 -
 .../apache/beam/sdk/schemas/SchemaRegistry.java|  23 ++
 .../sdk/schemas/annotations/DefaultSchema.java |  18 ++
 .../transforms/SchemaTransformTranslation.java |  79 +++
 .../transforms/TypedSchemaTransformProvider.java   |  35 ++-
 .../apache/beam/sdk/schemas/utils/YamlUtils.java   |  21 +-
 .../util/construction/PTransformTranslation.java   |  11 +
 .../main/java/org/apache/beam/sdk/values/Row.java  |  49 
 .../beam/sdk/schemas/SchemaRegistryTest.java   |  49 
 .../org/apache/beam/sdk/schemas/SchemaTest.java| 100 +
 .../TypedSchemaTransformProviderTest.java  |  24 +-
 .../org/apache/beam/sdk/util/YamlUtilsTest.java|  33 +++
 .../java/org/apache/beam/sdk/values/RowTest.java   |  90 
 .../sdk/expansion/service/ExpansionService.java|  41 +++-
 sdks/java/io/expansion-service/build.gradle|   2 +
 .../FileWriteSchemaTransformProviderTest.java  |  32 +--
 ...ueryExportReadSchemaTransformConfiguration.java |  14 --
 ...FileLoadsWriteSchemaTransformConfiguration.java |  14 --
 ...QueryExportReadSchemaTransformProviderTest.java |  25 +--
 ...yFileLoadsWriteSchemaTransformProviderTest.java |  12 +-
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java |   2 +-
 sdks/java/io/iceberg/build.gradle  |   1 +
 .../java/org/apache/beam/io/iceberg/IcebergIO.java |  93 
 .../{ => sdk}/io/iceberg/AppendFilesToTables.java  |  17 +-
 .../{ => sdk}/io/iceberg/AssignDestinations.java   |   2 +-
 .../{ => sdk}/io/iceberg/DynamicDestinations.java  |   2 +-
 .../beam/{ => sdk}/io/iceberg/FileWriteResult.java |  12 +-
 .../{ => sdk}/io/iceberg/IcebergCatalogConfig.java |   2 +-
 .../{ => sdk}/io/iceberg/IcebergDestination.java   |   2 +-
 .../org/apache/beam/sdk/io/iceberg/IcebergIO.java  | 136 +++
 .../IcebergReadSchemaTransformProvider.java| 134 +++
 .../{ => sdk}/io/iceberg/IcebergScanConfig.java|   2 +-
 .../IcebergSchemaTransformCatalogConfig.java   | 107 +
 .../iceberg/IcebergSchemaTransformTranslation.java |  88 
 .../io/iceberg/IcebergTableCreateConfig.java   |   2 +-
 .../{ => sdk}/io/iceberg/IcebergWriteResult.java   |  13 +-
 .../IcebergWriteSchemaTransformProvider.java   | 179 +++
 .../io/iceberg/OneTableDynamicDestinations.java|  31 ++-
 .../beam/{ => sdk}/io/iceberg/PropertyBuilder.java |   2 +-
 .../beam/{ => sdk}/io/iceberg/RecordWriter.java|   4 +-
 .../beam/{ => sdk}/io/iceberg/ScanSource.java  |   2 +-
 .../beam/{ => sdk}/io/iceberg/ScanTaskReader.java  |   2 +-
 .../beam/{ => sdk}/io/iceberg/ScanTaskSource.java  |   2 +-
 .../io/iceberg/SchemaAndRowConversions.java|   2 +-
 .../apache/beam/sdk/io/iceberg/SnapshotInfo.java   | 118 ++
 .../io/iceberg/WriteGroupedRowsToFiles.java|   2 +-
 .../{ => sdk}/io/iceberg/WriteToDestinations.java  |   5 +-
 .../io/iceberg/WriteUngroupedRowsToFiles.java  |   2 +-
 .../beam/{ => sdk}/io/iceberg/package-info.java|   2 +-
 .../{ => sdk}/io/iceberg/FileWriteResultTest.java  |  17 +-
 .../{ => sdk}/io/iceberg/IcebergIOReadTest.java|   4 +-
 .../{ => sdk}/io/iceberg/IcebergIOWriteTest.java   |  16 +-
 .../IcebergReadSchemaTransformProviderTest.java| 183 +++
 .../IcebergSchemaTransformTranslationTest.java | 248 +
 .../IcebergWriteSchemaTransformProviderTest.java   | 175 +++
 .../beam/{ => sdk}/io/iceberg/ScanSourceTest.java  |   2 +-
 .../io/iceberg/SchemaAndRowConversionsTest.java|   2 +-
 .../{ => sdk}/io/iceberg/TestDataWarehouse.java|   2 +-
 .../beam/{ => sdk}/io/iceberg/TestFixtures.java|   2 +-
 .../KafkaReadSchemaTransformProviderTest.java  |  16 +-
 sdks/java/managed/build.gradle |   1 -
 .../java/org/apache/beam/sdk/managed/Managed.java  |  39 ++--
 .../managed/ManagedSchemaTransformProvider.java| 119 ++
 .../managed/ManagedSchemaTransformTranslation.java |  59 +
 .../sdk/managed/ManagedTransformConstants.java}|   9 +-
 .../testing}/TestSchemaTransformProvider.java  |   8 +-
 .../beam/sdk/managed/testing}/package-info.jav

(beam) branch release-2.56.0 updated: Cherry picking (#30460) BQ clustering valueprovider (#31039)

2024-04-18 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.56.0 by this push:
 new 3c7a01360df Cherry picking (#30460) BQ clustering valueprovider  
(#31039)
3c7a01360df is described below

commit 3c7a01360df3635ae56b7073dfa36e75e7492f61
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Apr 18 11:47:41 2024 -0400

Cherry picking (#30460) BQ clustering valueprovider  (#31039)

* Support clustering with value provider

* remove

* add some documentation

* fix

* address comments

* update test

* spotless

* use serializable json clustering; fix translation

* fall back on super's clustering and time partitioning when needed

* fork based on version 2.56.0

* fix test
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 22 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 31 ++
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 -
 .../gcp/bigquery/DynamicDestinationsHelpers.java   |  8 ++
 .../sdk/io/gcp/bigquery/BigQueryClusteringIT.java  |  5 +++-
 .../sdk/io/gcp/bigquery/BigQueryHelpersTest.java   | 11 
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +--
 .../BigQueryTimePartitioningClusteringIT.java  |  1 +
 8 files changed, 85 insertions(+), 21 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index c4ad09ce6ea..8c600cf780a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobReference;
@@ -31,6 +33,8 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
@@ -40,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
@@ -704,6 +709,23 @@ public class BigQueryHelpers {
 }
   }
 
+  static Clustering clusteringFromJsonFields(String jsonStringClustering) {
+JsonElement jsonClustering = JsonParser.parseString(jsonStringClustering);
+
+checkArgument(
+jsonClustering.isJsonArray(),
+"Received an invalid Clustering json string: %s."
++ "Please provide a serialized json array like so: [\"column1\", 
\"column2\"]",
+jsonStringClustering);
+
+List<String> fields =
+jsonClustering.getAsJsonArray().asList().stream()
+.map(JsonElement::getAsString)
+.collect(Collectors.toList());
+
+return new Clustering().setFields(fields);
+  }
+
   static String resolveTempLocation(
   String tempLocationDir, String bigQueryOperationName, String stepUuid) {
 return FileSystems.matchNewResource(tempLocationDir, true)
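For illustration, the JSON shape the new helper expects (a sketch calling the package-private method added above):

import com.google.api.services.bigquery.model.Clustering;

Clustering clustering =
    BigQueryHelpers.clusteringFromJsonFields("[\"column1\", \"column2\"]");
// clustering.getFields() -> ["column1", "column2"]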
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a646e1e6247..fce8f1c5d40 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2350,7 +2350,7 @@ public class BigQueryIO {
 
 abstract @Nullable ValueProvider<String> getJsonTimePartitioning();
 
- 

(beam) branch master updated (2eb1a756258 -> 04ff4bdd7fc)

2024-04-18 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 2eb1a756258 [Python] Clean doc related to write data in bigquery.py 
(#30887)
 add 04ff4bdd7fc Support BQ clustering with value provider (#30460)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 22 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 31 ++
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 -
 .../gcp/bigquery/DynamicDestinationsHelpers.java   |  8 ++
 .../sdk/io/gcp/bigquery/BigQueryClusteringIT.java  |  5 +++-
 .../sdk/io/gcp/bigquery/BigQueryHelpersTest.java   | 11 
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +--
 .../BigQueryTimePartitioningClusteringIT.java  |  1 +
 8 files changed, 85 insertions(+), 21 deletions(-)



(beam) branch master updated: Add config validation to kafka read schema transform (#30625)

2024-04-11 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e33dec69c7c Add config validation to kafka read schema transform 
(#30625)
e33dec69c7c is described below

commit e33dec69c7cfd01c0b827538e1dad8567e3ff95e
Author: Jeff Kinard 
AuthorDate: Thu Apr 11 19:23:26 2024 -0400

Add config validation to kafka read schema transform (#30625)

* Add config validation to kafka read schema transform

Signed-off-by: Jeffrey Kinard 
---
 .../KafkaReadSchemaTransformConfiguration.java | 36 +-
 .../io/kafka/KafkaReadSchemaTransformProvider.java |  2 ++
 .../KafkaReadSchemaTransformProviderTest.java  |  4 +--
 3 files changed, 26 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index d95c49894a2..13f5249a6c3 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.auto.value.AutoValue;
 import java.util.Map;
 import java.util.Set;
@@ -46,11 +49,13 @@ public abstract class KafkaReadSchemaTransformConfiguration 
{
 
   public void validate() {
 final String startOffset = this.getAutoOffsetResetConfig();
-assert startOffset == null || 
VALID_START_OFFSET_VALUES.contains(startOffset)
-: "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
+checkArgument(
+startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset),
+"Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES);
 final String dataFormat = this.getFormat();
-assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
-: "Valid data formats are " + VALID_DATA_FORMATS;
+checkArgument(
+dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat),
+"Valid data formats are " + VALID_DATA_FORMATS);
 
 final String inputSchema = this.getSchema();
 final String messageName = this.getMessageName();
@@ -59,20 +64,23 @@ public abstract class KafkaReadSchemaTransformConfiguration 
{
 final String confluentSchemaRegSubject = 
this.getConfluentSchemaRegistrySubject();
 
 if (confluentSchemaRegUrl != null) {
-  assert confluentSchemaRegSubject != null
-  : "To read from Kafka, a schema must be provided directly or though 
Confluent "
-  + "Schema Registry. Make sure you are providing one of these 
parameters.";
+  checkNotNull(
+  confluentSchemaRegSubject,
+  "To read from Kafka, a schema must be provided directly or though 
Confluent "
+  + "Schema Registry. Make sure you are providing one of these 
parameters.");
 } else if (dataFormat != null && dataFormat.equals("RAW")) {
-  assert inputSchema == null : "To read from Kafka in RAW format, you 
can't provide a schema.";
+  checkArgument(
+  inputSchema == null, "To read from Kafka in RAW format, you can't 
provide a schema.");
 } else if (dataFormat != null && dataFormat.equals("JSON")) {
-  assert inputSchema != null : "To read from Kafka in JSON format, you 
must provide a schema.";
+  checkNotNull(inputSchema, "To read from Kafka in JSON format, you must 
provide a schema.");
 } else if (dataFormat != null && dataFormat.equals("PROTO")) {
-  assert messageName != null
-  : "To read from Kafka in PROTO format, messageName must be 
provided.";
-  assert fileDescriptorPath != null || inputSchema != null
-  : "To read from Kafka in PROTO format, fileDescriptorPath or schema 
must be provided.";
+  checkNotNull(
+  messageName, "To read from Kafka in PROTO format, messageName must 
be provided.");
+  checkArgument(
+  fileDescriptorPath != null || inputSchema != null,
+  "To read from Kafka in PROTO format, fileDescriptorPath or schema 
must be provided.");
 } else {
-  assert inputSchema != null : "To read from Kafka in AVRO format, you 
must provide a schema.";
+  checkNotNull(inputSchema, "T
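
The motivation for replacing assert with Preconditions checks: Java assert statements are no-ops unless the JVM is started with -ea, so these validations would silently pass in a default runtime, while checkArgument/checkNotNull always run. A minimal illustration:

// Disabled by default; only throws when the JVM runs with -ea:
assert inputSchema != null : "never enforced in a default JVM";

// Always enforced, regardless of JVM flags:
checkNotNull(inputSchema, "To read from Kafka in AVRO format, you must provide a schema.");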

(beam) branch master updated: [Java] ManagedIO (#30808)

2024-04-08 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 54673996c9b [Java] ManagedIO  (#30808)
54673996c9b is described below

commit 54673996c9bf2ee076b04833bbae2729d6cebbaf
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Apr 8 06:55:17 2024 -0400

[Java] ManagedIO  (#30808)

* managed api for java

* yaml utils
---
 build.gradle.kts   |   1 +
 sdks/java/core/build.gradle|   1 +
 .../apache/beam/sdk/schemas/utils/YamlUtils.java   | 171 
 .../org/apache/beam/sdk/util/YamlUtilsTest.java| 228 +
 sdks/java/managed/build.gradle |  37 
 .../java/org/apache/beam/sdk/managed/Managed.java  | 195 ++
 .../managed/ManagedSchemaTransformProvider.java| 183 +
 .../org/apache/beam/sdk/managed/package-info.java  |  20 ++
 .../ManagedSchemaTransformProviderTest.java| 103 ++
 .../org/apache/beam/sdk/managed/ManagedTest.java   | 114 +++
 .../sdk/managed/TestSchemaTransformProvider.java   |  98 +
 .../managed/src/test/resources/test_config.yaml|  21 ++
 settings.gradle.kts|   2 +
 13 files changed, 1174 insertions(+)

diff --git a/build.gradle.kts b/build.gradle.kts
index ded692677b5..9c42ffdc8ce 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -303,6 +303,7 @@ tasks.register("javaPreCommit") {
   dependsOn(":sdks:java:io:synthetic:build")
   dependsOn(":sdks:java:io:xml:build")
   dependsOn(":sdks:java:javadoc:allJavadoc")
+  dependsOn(":sdks:java:managed:build")
   dependsOn(":sdks:java:testing:expansion-service:build")
   dependsOn(":sdks:java:testing:jpms-tests:build")
   dependsOn(":sdks:java:testing:load-tests:build")
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 438a3fb1806..5a47cb5237e 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -98,6 +98,7 @@ dependencies {
   permitUnusedDeclared 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
   provided library.java.json_org
   implementation library.java.everit_json_schema
+  implementation "org.yaml:snakeyaml:2.0"
   shadowTest library.java.everit_json_schema
   provided library.java.junit
   testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
new file mode 100644
index 000..5c05b2bed39
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.apache.beam.sdk.values.Row.toRow;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.yaml.snakeyaml.Yaml;
+
+public class YamlUtils {
+  private static final Map<Schema.TypeName, Function<String, Object>> YAML_VALUE_PARSERS =
+      ImmutableMap.<Schema.TypeName, Function<String, Object>>builder()
+          .put(Schema.TypeName.BYTE, Byte::valueOf)
+          .put(Schema.TypeName.
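The new utility's job is to turn a YAML configuration string into a Beam Row for a given schema. A rough usage sketch, assuming a YamlUtils.toBeamRow(String, Schema) entry point as this commit suggests; the schema and YAML payload below are invented for illustration:

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.utils.YamlUtils;
    import org.apache.beam.sdk.values.Row;

    public class YamlUtilsSketch {
      public static void main(String[] args) {
        // Illustrative schema; the field names are made up.
        Schema schema =
            Schema.builder().addStringField("table").addInt64Field("batch_size").build();
        String yaml = "table: my_table\nbatch_size: 100";
        // Each YAML key is parsed into the matching schema field's type.
        Row config = YamlUtils.toBeamRow(yaml, schema);
        System.out.println(config);
      }
    }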

(beam) branch master updated: Rename tests to remove them from dataflow test suite (#30623)

2024-03-13 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e648fd08726 Rename tests to remove them from dataflow test suite 
(#30623)
e648fd08726 is described below

commit e648fd08726d1d0ccde62603776fa0f886907f0a
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Mar 13 11:42:17 2024 -0400

Rename tests to remove them from dataflow test suite (#30623)
---
 ...oviderIT.java => BigQueryDirectReadSchemaTransformProviderTest.java} | 2 +-
 ...rIT.java => BigQueryStorageWriteApiSchemaTransformProviderTest.java} | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java
similarity index 99%
rename from 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java
rename to 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java
index 958409eb5e3..2363a870bbd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java
@@ -74,7 +74,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class BigQueryDirectReadSchemaTransformProviderIT {
+public class BigQueryDirectReadSchemaTransformProviderTest {
 
   private static PipelineOptions testOptions = 
TestPipeline.testingPipelineOptions();
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
similarity index 99%
rename from 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java
rename to 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 8b8a3b75949..64ea0b11d1b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -61,7 +61,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class BigQueryStorageWriteApiSchemaTransformProviderIT {
+public class BigQueryStorageWriteApiSchemaTransformProviderTest {
 
   private FakeDatasetService fakeDatasetService = new FakeDatasetService();
   private FakeJobService fakeJobService = new FakeJobService();



(beam) branch master updated: Make defaults for optional SchemaTransformProvider methods (#30560)

2024-03-11 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d22a7e783ab Make defaults for optional SchemaTransformProvider methods 
 (#30560)
d22a7e783ab is described below

commit d22a7e783abf445ae7bef2ce26075f9d93b409a7
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Mar 11 17:26:54 2024 -0400

Make defaults for optional SchemaTransformProvider methods  (#30560)

* simplify schematransformprovider
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  1 +
 .../transforms/SchemaTransformProvider.java|  9 ++--
 .../transforms/TypedSchemaTransformProvider.java   | 20 +-
 .../TypedSchemaTransformProviderTest.java  | 24 --
 ...ueryStorageWriteApiSchemaTransformProvider.java |  5 -
 .../BigtableReadSchemaTransformProvider.java   | 10 -
 .../BigtableWriteSchemaTransformProvider.java  | 10 -
 ...gQueryDirectReadSchemaTransformProviderIT.java} |  2 +-
 ...yStorageWriteApiSchemaTransformProviderIT.java} |  2 +-
 9 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e69de29bb2d..8b137891791 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -0,0 +1 @@
+
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
index c76d7a25e69..9d0ad61b7a6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import org.apache.beam.sdk.annotations.Internal;
@@ -58,10 +59,14 @@ public interface SchemaTransformProvider {
   SchemaTransform from(Row configuration);
 
   /** Returns the input collection names of this transform. */
-  List<String> inputCollectionNames();
+  default List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
 
   /** Returns the output collection names of this transform. */
-  List<String> outputCollectionNames();
+  default List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
 
   /**
* List the dependencies needed for this transform. Jars from classpath are 
used by default when
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index 1117f59a748..e75fa27d2d1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,8 +17,13 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.lang.reflect.ParameterizedType;
 import java.util.List;
 import java.util.Optional;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -39,7 +44,20 @@ import org.apache.beam.sdk.values.Row;
 @Internal
 public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {
 
-  protected abstract Class<ConfigT> configurationClass();
+  @SuppressWarnings("unchecked")
+  protected Class<ConfigT> configurationClass() {
+    @Nullable
+    ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass();
+    checkStateNotNull(
+        parameterizedType, "Could not get the TypedSchemaTransformProvider's parameterized type.");
+    checkArgument(
+        parameterizedType.getActualTypeArguments().length == 1,
+        String.format(
+            "Expected one parameterized type, but got %s.",
+            parameterizedType.getActualTypeArguments().length));
+
+    return (Class<ConfigT>) parameterizedType.getActualTypeArguments()[0];
+  }
 
   /**
* Produce a SchemaTransform from ConfigT. Can throw a {@link 
InvalidConfigurationException} or a
diff --git 
a/sdks/java/core/src/test/java/or
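The practical effect of the default configurationClass() above: a provider that binds ConfigT directly in its class declaration no longer needs to override it, because the default recovers the type argument from getClass().getGenericSuperclass(). A minimal sketch with an invented provider and configuration class:

    // Hypothetical provider; MyConfig stands in for a real schema-aware config class.
    public class MySchemaTransformProvider extends TypedSchemaTransformProvider<MyConfig> {
      @Override
      public String identifier() {
        return "some:urn:my_transform:v1"; // illustrative URN
      }

      // No configurationClass() override needed: the default reads MyConfig
      // from the type argument of the direct superclass.
      @Override
      protected SchemaTransform from(MyConfig configuration) {
        throw new UnsupportedOperationException("sketch only");
      }
    }

Note that the reflection trick only works when ConfigT is bound at the direct superclass, as here; a provider that inherits through another generic class would still have to override configurationClass().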

(beam) branch master updated: fix: support reading arrays of structs from bigquery with schemas (#30448)

2024-03-06 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b6301b52058 fix: support reading arrays of structs from bigquery with 
schemas (#30448)
b6301b52058 is described below

commit b6301b5205800ce3604a751964211d6061b09f02
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Mar 6 09:37:37 2024 -0500

fix: support reading arrays of structs from bigquery with schemas (#30448)
---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 11 ---
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java| 11 ++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index fa5ffae0909..e3ace73ee96 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -69,6 +69,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
@@ -710,14 +711,18 @@ public class BigQueryUtils {
                 + fieldType
                 + "' because the BigQuery type is a List, while the output type is not a collection.");
       }
-      boolean innerTypeIsMap =
-          fieldType.getCollectionElementType().getTypeName().equals(TypeName.MAP);
+
+      boolean innerTypeIsMap = fieldType.getCollectionElementType().getTypeName().isMapType();
 
       return ((List) jsonBQValue)
           .stream()
+          // Old BigQuery client returns arrays as lists of maps {"v": <value>}.
+          // If this is the case, unwrap the value first
           .map(
               v ->
-                  (!innerTypeIsMap && v instanceof Map)
+                  (!innerTypeIsMap
+                          && v instanceof Map
+                          && ((Map) v).keySet().equals(Sets.newHashSet("v")))
                       ? ((Map) v).get("v")
                       : v)
           .map(v -> toBeamValue(field.withType(fieldType.getCollectionElementType()), v))
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index d73ff5e2b71..d8db71b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -441,10 +441,13 @@ public class BigQueryUtilsTest {
   private static final Row ARRAY_ROW_ROW =
   Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) 
Arrays.asList(FLAT_ROW)).build();
 
-  private static final TableRow BQ_ARRAY_ROW_ROW =
+  private static final TableRow BQ_ARRAY_ROW_ROW_V =
       new TableRow()
           .set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW)));
 
+  private static final TableRow BQ_ARRAY_ROW_ROW =
+  new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+
   private static final TableSchema BQ_FLAT_TYPE =
   new TableSchema()
   .setFields(
@@ -943,6 +946,12 @@ public class BigQueryUtilsTest {
 assertEquals(ROW_ROW, beamRow);
   }
 
+  @Test
+  public void testToBeamRow_array_row_v() {
+Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW_V);
+assertEquals(ARRAY_ROW_ROW, beamRow);
+  }
+
   @Test
   public void testToBeamRow_array_row() {
 Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
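To make the fix concrete: the older BigQuery client encodes each element of a REPEATED field as a single-entry map {"v": element}, so the code now unwraps only when the declared element type is not itself a map and the map has exactly the key "v". A standalone, runnable sketch of that decision:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class UnwrapSketch {
      // Unwraps {"v": element} wrappers while leaving genuine map-typed elements alone.
      static List<Object> unwrap(List<Object> values, boolean elementTypeIsMap) {
        return values.stream()
            .map(
                v ->
                    (!elementTypeIsMap
                            && v instanceof Map
                            && ((Map<?, ?>) v).keySet().equals(Collections.singleton("v")))
                        ? ((Map<?, ?>) v).get("v")
                        : v)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        // Prints [42]: the {"v": 42} wrapper is removed.
        System.out.println(unwrap(Arrays.asList(Collections.singletonMap("v", 42)), false));
      }
    }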



(beam) branch master updated (b776d705997 -> 6a03f9ba062)

2024-03-04 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from b776d705997 Duet AI ML data processing prompts (no links) (#30421)
 add 6a03f9ba062 Update triggering frequency doc (#30457)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 10 +-
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java   |  2 +-
 .../content/en/documentation/io/built-in/google-bigquery.md|  1 +
 3 files changed, 7 insertions(+), 6 deletions(-)
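The doc being updated concerns BigQueryIO.Write#withTriggeringFrequency, which controls how often an unbounded write produces output (load jobs for FILE_LOADS, stream commits for the Storage Write API's exactly-once mode). A hedged usage sketch; the destination and values are placeholders:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.joda.time.Duration;

    public class TriggeringFrequencySketch {
      static BigQueryIO.Write<TableRow> streamingFileLoads() {
        return BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // placeholder table
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // Fire a batch load roughly every 60 seconds.
            .withTriggeringFrequency(Duration.standardSeconds(60))
            // Triggered file loads also require explicit (or auto) sharding.
            .withNumFileShards(10);
      }
    }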



(beam) branch master updated: Generate external transform wrappers using a script (#29834)

2024-02-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 11f9bce485c Generate external transform wrappers using a script 
(#29834)
11f9bce485c is described below

commit 11f9bce485c4f6fe466ff4bf5073d2414e43678c
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Feb 22 16:13:55 2024 -0500

Generate external transform wrappers using a script (#29834)
---
 ...=> beam_PostCommit_Python_Examples_Direct.json} |   0
 .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json |   1 +
 ...> beam_PostCommit_Python_Xlang_Gcp_Direct.json} |   0
 ... beam_PostCommit_Python_Xlang_IO_Dataflow.json} |   0
 .github/workflows/README.md|   1 +
 .../beam_PreCommit_Xlang_Generated_Transforms.yml  | 114 ++
 .gitignore |   1 +
 CHANGES.md |   1 +
 build.gradle.kts   |   5 +
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 100 ++---
 scripts/ci/release/test/resources/mass_comment.txt |   2 +
 .../GenerateSequenceSchemaTransformProvider.java   |   2 +-
 sdks/python/MANIFEST.in|   1 +
 sdks/python/apache_beam/io/__init__.py |   1 +
 .../io/external/xlang_bigqueryio_it_test.py|  23 +-
 .../apache_beam/io/gcp/bigtableio_it_test.py   |  22 +-
 .../transforms/external_transform_provider.py  |   8 +-
 .../external_transform_provider_it_test.py | 413 +++
 .../transforms/external_transform_provider_test.py | 140 ---
 .../transforms/xlang/__init__.py}  |  15 +-
 sdks/python/build.gradle   |  19 +
 sdks/python/gen_xlang_wrappers.py  | 447 +
 sdks/python/pyproject.toml |   6 +
 sdks/python/pytest.ini |   1 +
 sdks/python/python_xlang_wrapper.template  |  36 ++
 sdks/python/setup.py   |  49 ++-
 sdks/python/test-suites/dataflow/common.gradle |   5 +-
 sdks/python/test-suites/direct/build.gradle|   7 +
 sdks/python/test-suites/direct/common.gradle   |   8 +-
 sdks/python/test-suites/xlang/build.gradle |  56 +--
 sdks/python/tox.ini|   4 +-
 sdks/standard_expansion_services.yaml  |  77 
 sdks/standard_external_transforms.yaml |  52 +++
 33 files changed, 1330 insertions(+), 287 deletions(-)

diff --git 
a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
diff --git 
a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
index e69de29bb2d..8b137891791 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
@@ -0,0 +1 @@
+
diff --git 
a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
diff --git 
a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index f882553ef9b..42efdac 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -271,6 +271,7 @@ PreCommit Jobs run in a schedule and also get triggered in 
a PR if relevant sour
 | [ PreCommit Website 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) 
| N/A |`Run Website PreCommit`| 
[![.github/workflows/beam_PreCommit_Website.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule)
 |
 | [ PreCommit Website Stage GCS 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml)
 | N/A |`Run Website_Stage_GCS PreCommit`| 
[![.github/workflows/beam_PreCommit_Website_Stage_GCS.yml](https://github.com/apache/beam/actio

(beam) branch master updated: Fix Python postcommits (#30333)

2024-02-15 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 1bbe07e1653 Fix Python postcommits  (#30333)
1bbe07e1653 is described below

commit 1bbe07e1653bc35366dea0c8ebe2399490d3ad50
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Feb 15 20:31:04 2024 -0500

Fix Python postcommits  (#30333)
---
 .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json | 0
 sdks/java/extensions/python/build.gradle  | 2 ++
 2 files changed, 2 insertions(+)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
new file mode 100644
index 000..e69de29bb2d
diff --git a/sdks/java/extensions/python/build.gradle 
b/sdks/java/extensions/python/build.gradle
index 7c04259b05e..c27251fa445 100644
--- a/sdks/java/extensions/python/build.gradle
+++ b/sdks/java/extensions/python/build.gradle
@@ -21,6 +21,8 @@ applyJavaNature( automaticModuleName: 
'org.apache.beam.sdk.extensions.python')
 
 description = "Apache Beam :: SDKs :: Java :: Extensions :: Python"
 
+evaluationDependsOn(":sdks:java:core")
+
 dependencies {
 implementation library.java.vendored_guava_32_1_2_jre
 implementation library.java.vendored_grpc_1_60_1



(beam) branch master updated (cce70cdd95c -> 9452a29f489)

2024-02-13 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from cce70cdd95c Remove python 3.7 from programming guide (#30302)
 add 9452a29f489 fix: temporarily double idle timeout to workaround a bug 
in watchdog (#30299)

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java  | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)



(beam) branch master updated: Make ReadFromBigQueryRequest id more randomized (#30156)

2024-01-31 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 41dee464db4 Make ReadFromBigQueryRequest id more randomized (#30156)
41dee464db4 is described below

commit 41dee464db458fa72eeab7ddc902b242ebc894eb
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jan 31 13:07:02 2024 -0500

Make ReadFromBigQueryRequest id more randomized (#30156)

* make ReadFromBigQueryRequest id more randomized
---
 sdks/python/apache_beam/io/gcp/bigquery.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index bba8b8a4af7..7648ab4064d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -361,6 +361,7 @@ import itertools
 import json
 import logging
 import random
+import secrets
 import time
 import uuid
 import warnings
@@ -2925,8 +2926,9 @@ class ReadFromBigQueryRequest:
 self.table = table
 self.validate()
 
-    # We use this internal object ID to generate BigQuery export directories.
-    self.obj_id = random.randint(0, 10)
+    # We use this internal object ID to generate BigQuery export directories
+    # and to create BigQuery job names
+    self.obj_id = '%d_%s' % (int(time.time()), secrets.token_hex(3))
 
   def validate(self):
 if self.table is not None and self.query is not None:



(beam) branch master updated: Fix failing python BQ test (#30099)

2024-01-25 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 3ae851876e2 Fix failing python BQ test (#30099)
3ae851876e2 is described below

commit 3ae851876e22be8ed09c4434c73dd654874e7ae4
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Jan 25 14:03:37 2024 -0500

Fix failing python BQ test (#30099)
---
 .github/trigger_files/beam_PostCommit_Python.json |  0
 sdks/python/apache_beam/io/gcp/bigquery.py|  5 +
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py |  1 +
 .../python/apache_beam/io/gcp/bigquery_file_loads_test.py | 15 +++
 sdks/python/test-suites/dataflow/common.gradle|  2 +-
 5 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
new file mode 100644
index 000..e69de29bb2d
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4643c8ddf0a..bba8b8a4af7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1854,6 +1854,7 @@ class WriteToBigQuery(PTransform):
   kms_key=None,
   batch_size=None,
   max_file_size=None,
+  max_partition_size=None,
   max_files_per_bundle=None,
   test_client=None,
   custom_gcs_temp_location=None,
@@ -1934,6 +1935,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
   max_file_size (int): The maximum size for a file to be written and then
 loaded into BigQuery. The default value is 4TB, which is 80% of the
 limit of 5TB for BigQuery to load any file.
+  max_partition_size (int): Maximum byte size for each load job to
+BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.
   max_files_per_bundle(int): The maximum number of files to be concurrently
 written by a worker. The default here is 20. Larger values will allow
 writing to multiple destinations without having to reshard - but they
@@ -2059,6 +2062,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
 # TODO(pabloem): Consider handling ValueProvider for this location.
 self.custom_gcs_temp_location = custom_gcs_temp_location
 self.max_file_size = max_file_size
+self.max_partition_size = max_partition_size
 self.max_files_per_bundle = max_files_per_bundle
 self.method = method or WriteToBigQuery.Method.DEFAULT
 self.triggering_frequency = triggering_frequency
@@ -2202,6 +2206,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
   with_auto_sharding=self.with_auto_sharding,
   temp_file_format=self._temp_file_format,
   max_file_size=self.max_file_size,
+  max_partition_size=self.max_partition_size,
   max_files_per_bundle=self.max_files_per_bundle,
   custom_gcs_temp_location=self.custom_gcs_temp_location,
   test_client=self.test_client,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 48f2ab4b36b..e1a4af31f1c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -747,6 +747,7 @@ class TriggerLoadJobs(beam.DoFn):
 )
 if not self.bq_io_metadata:
   self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
 job_reference = self.bq_wrapper.perform_load_job(
 destination=table_reference,
 source_uris=files,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 345c8e70500..0605206714e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -925,9 +925,6 @@ class BigQueryFileLoadsIT(unittest.TestCase):
 create_disposition='CREATE_IF_NEEDED',
 write_disposition='WRITE_APPEND')
 
-# reduce load job size to induce copy jobs
-bqfl._DEFAULT_MAX_FILE_SIZE = 10
-bqfl._MAXIMUM_LOAD_SIZE = 20
 verifiers = [
 BigqueryFullResultMatcher(
 project=self.project,
@@ -949,8 +946,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
 dest += "_2"
   return dest
 
-args = self.test_pipeline.get_full_options_as_args(
-on_success_matcher=all_of(verifiers))
+args = self.test_pipeline.get_full_options_as_args()
 
 with beam.Pipeline(argv=args) as p:
   # 0...4 going to table 1
@@ -961,7 +957,10 @@ class BigQueryFileLoadsIT(unittest.TestCase):
   p | beam.Create(items) | bigquery

(beam) branch release-2.54.0 updated: rename ExternalSchemaTransform to ExternalTransform (#30114)

2024-01-25 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.54.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.54.0 by this push:
 new 22ef257784f rename ExternalSchemaTransform to ExternalTransform 
(#30114)
22ef257784f is described below

commit 22ef257784f1259f1c38448f4eb502f9c9233f04
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Jan 25 12:07:00 2024 -0500

rename ExternalSchemaTransform to ExternalTransform (#30114)
---
 ..._provider.py => external_transform_provider.py} | 34 +++---
 ...test.py => external_transform_provider_test.py} | 18 ++--
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git 
a/sdks/python/apache_beam/transforms/external_schematransform_provider.py 
b/sdks/python/apache_beam/transforms/external_transform_provider.py
similarity index 90%
rename from 
sdks/python/apache_beam/transforms/external_schematransform_provider.py
rename to sdks/python/apache_beam/transforms/external_transform_provider.py
index fd650087893..26cc31471e6 100644
--- a/sdks/python/apache_beam/transforms/external_schematransform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -29,7 +29,7 @@ from apache_beam.transforms.external import 
SchemaTransformsConfig
 from apache_beam.typehints.schemas import named_tuple_to_schema
 from apache_beam.typehints.schemas import typing_from_runner_api
 
-__all__ = ['ExternalSchemaTransform', 'ExternalSchemaTransformProvider']
+__all__ = ['ExternalTransform', 'ExternalTransformProvider']
 
 
 def snake_case_to_upper_camel_case(string):
@@ -84,7 +84,7 @@ def get_config_with_descriptions(
   return fields_with_descriptions
 
 
-class ExternalSchemaTransform(PTransform):
+class ExternalTransform(PTransform):
   """Template for a wrapper class of an external SchemaTransform
 
   This is a superclass for dynamically generated SchemaTransform wrappers and
@@ -93,7 +93,7 @@ class ExternalSchemaTransform(PTransform):
   Experimental; no backwards compatibility guarantees."""
 
   # These attributes need to be set when
-  # creating an ExternalSchemaTransform type
+  # creating an ExternalTransform type
   default_expansion_service = None
   description: str = ""
   identifier: str = ""
@@ -138,21 +138,21 @@ def infer_name_from_identifier(identifier: str, pattern: 
str):
 return ''.join(components)
 
 
-class ExternalSchemaTransformProvider:
+class ExternalTransformProvider:
   """Dynamically discovers Schema-aware external transforms from a given list
   of expansion services and provides them as ready PTransforms.
 
-  A :class:`ExternalSchemaTransform` subclass is generated for each external
+  A :class:`ExternalTransform` subclass is generated for each external
   transform, and is named based on what can be inferred from the URN
   (see :param urn_pattern).
 
-  These classes are generated when :class:`ExternalSchemaTransformProvider` is
+  These classes are generated when :class:`ExternalTransformProvider` is
   initialized. We need to give it one or more expansion service addresses that
   are already up and running:
-  >>> provider = ExternalSchemaTransformProvider(["localhost:12345",
+  >>> provider = ExternalTransformProvider(["localhost:12345",
   ... "localhost:12121"])
   We can also give it the gradle target of a standard Beam expansion service:
-  >>> provider = ExternalSchemaTransform(BeamJarExpansionService(
+  >>> provider = ExternalTransform(BeamJarExpansionService(
   ... "sdks:java:io:google-cloud-platform:expansion-service:shadowJar"))
   Let's take a look at the output of :func:`get_available()` to know the
   available transforms in the expansion service(s) we provided:
@@ -162,7 +162,7 @@ class ExternalSchemaTransformProvider:
   ...]
 
   Then retrieve a transform by :func:`get()`, :func:`get_urn()`, or by directly
-  accessing it as an attribute of :class:`ExternalSchemaTransformProvider`.
+  accessing it as an attribute of :class:`ExternalTransformProvider`.
   All of the following commands do the same thing:
   >>> provider.get('BigqueryStorageRead')
   >>> provider.get_urn(
@@ -194,7 +194,7 @@ class ExternalSchemaTransformProvider:
   Experimental; no backwards compatibility guarantees.
   """
   def __init__(self, expansion_services, urn_pattern=STANDARD_URN_PATTERN):
-f"""Initialize an ExternalSchemaTransformProvider
+f"""Initialize an ExternalTransformProvider
 
 :param expansion_services:
   A list of expansion services to discover transforms from.
@@ -207,7 +207,7 @@ class ExternalSchemaTransformProvider:
   By d

(beam) branch master updated (d62ae0144cc -> 61a62e19c4a)

2024-01-24 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from d62ae0144cc Add section for 2.55.0, cleanup 2.54.0 due to cut. (#30101)
 add 61a62e19c4a rename ExternalSchemaTransform to ExternalTransform 
(#30102)

No new revisions were added by this update.

Summary of changes:
 ..._provider.py => external_transform_provider.py} | 34 +++---
 ...test.py => external_transform_provider_test.py} | 18 ++--
 2 files changed, 26 insertions(+), 26 deletions(-)
 rename 
sdks/python/apache_beam/transforms/{external_schematransform_provider.py => 
external_transform_provider.py} (90%)
 rename 
sdks/python/apache_beam/transforms/{external_schematransform_provider_test.py 
=> external_transform_provider_test.py} (87%)



(beam) branch master updated (e85d070a32b -> 27214149d34)

2024-01-24 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e85d070a32b Ignore DicomMetadataRead tests (#30096)
 add 27214149d34 Support dynamic destinations with Python Storage API 
(#30045)

No new revisions were added by this update.

Summary of changes:
 .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json |   0
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |   1 +
 CHANGES.md |   1 +
 ...ueryStorageWriteApiSchemaTransformProvider.java |  59 -
 ...StorageWriteApiSchemaTransformProviderTest.java |  54 +++-
 .../io/external/xlang_bigqueryio_it_test.py|  77 +-
 sdks/python/apache_beam/io/gcp/bigquery.py | 293 -
 7 files changed, 334 insertions(+), 151 deletions(-)
 copy 
learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/myfile.txt
 => .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json (100%)
 create mode 100644 
.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json



(beam) branch master updated: [Python BQ] Substitute final destination schema when no input schema is specified (#30015)

2024-01-19 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b4be68e2404 [Python BQ] Substitute final destination schema when no 
input schema is specified (#30015)
b4be68e2404 is described below

commit b4be68e2404e8f87c545f81800a7e83cd9c77df7
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Jan 19 16:25:27 2024 -0500

[Python BQ] Substitute final destination schema when no input schema is 
specified (#30015)

* substitute final destination's schema; tests

* use cache and only fetch when necessary
---
 .../apache_beam/io/gcp/bigquery_file_loads.py  | 24 +
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 62 ++
 2 files changed, 86 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 453cd27dfda..48f2ab4b36b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -656,6 +656,7 @@ class TriggerLoadJobs(beam.DoFn):
 if not self.bq_io_metadata:
   self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
 self.pending_jobs = []
+self.schema_cache = {}
 
   def process(
   self,
@@ -703,6 +704,29 @@ class TriggerLoadJobs(beam.DoFn):
 
 create_disposition = self.create_disposition
 if self.temporary_tables:
+  # we need to create temp tables, so we need a schema.
+  # if there is no input schema, fetch the destination table's schema
+  if schema is None:
+hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
+if hashed_dest in self.schema_cache:
+  schema = self.schema_cache[hashed_dest]
+else:
+  try:
+schema = bigquery_tools.table_schema_to_dict(
+bigquery_tools.BigQueryWrapper().get_table(
+project_id=table_reference.projectId,
+dataset_id=table_reference.datasetId,
+table_id=table_reference.tableId).schema)
+self.schema_cache[hashed_dest] = schema
+  except Exception as e:
+_LOGGER.warning(
+"Input schema is absent and could not fetch the final "
+"destination table's schema [%s]. Creating temp table [%s] "
+"will likely fail: %s",
+hashed_dest,
+job_name,
+e)
+
   # If we are using temporary tables, then we must always create the
   # temporary tables, so we replace the create_disposition.
   create_disposition = 'CREATE_IF_NEEDED'
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index edd92f21e73..345c8e70500 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -903,6 +903,68 @@ class BigQueryFileLoadsIT(unittest.TestCase):
 _LOGGER.info(
 "Created dataset %s in project %s", self.dataset_id, self.project)
 
+  @pytest.mark.it_postcommit
+  def test_batch_copy_jobs_with_no_input_schema(self):
+schema_1 = "col_1:INTEGER"
+schema_2 = "col_2:INTEGER"
+
+# create two tables with different schemas
+# test to make sure this works with dynamic destinations too
+self.bigquery_client.get_or_create_table(
+project_id=self.project,
+dataset_id=self.dataset_id,
+table_id="output_table_1",
+schema=bigquery_tools.get_table_schema_from_string(schema_1),
+create_disposition='CREATE_IF_NEEDED',
+write_disposition='WRITE_APPEND')
+self.bigquery_client.get_or_create_table(
+project_id=self.project,
+dataset_id=self.dataset_id,
+table_id="output_table_2",
+schema=bigquery_tools.get_table_schema_from_string(schema_2),
+create_disposition='CREATE_IF_NEEDED',
+write_disposition='WRITE_APPEND')
+
+# reduce load job size to induce copy jobs
+bqfl._DEFAULT_MAX_FILE_SIZE = 10
+bqfl._MAXIMUM_LOAD_SIZE = 20
+verifiers = [
+BigqueryFullResultMatcher(
+project=self.project,
+query="SELECT * FROM %s" % (self.output_table + "_1"),
+data=[(i, ) for i in range(5)]),
+BigqueryFullResultMatcher(
+project=self.project,
+query="SELECT * FROM %s" % (self.output_table + "_2"),
+data=[(i, ) for i in range(5, 10)])
+]
+
+output = self.output_table
+
+def callable_table(el: dict):
+  dest = output

(beam) branch master updated: [Python BQ] Retry get_table for quota errors (#28820)

2024-01-08 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new c9e036e40e4 [Python BQ] Retry get_table for quota errors (#28820)
c9e036e40e4 is described below

commit c9e036e40e4f1d21f33ce6829fdf919c934da7de
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Jan 8 15:52:31 2024 -0500

[Python BQ] Retry get_table for quota errors (#28820)

* retry get_table on quota errors

* add tests

* only retry on transient reasons
---
 sdks/python/apache_beam/io/gcp/bigquery_test.py  | 201 ++-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py |   2 +-
 sdks/python/apache_beam/utils/retry.py   |  33 +++-
 3 files changed, 229 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 035edffc03f..e53204a5ebc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -43,6 +43,7 @@ import apache_beam as beam
 from apache_beam.internal import pickler
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp import bigquery as beam_bq
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import ReadFromBigQuery
@@ -82,11 +83,13 @@ from apache_beam.transforms.display_test import 
DisplayDataItemMatcher
 try:
   from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client
   from apitools.base.py.exceptions import HttpError
+  from apitools.base.py.exceptions import HttpForbiddenError
   from google.cloud import bigquery as gcp_bigquery
   from google.api_core import exceptions
 except ImportError:
   gcp_bigquery = None
   HttpError = None
+  HttpForbiddenError = None
   exceptions = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
@@ -323,7 +326,9 @@ class TestJsonToDictCoder(unittest.TestCase):
 self.assertEqual(expected_row, actual)
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+@unittest.skipIf(
+HttpError is None or HttpForbiddenError is None,
+'GCP dependencies are not installed')
 class TestReadFromBigQuery(unittest.TestCase):
   @classmethod
   def setUpClass(cls):
@@ -454,6 +459,200 @@ class TestReadFromBigQuery(unittest.TestCase):
 mock_insert.assert_called()
 self.assertIn(error_message, exc.exception.args[0])
 
+  @parameterized.expand([
+  # first attempt returns a Http 500 blank error and retries
+  # second attempt returns a Http 408 blank error and retries,
+  # third attempt passes
+  param(
+  responses=[
+  HttpForbiddenError(
+  response={'status': 500}, content="something", url="")
+  if HttpForbiddenError else None,
+  HttpForbiddenError(
+  response={'status': 408}, content="blank", url="")
+  if HttpForbiddenError else None
+  ],
+  expected_retries=2),
+  # first attempts returns a 403 rateLimitExceeded error
+  # second attempt returns a 429 blank error
+  # third attempt returns a Http 403 rateLimitExceeded error
+  # fourth attempt passes
+  param(
+  responses=[
+  exceptions.Forbidden(
+  "some message",
+  errors=({
+  "message": "transient", "reason": "rateLimitExceeded"
+  }, )) if exceptions else None,
+  exceptions.ResourceExhausted("some message")
+  if exceptions else None,
+  HttpForbiddenError(
+  response={'status': 403},
+  content={
+  "error": {
+  "errors": [{
+  "message": "transient",
+  "reason": "rateLimitExceeded"
+  }]
+  }
+  },
+  url="") if HttpForbiddenError else None,
+  ],
+  expected_retries=3),
+  ])
+  def test_get_table_transient_exception(self, responses, expected_retries):
+class DummyTable:
+  class DummySchema:
+fields = []
+
+  numBytes = 5
+  schema = DummySchema()
+
+with mock.patch('time.sleep'), \
+mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
+  'Get') as mock_get_table, \
+mock.patch.

(beam) branch master updated: update container tag (#29872)

2023-12-27 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 43e79080b44 update container tag (#29872)
43e79080b44 is described below

commit 43e79080b445c8fc227edc20cbea06a296964d6f
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Dec 27 19:15:46 2023 +0300

update container tag (#29872)
---
 sdks/python/apache_beam/runners/dataflow/internal/names.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py 
b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 1b7795916f0..8d3dc94a3ce 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -34,6 +34,6 @@ SERIALIZED_SOURCE_KEY = 'serialized_source'
 # Unreleased sdks use container image tag specified below.
 # Update this tag whenever there is a change that
 # requires changes to SDK harness container or SDK harness launcher.
-BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20231215'
+BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20231227'
 
 DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'



(beam) branch master updated: Allow large timestamp skew for at-least-once streaming (#29858)

2023-12-27 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 989a2198a31 Allow large timestamp skew for at-least-once streaming 
(#29858)
989a2198a31 is described below

commit 989a2198a3174a66cd733834383f3603de70d1fa
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Dec 27 12:16:53 2023 +0300

Allow large timestamp skew for at-least-once streaming (#29858)

* large skew

* test

* use AppendSerializationError everywhere
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  11 ++-
 .../bigquery/StorageApiWritesShardedRecords.java   |   6 +-
 .../sdk/io/gcp/testing/FakeDatasetService.java |   2 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 109 +
 .../io/gcp/bigquery/BigQuerySinkMetricsTest.java   |   4 +-
 5 files changed, 123 insertions(+), 9 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8f24ebc8ad9..3c6c73dd021 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -670,10 +670,10 @@ public class StorageApiWriteUnshardedRecords
               BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
 
           if (failedContext.getError() != null
-              && failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
-            Exceptions.AppendSerializtionError error =
+              && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
+            Exceptions.AppendSerializationError error =
                 Preconditions.checkStateNotNull(
-                    (Exceptions.AppendSerializtionError) failedContext.getError());
+                    (Exceptions.AppendSerializationError) failedContext.getError());
 
             Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
             for (int failedIndex : failedRowIndices) {
@@ -1164,5 +1164,10 @@ public class StorageApiWriteUnshardedRecords
         throw new RuntimeException(e);
       }
     }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 8cf8ad0ee02..0f9b07d0c40 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -662,10 +662,10 @@ public class StorageApiWritesShardedRecords
           Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
           for (int failedIndex : failedRowIndices) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 6a50127acd8..1e746d7f96b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -680,7 +680,7 @@ public class FakeDatasetService implements DatasetService, 
WriteStreamService, S
   }
   if (!rowIndexToErrorMessage.isEmpty()) {
 return ApiFutures.immediateFailedFuture(
-new Exceptions.AppendSerializtionError(
+new Exceptions.AppendSerializationError(
 Code.INVALID_ARGUMENT.getNumber(),
 "Append serialization failed for writer: " + streamName,
 stream.streamName,
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 720419f2227..55269342155 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest

(beam) branch master updated: Dynamic SchemaTransform wrapper provider (#29561)

2023-12-13 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 921e40a12f4 Dynamic SchemaTransform wrapper provider (#29561)
921e40a12f4 is described below

commit 921e40a12f467c51161bf33a0144fb8a1d4ca334
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Dec 14 05:58:29 2023 +0300

Dynamic SchemaTransform wrapper provider (#29561)

* wrapper provider

* add typehints

* add GenerateSequence schematransform and expansion service for java core

* address comments; add description property

* add description test

* add experimental note
---
 .../GenerateSequenceSchemaTransformProvider.java   | 201 +++
 .../apache/beam/sdk/providers/package-info.java|  23 ++
 ...enerateSequenceSchemaTransformProviderTest.java |  61 +
 .../io/external/xlang_kafkaio_it_test.py   |   4 +-
 sdks/python/apache_beam/transforms/external.py |   1 +
 .../external_schematransform_provider.py   | 277 +
 .../external_schematransform_provider_test.py  | 140 +++
 sdks/python/apache_beam/typehints/schemas.py   |   3 +
 sdks/python/pytest.ini |   2 +-
 sdks/python/test-suites/dataflow/build.gradle  |   4 +-
 sdks/python/test-suites/direct/build.gradle|   8 +-
 sdks/python/test-suites/gradle.properties  |   4 +-
 sdks/python/test-suites/xlang/build.gradle |   9 +-
 13 files changed, 723 insertions(+), 14 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
new file mode 100644
index 000..f4cada661b0
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.providers;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.GenerateSequence;
+import 
org.apache.beam.sdk.providers.GenerateSequenceSchemaTransformProvider.GenerateSequenceConfiguration;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
+
+@AutoService(SchemaTransformProvider.class)
+public class GenerateSequenceSchemaTransformProvider
+    extends TypedSchemaTransformProvider<GenerateSequenceConfiguration> {
+  public static final String OUTPUT_ROWS_TAG = "output";
+  public static final Schema OUTPUT_SCHEMA = Schema.builder().addInt64Field("value").build();
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:generate_sequence:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_ROWS_TAG);
+  }
+
+  @Override
+  public String desc
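For reference, the transform being wrapped is the long-standing GenerateSequence; the provider re-emits its longs as single-field Rows matching OUTPUT_SCHEMA above. A short sketch of the plain Java transform (the bounds are placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class GenerateSequenceSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // Emits the longs 0..99; the schema-aware wrapper above surfaces the
        // same values as Rows with a single INT64 field named "value".
        PCollection<Long> nums = p.apply(GenerateSequence.from(0).to(100));
        p.run().waitUntilFinish();
      }
    }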

(beam) branch master updated: Pass Java SchemaTransform descriptions to Python SDK (#29606)

2023-12-10 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new a96fa74f0cc Pass Java SchemaTransform descriptions to Python SDK 
(#29606)
a96fa74f0cc is described below

commit a96fa74f0ccb8d0c5490ba2872bfd52675a15558
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Dec 11 00:35:32 2023 +0300

Pass Java SchemaTransform descriptions to Python SDK (#29606)

* pipe thru schematransform descriptions
---
 .../beam/sdk/schemas/transforms/SchemaTransformProvider.java   |  6 +-
 .../schemas/transforms/TypedSchemaTransformProviderTest.java   |  6 ++
 .../BigQueryStorageWriteApiSchemaTransformProvider.java| 10 ++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
index e73ec5d870c..c76d7a25e69 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
@@ -36,7 +36,11 @@ public interface SchemaTransformProvider {
   /** Returns an id that uniquely represents this transform. */
   String identifier();
 
-  /** Returns a description of this transform to be used for documentation. */
+  /**
+   * Returns a description regarding the {@link SchemaTransform} represented by the {@link
+   * SchemaTransformProvider}. Please keep the language generic (i.e. not specific to any
+   * programming language). The description may be markdown formatted.
+   */
   default String description() {
 return "";
   }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index db7b1436a12..2b698f4f67b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -61,6 +61,11 @@ public class TypedSchemaTransformProviderTest {
   return "fake:v1";
 }
 
+@Override
+public String description() {
+  return "Description of fake provider";
+}
+
 @Override
protected Class<Configuration> configurationClass() {
   return Configuration.class;
@@ -115,6 +120,7 @@ public class TypedSchemaTransformProviderTest {
 Configuration outputConfig = ((FakeSchemaTransform) 
provider.from(inputConfig)).config;
 assertEquals("field1", outputConfig.getField1());
 assertEquals(13, outputConfig.getField2().intValue());
+assertEquals("Description of fake provider", provider.description());
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 98cc246ce0d..8c4edd2244b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -98,6 +98,16 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 return 
String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2");
   }
 
+  @Override
+  public String description() {
+return String.format(
+"Writes data to BigQuery using the Storage Write API 
(https://cloud.google.com/bigquery/docs/write-api)."
++ "\n\nThis expects a single PCollection of Beam Rows and outputs 
two dead-letter queues (DLQ) that "
++ "contain failed rows. The first DLQ has tag [%s] and contains 
the failed rows. The second DLQ has "
++ "tag [%s] and contains the failed rows along with their 
respective errors.",
+FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG);
+  }
+
   @Override
  public List<String> inputCollectionNames() {
 return Collections.singletonList(INPUT_ROWS_TAG);


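Because providers are registered with @AutoService(SchemaTransformProvider.class), the descriptions added in this commit can be enumerated at runtime with the standard ServiceLoader mechanism. A small sketch of that discovery path (class name hypothetical):

import java.util.ServiceLoader;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;

public class ListSchemaTransformProviders {
  public static void main(String[] args) {
    for (SchemaTransformProvider provider : ServiceLoader.load(SchemaTransformProvider.class)) {
      // description() defaults to "" unless a provider overrides it, as
      // BigQueryStorageWriteApiSchemaTransformProvider does in this commit.
      System.out.println(provider.identifier() + ": " + provider.description());
    }
  }
}
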

(beam) branch master updated (ebc4a5585e0 -> 39e1615f945)

2023-12-01 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ebc4a5585e0 Documents the transform upgrade feature (#29581)
 add 39e1615f945 fix typos (#29588)

No new revisions were added by this update.

Summary of changes:
 .../site/content/en/documentation/programming-guide.md | 18 +-
 1 file changed, 9 insertions(+), 9 deletions(-)



(beam) branch master updated: Revert "revert protobuf version (#29523)" (#29530)

2023-11-24 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5a6329492f7 Revert "revert protobuf version (#29523)" (#29530)
5a6329492f7 is described below

commit 5a6329492f713750ac141d90607653978eccabc2
Author: Shunping Huang 
AuthorDate: Fri Nov 24 14:14:23 2023 -0500

Revert "revert protobuf version (#29523)" (#29530)
---
 buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 1664f5fc670..359aeea55a2 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin<Project> {
 def postgres_version = "42.2.16"
 def powermock_version = "2.0.9"
 // Try to keep protobuf_version consistent with the protobuf version in 
google_cloud_platform_libraries_bom
-def protobuf_version = "3.23.2"
+def protobuf_version = "3.24.4"
 def qpid_jms_client_version = "0.61.0"
 def quickcheck_version = "1.0"
 def sbe_tool_version = "1.25.1"



(beam) branch master updated: revert protobuf version (#29523)

2023-11-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 183141d2eab revert protobuf version (#29523)
183141d2eab is described below

commit 183141d2eab4f8b1f4ffd67200f5618c3abd48af
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Nov 22 22:51:27 2023 -0500

revert protobuf version (#29523)
---
 buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 359aeea55a2..1664f5fc670 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin<Project> {
 def postgres_version = "42.2.16"
 def powermock_version = "2.0.9"
 // Try to keep protobuf_version consistent with the protobuf version in 
google_cloud_platform_libraries_bom
-def protobuf_version = "3.24.4"
+def protobuf_version = "3.23.2"
 def qpid_jms_client_version = "0.61.0"
 def quickcheck_version = "1.0"
 def sbe_tool_version = "1.25.1"



(beam) branch master updated: Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on numStreams (#29434)

2023-11-21 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new bb4b633b019 Fix BigQueryStorageWriteApiSchemaTransformProvider null 
pointer on numStreams (#29434)
bb4b633b019 is described below

commit bb4b633b01970d46359637d58a296b532269b18e
Author: Jeff Kinard <35542536+pol...@users.noreply.github.com>
AuthorDate: Tue Nov 21 10:38:36 2023 -0500

Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on 
numStreams (#29434)

* Fix BigQueryStorageWriteApiSchemaTransformProvider null pointer on 
numStreams

Signed-off-by: Jeffrey Kinard 
---
 .../BigQueryStorageWriteApiSchemaTransformProvider.java  | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 1b9eb309ec4..39e6fd7c809 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -177,7 +177,9 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 invalidConfigMessage + "Output must not be empty if error handling 
specified.");
   }
 
-  if (this.getAutoSharding() != null && this.getAutoSharding()) {
+  if (this.getAutoSharding() != null
+  && this.getAutoSharding()
+  && this.getNumStreams() != null) {
 checkArgument(
 this.getNumStreams() == 0,
 invalidConfigMessage
@@ -338,7 +340,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 Boolean autoSharding = configuration.getAutoSharding();
 Integer numStreams = configuration.getNumStreams();
 // Triggering frequency is only applicable for exactly-once
-if (!configuration.getUseAtLeastOnceSemantics()) {
+if (configuration.getUseAtLeastOnceSemantics() == null
+|| !configuration.getUseAtLeastOnceSemantics()) {
   write =
   write.withTriggeringFrequency(
   (triggeringFrequency == null || triggeringFrequency <= 0)
@@ -346,7 +349,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   : Duration.standardSeconds(triggeringFrequency));
 }
 // set num streams if specified, otherwise default to autoSharding
-if (numStreams > 0) {
+if (numStreams != null && numStreams > 0) {
   write = write.withNumStorageWriteApiStreams(numStreams);
 } else if (autoSharding == null || autoSharding) {
   write = write.withAutoSharding();

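The underlying hazard in both hunks is auto-unboxing: comparing a null Integer with > or negating a null Boolean throws a NullPointerException at runtime, so optional config fields must be null-checked first. A standalone illustration of the pitfall and the null-safe pattern adopted above (not Beam code):

public class UnboxingPitfall {
  public static void main(String[] args) {
    Integer numStreams = null;  // optional config field left unset
    Boolean autoSharding = null;

    // Both of these would throw NullPointerException via auto-unboxing:
    //   if (numStreams > 0) { ... }
    //   if (!autoSharding) { ... }

    // Null-safe checks, mirroring the fix:
    if (numStreams != null && numStreams > 0) {
      System.out.println("fixed number of streams: " + numStreams);
    } else if (autoSharding == null || autoSharding) {
      System.out.println("defaulting to auto-sharding");
    }
  }
}
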


(beam) branch master updated: Add ability to run performance regression checks on Beam IO Load tests. (#29226)

2023-11-01 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ea8596f2df0 Add ability to run performance regression checks on Beam 
IO Load tests. (#29226)
ea8596f2df0 is described below

commit ea8596f2df0e3e4b9da9f215ae6745c2ddfb6612
Author: Pranav Bhandari 
AuthorDate: Wed Nov 1 12:50:01 2023 -0400

Add ability to run performance regression checks on Beam IO Load tests. 
(#29226)
---
 .../testing/analyzers/io_tests_config.yaml | 256 +
 .../testing/analyzers/load_test_perf_analysis.py   |  98 
 .../testing/analyzers/perf_analysis_utils.py   |  11 +-
 3 files changed, 364 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/testing/analyzers/io_tests_config.yaml 
b/sdks/python/apache_beam/testing/analyzers/io_tests_config.yaml
new file mode 100644
index 000..2a33ae31797
--- /dev/null
+++ b/sdks/python/apache_beam/testing/analyzers/io_tests_config.yaml
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spanner_io_read:
+  test_description: |
+SpannerIO Read test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testSpannerWriteAndRead,read-spanner
+  metric_name:
+- RunTime
+- EstimatedCost
+
+spanner_io_read_runnerV2:
+  test_description: |
+SpannerIO RunnerV2 Read test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testSpannerWriteAndRead,read_spanner_v2
+  metric_name:
+- RunTime
+- EstimatedCost
+
+spanner_io_write:
+  test_description: |
+SpannerIO write test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testSpannerWriteAndRead,write-spanner
+  metric_name:
+- RunTime
+- EstimatedCost
+
+spanner_io_write_runnerV2:
+  test_description: |
+SpannerIO RunnerV2 write test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testSpannerWriteAndRead,write_spanner_v2
+  metric_name:
+- RunTime
+- EstimatedCost
+
+bigquery_io_storage_api_read:
+  test_description: |
+BigQueryIO Storage write API read test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testStorageAPIWriteThenRead,read-bigquery
+  metric_name:
+- RunTime
+- EstimatedCost
+
+bigquery_io_storage_api_read_runnerV2:
+  test_description: |
+BigQueryIO RunnerV2 Storage write API read test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testStorageAPIWriteThenRead,read_bigquery_v2
+  metric_name:
+- RunTime
+- EstimatedCost
+
+bigquery_io_storage_api_write:
+  test_description: |
+BigQueryIO Storage write API write test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testStorageAPIWriteThenRead,write-bigquery
+  metric_name:
+- RunTime
+- EstimatedCost
+
+bigquery_io_storage_api_write_runnerV2:
+  test_description: |
+BigQueryIO Storage write API write test 100 GB.
+  project: apache-beam-testing
+  metrics_dataset: performance_tests
+  metrics_table: io_performance_metrics
+  # test_name is in the format testName,pipelineName
+  test_name: testStorageAPIWriteThenRead,write_bigquery_v2
+  metric_name:
+- RunTime
+- EstimatedCost
+
+bigquery_io_avro_file_loads_read:

[beam] branch master updated (2bccee14cf9 -> a859863d392)

2023-10-10 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 2bccee14cf9 Bump google.golang.org/api from 0.144.0 to 0.146.0 in 
/sdks (#28911)
 add a859863d392 update dataflow containers (#28904)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/runners/dataflow/internal/names.py| 2 +-
 website/www/site/content/en/documentation/runtime/environments.md | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)



[beam] branch master updated (604629798a3 -> d17063412ea)

2023-10-09 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 604629798a3 Add dataclasses to perf alert tool and refactor code. 
(#28889)
 add d17063412ea Update BigQueryIO Documentation (#28591)

No new revisions were added by this update.

Summary of changes:
 .../en/documentation/io/built-in/google-bigquery.md| 14 ++
 1 file changed, 6 insertions(+), 8 deletions(-)



[beam] branch master updated (31e1c7a8aef -> 0304cae8cc8)

2023-10-05 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 31e1c7a8aef Bump pillow (#28810)
 add 0304cae8cc8 add bigtable reviewers (#28834)

No new revisions were added by this update.

Summary of changes:
 .github/REVIEWERS.yml   | 4 
 .github/autolabeler.yml | 1 +
 2 files changed, 5 insertions(+)



[beam] branch master updated (c01b41f9758 -> be805372238)

2023-10-03 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from c01b41f9758 Bump urllib3 from 1.26.16 to 1.26.17 in 
/sdks/python/container/py310 (#28785)
 add be805372238 BigQuery testing suite that runs against BQ's day 0 region 
(#28397)

No new revisions were added by this update.

Summary of changes:
 ... beam_PostCommit_Java_BigQueryEarlyRollout.yml} | 27 -
 sdks/java/io/google-cloud-platform/build.gradle| 44 +-
 .../sdk/io/gcp/bigquery/TestBigQueryOptions.java   |  7 
 .../gcp/common/GcpIoPipelineOptionsRegistrar.java  |  2 +
 .../beam/sdk/io/gcp/testing/BigqueryClient.java| 37 --
 .../io/gcp/bigquery/BigQueryIOStorageQueryIT.java  | 10 -
 .../io/gcp/bigquery/BigQueryIOStorageReadIT.java   |  9 -
 .../bigquery/BigQueryIOStorageReadTableRowIT.java  | 43 -
 .../io/gcp/bigquery/BigQueryIOStorageWriteIT.java  | 31 +++
 .../bigquery/BigQuerySchemaUpdateOptionsIT.java|  6 ++-
 .../BigQueryTimePartitioningClusteringIT.java  | 42 -
 .../sdk/io/gcp/bigquery/BigQueryToTableIT.java |  9 +++--
 .../sdk/io/gcp/bigquery/FileLoadsStreamingIT.java  |  9 -
 .../bigquery/StorageApiDirectWriteProtosIT.java|  9 -
 .../gcp/bigquery/StorageApiSinkFailedRowsIT.java   | 13 ++-
 .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java |  9 -
 .../gcp/bigquery/StorageApiSinkSchemaUpdateIT.java | 17 ++---
 .../gcp/bigquery/TableRowToStorageApiProtoIT.java  | 11 --
 18 files changed, 261 insertions(+), 74 deletions(-)
 copy .github/workflows/{beam_PostCommit_Java.yml => 
beam_PostCommit_Java_BigQueryEarlyRollout.yml} (75%)



[beam] branch master updated (c7d7896533b -> 2e0521162f6)

2023-10-03 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from c7d7896533b Github Workflow Replacement for Jenkins Jobs, 
beam_LoadTests_Java_GBK_Dataflow_Batch_* (#28738)
 add 2e0521162f6 [Java BQ] Storage API streaming load test (#28264)

No new revisions were added by this update.

Summary of changes:
 .../beam_PostCommit_Java_IO_Performance_Tests.yml  |   2 +-
 it/build.gradle|   4 +
 it/google-cloud-platform/build.gradle  |   5 +-
 .../beam/it/gcp/bigquery/BigQueryStreamingLT.java  | 643 +
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |   6 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |   6 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  29 +-
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java |  37 +-
 8 files changed, 723 insertions(+), 9 deletions(-)
 create mode 100644 
it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java



[beam] branch master updated: Add Bigtable xlang bug to release notes for versions 2.49.0 and 2.50.0 (#28633)

2023-09-25 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 72b6645ec55 Add Bigtable xlang bug to release notes for versions 
2.49.0 and 2.50.0 (#28633)
72b6645ec55 is described below

commit 72b6645ec5533dcedeadcec9fad764acc1be46d0
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Sep 25 15:17:44 2023 -0400

Add Bigtable xlang bug to release notes for versions 2.49.0 and 2.50.0 
(#28633)
---
 CHANGES.md  | 3 +++
 website/www/site/content/en/blog/beam-2.49.0.md | 1 +
 website/www/site/content/en/blog/beam-2.50.0.md | 1 +
 3 files changed, 5 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index a990a5fd730..650b33c1240 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -85,6 +85,7 @@
 
 * Fixed exception chaining issue in GCS connector (Python) 
([#26769](https://github.com/apache/beam/issues/26769#issuecomment-1700422615)).
 * Fixed streaming inserts exception handling, GoogleAPICallErrors are now 
retried according to retry strategy and routed to failed rows where appropriate 
rather than causing a pipeline error (Python) 
([#21080](https://github.com/apache/beam/issues/21080)).
+* Fixed a bug in Python SDK's cross-language Bigtable sink that mishandled 
records that don't have an explicit timestamp set: 
[#28632](https://github.com/apache/beam/issues/28632).
 
 
 ## Security Fixes
@@ -153,6 +154,7 @@
 * Long-running Python pipelines might experience a memory leak: 
[#28246](https://github.com/apache/beam/issues/28246).
 * Python Pipelines using BigQuery IO or `orjson` dependency might experience 
segmentation faults or get stuck: 
[#28318](https://github.com/apache/beam/issues/28318).
 * Beam Python containers rely on a version of Debian/aom that has several 
security vulnerabilities: 
[CVE-2021-30474](https://nvd.nist.gov/vuln/detail/CVE-2021-30474), 
[CVE-2021-30475](https://nvd.nist.gov/vuln/detail/CVE-2021-30475), 
[CVE-2021-30473](https://nvd.nist.gov/vuln/detail/CVE-2021-30473), 
[CVE-2020-36133](https://nvd.nist.gov/vuln/detail/CVE-2020-36133), 
[CVE-2020-36131](https://nvd.nist.gov/vuln/detail/CVE-2020-36131), 
[CVE-2020-36130](https://nvd.nist.gov/vuln/detail/CVE-202 [...]
+* Python SDK's cross-language Bigtable sink mishandles records that don't have 
an explicit timestamp set: 
[#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set 
explicit timestamps for all records before writing to Bigtable.
 
 
 # [2.49.0] - 2023-07-17
@@ -225,6 +227,7 @@
 
 * PubsubIO writes will throw *SizeLimitExceededException* for any message 
above 100 bytes, when used in batch (bounded) mode. (Java) 
([#27000](https://github.com/apache/beam/issues/27000)).
 * Long-running Python pipelines might experience a memory leak: 
[#28246](https://github.com/apache/beam/issues/28246).
+* Python SDK's cross-language Bigtable sink mishandles records that don't have 
an explicit timestamp set: 
[#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set 
explicit timestamps for all records before writing to Bigtable.
 
 
 # [2.47.0] - 2023-05-10
diff --git a/website/www/site/content/en/blog/beam-2.49.0.md 
b/website/www/site/content/en/blog/beam-2.49.0.md
index 595b8e71253..a2e7af0e18f 100644
--- a/website/www/site/content/en/blog/beam-2.49.0.md
+++ b/website/www/site/content/en/blog/beam-2.49.0.md
@@ -51,6 +51,7 @@ For more information on changes in 2.49.0, check out the 
[detailed release notes
 ### Known Issues
 
 * Long-running Python pipelines might experience a memory leak: 
[#28246](https://github.com/apache/beam/issues/28246).
+* Python SDK's cross-language Bigtable sink mishandles records that don't have 
an explicit timestamp set: 
[#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set 
explicit timestamps for all records before writing to Bigtable.
 
 
 ## List of Contributors
diff --git a/website/www/site/content/en/blog/beam-2.50.0.md 
b/website/www/site/content/en/blog/beam-2.50.0.md
index 4cfddd6167a..43bdecdc3ba 100644
--- a/website/www/site/content/en/blog/beam-2.50.0.md
+++ b/website/www/site/content/en/blog/beam-2.50.0.md
@@ -83,6 +83,7 @@ For more information on changes in 2.50.0, check out the 
[detailed release notes
 
 * Long-running Python pipelines might experience a memory leak: 
[#28246](https://github.com/apache/beam/issues/28246).
 * Python Pipelines using BigQuery IO or `orjson` dependency might experience 
segmentation faults or get stuck: 
[#28318](https://github.com/apache/beam/issues/28318).
+* Python SDK's cross-language Bigtable sink mishandles records that don't have 
an explicit timestamp set: 
[#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set 
explicit timestamps for all records before writing to Bigtable.
 
 
 #

[beam] branch master updated: [Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 (#28624)

2023-09-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 68cf802626a [Fix Python Bigtable dataloss bug] Stop unsetting 
timestamps of -1 (#28624)
68cf802626a is described below

commit 68cf802626a1ef7d41aadca13d009f0d1b609b33
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Sat Sep 23 03:56:19 2023 +

[Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 (#28624)

* add test for unset timestamp

* pass thru all timestamps

* default to -1 in schematransform
---
 .../BigtableWriteSchemaTransformProvider.java | 13 +++--
 .../BigtableWriteSchemaTransformProviderIT.java   | 19 +++
 sdks/python/apache_beam/io/gcp/bigtableio.py  |  7 +++
 sdks/python/apache_beam/io/gcp/bigtableio_it_test.py  | 18 ++
 4 files changed, 39 insertions(+), 18 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
index d38bdae2f09..b99b69621a8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
@@ -179,12 +179,13 @@ public class BigtableWriteSchemaTransformProvider
 .setColumnQualifier(
 
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
 .setFamilyNameBytes(
-
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
-if (mutation.containsKey("timestamp_micros")) {
-  setMutation =
-  setMutation.setTimestampMicros(
-  
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
-}
+
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
+// Use timestamp if provided, else default to -1 (current 
Bigtable server time)
+.setTimestampMicros(
+mutation.containsKey("timestamp_micros")
+? Longs.fromByteArray(
+
ofNullable(mutation.get("timestamp_micros")).get())
+: -1);
 bigtableMutation = 
Mutation.newBuilder().setSetCell(setMutation.build()).build();
 break;
   case "DeleteFromColumn":
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
index 14bb04b0315..1a60fe661b5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
@@ -154,8 +154,8 @@ public class BigtableWriteSchemaTransformProviderIT {
   public void testSetMutationsExistingColumn() {
 RowMutation rowMutation =
 RowMutation.create(tableId, "key-1")
-.setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
-.setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
+.setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
+.setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
 dataClient.mutateRow(rowMutation);
 
List<Map<String, byte[]>> mutations = new ArrayList<>();
@@ -165,13 +165,15 @@ public class BigtableWriteSchemaTransformProviderIT {
 "type", "SetCell".getBytes(StandardCharsets.UTF_8),
 "value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
 "column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
-"family_name", 
COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
+"family_name", 
COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
+"timestamp_micros", Longs.toByteArray(2000)));
 mutations.add(
 ImmutableMap.of(
 "type", "SetCell".getBytes(StandardCharsets.UTF_8),

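The fix relies on Bigtable's SetCell convention that a timestamp_micros of -1 means "use current server time", so the provider can always populate the field rather than conditionally leaving it unset. A sketch building the same v2 Mutation proto the provider emits, with hypothetical family, qualifier, and value:

import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;

public class SetCellTimestampSketch {
  public static void main(String[] args) {
    // -1 asks the Bigtable server to stamp the cell with its own time;
    // any other value is written verbatim as microseconds.
    long timestampMicros = -1;

    Mutation mutation =
        Mutation.newBuilder()
            .setSetCell(
                Mutation.SetCell.newBuilder()
                    .setFamilyName("cf1")
                    .setColumnQualifier(ByteString.copyFromUtf8("col_a"))
                    .setValue(ByteString.copyFromUtf8("val-1-a"))
                    .setTimestampMicros(timestampMicros))
            .build();
    System.out.println(mutation);
  }
}
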
[beam] branch master updated: [Python BQ] Follow up of #28592: Allow setting a fixed number of Storage API streams (#28618)

2023-09-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 1b6de5d407b [Python BQ] Follow up of #28592: Allow setting a fixed 
number of Storage API streams (#28618)
1b6de5d407b is described below

commit 1b6de5d407ba4122710ec870c880a1ee443b6d2b
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Sep 22 19:49:41 2023 +

[Python BQ] Follow up of #28592: Allow setting a fixed number of Storage 
API streams (#28618)

* expose num streams option; fix some streaming tests

* add test phrase in description

* lint fix
---
 ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy |  2 +-
 ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy |  2 +-
 ...ueryStorageWriteApiSchemaTransformProvider.java | 37 ++
 .../io/external/xlang_bigqueryio_it_test.py| 44 ++
 sdks/python/apache_beam/io/gcp/bigquery.py |  9 +
 .../documentation/io/built-in/google-bigquery.md   |  2 +-
 6 files changed, 69 insertions(+), 27 deletions(-)

diff --git 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
index d1ee27088c7..1280fcb4e23 100644
--- 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
+++ 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
@@ -28,7 +28,7 @@ import static 
PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service 
decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
 'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow 
(\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Dataflow 
runner.')
+  description('Runs end-to-end cross language GCP IO tests on the Dataflow 
runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')
 
 
   // Set common parameters.
diff --git 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
index 438b735fba7..e4bf771be1a 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
@@ -28,7 +28,7 @@ import static 
PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service 
decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
 'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run 
Python_Xlang_Gcp_Direct PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Direct 
runner.')
+  description('Runs end-to-end cross language GCP IO tests on the Direct 
runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')
 
   // Set common parameters.
   commonJobProperties.setTopLevelMainJobProperties(delegate)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index e4461793011..1b9eb309ec4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -176,6 +176,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
 invalidConfigMessage + "Output must not be empty if error handling 
specified.");
   }
+
+  if (this.getAutoSharding() != null && this.getAutoSharding()) {
+checkArgument(
+this.getNumStreams() == 0,
+invalidConfigMessage
++ "Cannot set a fixed number of streams when auto-sharding is 
enabled. Please pick only one of the two options.");
+  }
 }
 
 /**
@@ -218,11 +225,17 @@ public class 
BigQueryStorageWriteApiSchemaTransformProvider
 public abstract Boolean getUseAtLeastOnceSemantics();
 
 @SchemaFieldDescription(
-"This option enables using a dynamically determined number of shards 
to write to "
+"This option enables using a dynamically determined number of Storage Write API streams to write to "

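On the Java sink, the two options guarded by the new checkArgument map to withNumStorageWriteApiStreams and withAutoSharding, mutually exclusive ways of sharding Storage Write API streams. A sketch of configuring exactly one of them (table spec hypothetical):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

public class StorageWriteShardingSketch {
  // Returns a Storage Write API sink configured with exactly one sharding
  // option, mirroring the mutual-exclusion check added in this commit.
  static BigQueryIO.Write<TableRow> storageWriteSink(int numStreams) {
    BigQueryIO.Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // hypothetical table
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            // Only applies to exactly-once writes on unbounded input:
            .withTriggeringFrequency(Duration.standardSeconds(5));
    return numStreams > 0
        ? write.withNumStorageWriteApiStreams(numStreams) // fixed stream count
        : write.withAutoSharding(); // dynamically determined stream count
  }
}
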
[beam] branch master updated (04a26da777f -> 426dbd3955e)

2023-09-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 04a26da777f [Python BQ] Allow setting a fixed number of Storage API 
streams (#28592)
 add 426dbd3955e Revert "[Python BQ] Allow setting a fixed number of 
Storage API streams (#28592)" (#28613)

No new revisions were added by this update.

Summary of changes:
 ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy |  2 +-
 ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy |  2 +-
 ...ueryStorageWriteApiSchemaTransformProvider.java | 37 +
 .../io/external/xlang_bigqueryio_it_test.py| 38 --
 sdks/python/apache_beam/io/gcp/bigquery.py |  9 -
 .../documentation/io/built-in/google-bigquery.md   |  2 +-
 6 files changed, 24 insertions(+), 66 deletions(-)



[beam] 01/01: Revert "[Python BQ] Allow setting a fixed number of Storage API streams (#28592)"

2023-09-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch revert-28592-expose_numshards
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 65bf0e2616e78ba6ffe43748d7cd5a1614c79653
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Fri Sep 22 09:13:13 2023 -0400

Revert "[Python BQ] Allow setting a fixed number of Storage API streams 
(#28592)"

This reverts commit 04a26da777ff4c0ed9112f07bf0f41a39bc7260d.
---
 ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy |  2 +-
 ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy |  2 +-
 ...ueryStorageWriteApiSchemaTransformProvider.java | 37 +
 .../io/external/xlang_bigqueryio_it_test.py| 38 --
 sdks/python/apache_beam/io/gcp/bigquery.py |  9 -
 .../documentation/io/built-in/google-bigquery.md   |  2 +-
 6 files changed, 24 insertions(+), 66 deletions(-)

diff --git 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
index 1280fcb4e23..d1ee27088c7 100644
--- 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
+++ 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
@@ -28,7 +28,7 @@ import static 
PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service 
decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
 'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow 
(\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Dataflow 
runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')
+  description('Runs end-to-end cross language GCP IO tests on the Dataflow 
runner.')
 
 
   // Set common parameters.
diff --git 
a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy 
b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
index e4bf771be1a..438b735fba7 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
@@ -28,7 +28,7 @@ import static 
PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service 
decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
 'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run 
Python_Xlang_Gcp_Direct PostCommit\")', this) {
-  description('Runs end-to-end cross language GCP IO tests on the Direct 
runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')
+  description('Runs end-to-end cross language GCP IO tests on the Direct 
runner.')
 
   // Set common parameters.
   commonJobProperties.setTopLevelMainJobProperties(delegate)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index c3eed246723..e4461793011 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -176,13 +176,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
 invalidConfigMessage + "Output must not be empty if error handling 
specified.");
   }
-
-  if (this.getAutoSharding() != null && this.getAutoSharding()) {
-checkArgument(
-this.getNumStreams() == 0,
-invalidConfigMessage
-+ "Cannot set a fixed number of streams when auto-sharding is 
enabled. Please pick only one of the two options.");
-  }
 }
 
 /**
@@ -225,17 +218,11 @@ public class 
BigQueryStorageWriteApiSchemaTransformProvider
 public abstract Boolean getUseAtLeastOnceSemantics();
 
 @SchemaFieldDescription(
-"This option enables using a dynamically determined number of Storage 
Write API streams to write to "
+"This option enables using a dynamically determined number of shards 
to write to "
 + "BigQuery. Only applicable to unbounded data.")
 @Nullable
 public abstract Boolean getAutoSharding();
 
-@SchemaFieldDescription(

[beam] branch revert-28592-expose_numshards created (now 65bf0e2616e)

2023-09-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch revert-28592-expose_numshards
in repository https://gitbox.apache.org/repos/asf/beam.git


  at 65bf0e2616e Revert "[Python BQ] Allow setting a fixed number of 
Storage API streams (#28592)"

This branch includes the following new commits:

 new 65bf0e2616e Revert "[Python BQ] Allow setting a fixed number of 
Storage API streams (#28592)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[beam] branch master updated (0146a8389b4 -> 04a26da777f)

2023-09-22 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 0146a8389b4 cleanup jobs (#28528)
 add 04a26da777f [Python BQ] Allow setting a fixed number of Storage API 
streams (#28592)

No new revisions were added by this update.

Summary of changes:
 ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy |  2 +-
 ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy |  2 +-
 ...ueryStorageWriteApiSchemaTransformProvider.java | 37 -
 .../io/external/xlang_bigqueryio_it_test.py| 38 ++
 sdks/python/apache_beam/io/gcp/bigquery.py |  9 +
 .../documentation/io/built-in/google-bigquery.md   |  2 +-
 6 files changed, 66 insertions(+), 24 deletions(-)



[beam] branch master updated (f676d93030c -> ef0d8d4041c)

2023-09-20 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f676d93030c When comparing Series, sort the values in Dataframe tests 
(#28557)
 add ef0d8d4041c Label Python external SchemaTransform with its URN (#28540)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/transforms/external.py | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)



[beam] branch master updated: Return errors for insert_rows_json exceptions (#21080) (#28091)

2023-09-19 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0f727927c01 Return errors for insert_rows_json exceptions (#21080) 
(#28091)
0f727927c01 is described below

commit 0f727927c0136c63e8127ab5e9d3279fd5748dc4
Author: Adam Whitmore <61937566+ajdub...@users.noreply.github.com>
AuthorDate: Tue Sep 19 15:05:46 2023 -0400

Return errors for insert_rows_json exceptions (#21080) (#28091)
---
 CHANGES.md |   1 +
 sdks/python/apache_beam/io/gcp/bigquery_test.py| 853 +++--
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  22 +-
 .../apache_beam/io/gcp/bigquery_write_it_test.py   |  51 +-
 4 files changed, 681 insertions(+), 246 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 47c35fe3491..cdf93909cb6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -82,6 +82,7 @@
 ## Bugfixes
 
 * Fixed exception chaining issue in GCS connector (Python) 
([#26769](https://github.com/apache/beam/issues/26769#issuecomment-1700422615)).
+* Fixed streaming inserts exception handling, GoogleAPICallErrors are now 
retried according to retry strategy and routed to failed rows where appropriate 
rather than causing a pipeline error (Python) 
([#21080](https://github.com/apache/beam/issues/21080)).
 
 
 ## Security Fixes
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 3a3033dfcaf..7e9c1e63474 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -77,7 +77,6 @@ from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -931,77 +930,269 @@ class TestWriteToBigQuery(unittest.TestCase):
 'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #/api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  # 
/python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+  # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+  # transient error retried and succeeds on second attempt, 0 rows sent to
+  # failed rows
   param(
-  exception_type=exceptions.Forbidden if exceptions else None,
-  error_reason='rateLimitExceeded'),
+  insert_response=[
+exceptions.TooManyRequests if exceptions else None,
+None],
+  error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+  failed_rows=[]),
+  # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+  # failed rows after hitting max_retries
+  param(
+  insert_response=[
+exceptions.InternalServerError if exceptions else None,
+exceptions.InternalServerError if exceptions else None],
+  error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS
+  failed_rows=['value1', 'value3', 'value5']),
+  # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+  # failed_rows after hitting max_retries
+  param(
+  insert_response=[
+exceptions.Forbidden if exceptions else None,
+exceptions.Forbidden if exceptions else None],
+  error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+  failed_rows=['value1', 'value3', 'value5']),
+  ])
+  def test_insert_rows_json_exception_retry_always(
+  self, insert_response, error_reason, failed_rows):
+# In this test, a pipeline will always retry all caught exception types
+# since RetryStrategy is not set and defaults to RETRY_ALWAYS
+with mock.patch('time.sleep'):
+  call_counter = 0
+  mock_response = mock.Mock()
+  mock_response.reason = error_reason
+
+  def store_callback(table, **kwargs):
+nonlocal call_counter
+# raise exception if insert_response element is an exception
+if insert_response[call_counter]:
+  exception_type = insert_response[call_counter]
+  call_counter += 1
+  raise exception_type('some exc

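For comparison, the Java SDK already routes streaming-insert failures through a configurable retry policy and a failed-rows output on WriteResult; this Python change brings bigquery_tools in line with that behavior. A sketch of the corresponding Java knobs (table spec hypothetical):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class StreamingInsertsDeadLetterSketch {
  static PCollection<BigQueryInsertError> writeWithDeadLetter(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical table
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Retry only transient errors; route the rest to failed rows.
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo());
    return result.getFailedInsertsWithErr();
  }
}
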
[beam] branch master updated: Updating Storage API Autosharding documentation to include that it doesn't work on Runner V2 (#28233)

2023-09-18 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d6068ad66b0 Updating Storage API Autosharding documentation to include 
that it doesn't work on Runner V2 (#28233)
d6068ad66b0 is described below

commit d6068ad66b0a28a6dd628bb8ef48f1e2182acb4b
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Mon Sep 18 18:27:08 2023 +

Updating Storage API Autosharding documentation to include that it doesn't 
work on Runner V2 (#28233)

* add documentation

* doc for python too
---
 .../www/site/content/en/documentation/io/built-in/google-bigquery.md  | 4 
 1 file changed, 4 insertions(+)

diff --git 
a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md 
b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 24314dc1180..eae98b84d2c 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -788,6 +788,8 @@ BigQuery Storage Write API for Python SDK currently has 
some limitations on supp
 {{< paragraph class="language-py" >}}
 **Note:** If you want to run WriteToBigQuery with Storage Write API from the 
source code, you need to run `./gradlew 
:sdks:java:io:google-cloud-platform:expansion-service:build` to build the 
expansion-service jar. If you are running from a released Beam SDK, the jar 
will already be included.
 
+**Note:** Auto sharding is not currently supported for Python's Storage Write 
API.
+
 {{< /paragraph >}}
 
  Exactly-once semantics
@@ -877,6 +879,8 @@ explicitly enable this using 
[`withAutoSharding`](https://beam.apache.org/releas
 
 ***Note:*** `STORAGE_WRITE_API` will default to dynamic sharding when
 `numStorageWriteApiStreams` is set to 0 or is unspecified.
+
+***Note:*** Auto sharding with `STORAGE_WRITE_API` is supported on Dataflow's 
legacy runner, but **not** on Runner V2
 {{< /paragraph >}}
 
 When using `STORAGE_WRITE_API`, the PCollection returned by



[beam] branch master updated: Add file loads streaming integration tests (#28312)

2023-09-13 Thread ahmedabualsaud
This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 103253348ec Add file loads streaming integration tests (#28312)
103253348ec is described below

commit 103253348ec3f1e193bb124e52dffcc603c929d9
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Sep 13 15:34:59 2023 +

Add file loads streaming integration tests (#28312)

* file loads streaming integration tests

* fix dynamic destinations copy jobs

* disable for runnerV2 until pane index is fixed
---
 runners/google-cloud-dataflow-java/build.gradle|   3 +
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java   |  11 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|   4 +
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java |  27 +-
 .../sdk/io/gcp/bigquery/FileLoadsStreamingIT.java  | 497 +
 5 files changed, 532 insertions(+), 10 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index f6e2b9b147c..2acc30455e2 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) 
{
   exclude '**/FhirIOLROIT.class'
   exclude '**/FhirIOSearchIT.class'
   exclude '**/FhirIOPatientEverythingIT.class'
+  // failing due to pane index not incrementing after Reshuffle:
+  // https://github.com/apache/beam/issues/28219
+  exclude '**/FileLoadsStreamingIT.class'
 
   maxParallelForks 4
   classpath = configurations.googleCloudPlatformIntegrationTest
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index f5f193aecb7..32ee29738bf 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -399,10 +398,12 @@ class BatchLoads
 "Window Into Global Windows",
 Window.<KV<DestinationT, WriteTables.Result>>into(new 
GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
-.apply("Add Void Key", WithKeys.of((Void) null))
-.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
-.apply("GroupByKey", GroupByKey.create())
-.apply("Extract Values", Values.create())
+// We use this and the following GBK to aggregate by final 
destination.
+// This way, each destination has its own pane sequence
+.apply("AddDestinationKeys", WithKeys.of(result -> 
result.getKey()))
+.setCoder(KvCoder.of(destinationCoder, tempTables.getCoder()))
+.apply("GroupTempTablesByFinalDestination", GroupByKey.create())
+.apply("ExtractTempTables", Values.create())
 .apply(
 ParDo.of(
 new UpdateSchemaDestination(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 0063952d8b1..00ee815c3c9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -689,6 +689,10 @@ public class BigQueryUtils {
   }
 }
 
+if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == 
TypeName.BYTES) {
+  return jsonBQValue;
+}
+
 if (jsonBQValue instanceof List) {
   if (fieldType.getCollectionElementType() == null) {
 throw new IllegalArgumentException(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 9bff77a1658..f4074cc1a55 100644
--- 
a/sdks/java/io/google-cloud-pla