(beam) branch master updated (a85b0a636dc -> 7d5c97320b9)

2024-08-28 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from a85b0a636dc Filter out old actions runs from dashboard (#32347)
 add 7d5c97320b9 Add predicate to control which columns are propagated when 
propagating successful rows (#32312)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |   6 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  31 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java |  11 +++
 .../StorageApiDynamicDestinationsProto.java|   4 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |   7 ++
 .../StorageApiWriteRecordsInconsistent.java|   7 ++
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  24 -
 .../bigquery/StorageApiWritesShardedRecords.java   |  17 +++-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  30 --
 .../sdk/io/gcp/testing/FakeDatasetService.java |   5 +-
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java |   3 +
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 103 +
 12 files changed, 229 insertions(+), 19 deletions(-)
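
For readers who want a feel for what such a column predicate looks like in user
code, a deliberately hedged sketch follows. The predicate-accepting overload of
withPropagateSuccessfulStorageApiWrites is an assumption based on the PR title;
check the released BigQueryIO.Write API for the exact name and signature.

    import java.io.Serializable;
    import java.util.function.Predicate;

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

    class PropagatedColumnsExample {
      static BigQueryIO.Write<TableRow> write() {
        // Propagate successful rows, but drop PII columns from what is propagated.
        Predicate<String> keepColumn =
            (Predicate<String> & Serializable) c -> !c.equals("ssn") && !c.equals("email");
        return BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            // Assumed overload; see the note above.
            .withPropagateSuccessfulStorageApiWrites(keepColumn);
      }
    }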



(beam) branch master updated: Merge pull request #28050: Don't invalidate streams on quota errors

2023-11-02 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 8afdda64983 Merge pull request #28050: Don't invalidate streams on 
quota errors
8afdda64983 is described below

commit 8afdda64983d92e78370ee2bd08176519e63150a
Author: Reuven Lax 
AuthorDate: Thu Nov 2 13:13:56 2023 -0700

Merge pull request #28050: Don't invalidate streams on quota errors
---
 .../src/main/java/org/apache/beam/sdk/io/FileIO.java |  2 +-
 .../io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 16 +---
 .../io/gcp/bigquery/StorageApiWritesShardedRecords.java  | 12 +---
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 2d28279f90b..76fc1a70b78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -227,7 +227,7 @@ import org.slf4j.LoggerFactory;
  * {@link Sink}, e.g. write different elements to Avro files in different 
directories with different
  * schemas.
  *
- * This feature is supported by {@link #writeDynamic}. Use {@link Write#by} 
to specify how to
+ * This feature is supported by {@link #writeDynamic}. Use {@link Write#by} 
to specify how too
  * partition the elements into groups ("destinations"). Then elements will be 
grouped by
  * destination, and {@link Write#withNaming(Contextful)} and {@link 
Write#via(Contextful)} will be
  * applied separately within each group, i.e. different groups will be written 
using the file naming
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index df79ece207f..21c2a485c27 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -711,7 +711,19 @@ public class StorageApiWriteUnshardedRecords
   retrieveErrorDetails(contexts));
   failedContext.failureCount += 1;
 
-  invalidateWriteStream();
+  boolean quotaError = false;
+  Throwable error = failedContext.getError();
+  Status.Code statusCode = Status.Code.OK;
+  if (error != null) {
+statusCode = Status.fromThrowable(error).getCode();
+quotaError = statusCode.equals(Status.Code.RESOURCE_EXHAUSTED);
+  }
+
+  if (!quotaError) {
+// This forces us to close and reopen all gRPC connections to 
Storage API on error,
+// which empirically fixes random stuckness issues.
+invalidateWriteStream();
+  }
 
   // Maximum number of times we retry before we fail the work item.
   if (failedContext.failureCount > 5) {
@@ -720,8 +732,6 @@ public class StorageApiWriteUnshardedRecords
 
   // The following errors are known to be persistent, so always 
fail the work item in
   // this case.
-  Throwable error = 
Preconditions.checkStateNotNull(failedContext.getError());
-  Status.Code statusCode = Status.fromThrowable(error).getCode();
   if (statusCode.equals(Status.Code.OUT_OF_RANGE)
   || statusCode.equals(Status.Code.ALREADY_EXISTS)) {
 throw new RuntimeException(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 77d9e8023a0..f4982396e9d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -689,7 +689,7 @@ public class StorageApiWritesShardedRecords
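
The essence of the change above is that RESOURCE_EXHAUSTED (quota) errors no
longer tear down the gRPC append streams. As a standalone sketch of that
status-code check (not the Beam internals themselves), using the io.grpc
Status API that appears in the diff:

    import io.grpc.Status;

    public final class QuotaErrors {
      private QuotaErrors() {}

      /** Returns true if the failure is a quota (RESOURCE_EXHAUSTED) error. */
      public static boolean isQuotaError(Throwable error) {
        if (error == null) {
          return false;
        }
        return Status.fromThrowable(error).getCode() == Status.Code.RESOURCE_EXHAUSTED;
      }
    }

    // Usage mirroring the patched logic: only reset the connection for
    // non-quota failures, since quota errors are expected to clear on retry.
    //   if (!QuotaErrors.isQuotaError(failedContext.getError())) {
    //     invalidateWriteStream();  // helper from the surrounding Beam class
    //   }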

(beam) branch master updated (2e52a6559a8 -> 2a96cedca15)

2023-11-01 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 2e52a6559a8 Fix bigtable_it_test instance naming (#29249)
 add 97354ee83c2 Change WindmillStateReader to not batch OrderedListFetches 
for the same family and tag. Fix issue with MultimapState delayed fetches due 
to batching.
 new 2a96cedca15 Merge pull request #28371: Change WindmillStateReader to 
not batch OrderedListFetches for the same family and tag.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../worker/windmill/state/WindmillStateReader.java | 112 ++---
 .../worker/windmill/state/WrappedFuture.java   |   4 +-
 .../windmill/state/WindmillStateReaderTest.java|  96 +++---
 3 files changed, 137 insertions(+), 75 deletions(-)



(beam) 01/01: Merge pull request #28371: Change WindmillStateReader to not batch OrderedListFetches for the same family and tag.

2023-11-01 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2a96cedca1516d11d3faa3a10387cd6c34a85b8c
Merge: 2e52a6559a8 97354ee83c2
Author: Reuven Lax 
AuthorDate: Wed Nov 1 13:38:07 2023 -0700

Merge pull request #28371: Change WindmillStateReader to not batch 
OrderedListFetches for the same family and tag.

 .../worker/windmill/state/WindmillStateReader.java | 112 ++---
 .../worker/windmill/state/WrappedFuture.java   |   4 +-
 .../windmill/state/WindmillStateReaderTest.java|  96 +++---
 3 files changed, 137 insertions(+), 75 deletions(-)



(beam) branch master updated (d5fc02479c2 -> 8648e583c84)

2023-10-30 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from d5fc02479c2 [YAML] Add a basic aggregating transform to Beam Yaml. 
(#29167)
 add 8648e583c84 Merge pull request #29114: Support default values in 
storage-api sink

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |   7 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  26 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   6 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |   7 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  17 +-
 .../StorageApiWriteRecordsInconsistent.java|   9 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  17 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |  24 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  23 +-
 .../sdk/io/gcp/testing/FakeDatasetService.java |   6 +-
 .../bigquery/StorageApiSinkDefaultValuesIT.java| 317 +
 11 files changed, 432 insertions(+), 27 deletions(-)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkDefaultValuesIT.java



[beam] 01/01: Merge pull request #28171: Properly handle in-flight deletes followed by adds in OrderedListState

2023-09-10 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5e4b9bbae5ea508c9298fdfd644d1fe3b6adfbd3
Merge: f86d9e2d012 f2f5bf6a293
Author: Reuven Lax 
AuthorDate: Sun Sep 10 17:24:08 2023 -0700

Merge pull request #28171: Properly handle in-flight deletes followed by 
adds in OrderedListState

 .../dataflow/worker/WindmillStateInternals.java|  9 +--
 .../worker/WindmillStateInternalsTest.java | 65 ++
 2 files changed, 70 insertions(+), 4 deletions(-)
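
For readers unfamiliar with the state API involved: OrderedListState holds
timestamp-ordered values, and the bug fixed here concerned a clearRange that is
still in flight when new elements are added. A minimal sketch of that access
pattern in a stateful DoFn (state id and field names are illustrative):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.OrderedListState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    class ReplaceBufferedEventsFn extends DoFn<KV<String, String>, String> {

      @StateId("events")
      private final StateSpec<OrderedListState<String>> eventsSpec =
          StateSpecs.orderedList(StringUtf8Coder.of());

      @ProcessElement
      public void process(
          @Element KV<String, String> element,
          @Timestamp Instant ts,
          @StateId("events") OrderedListState<String> events) {
        // Delete anything previously buffered for this one-minute slot...
        events.clearRange(ts, ts.plus(Duration.standardMinutes(1)));
        // ...then immediately add the replacement. The fix referenced above keeps
        // the in-flight delete from filtering out this newly added element.
        events.add(TimestampedValue.of(element.getValue(), ts));
      }
    }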



[beam] branch master updated (f86d9e2d012 -> 5e4b9bbae5e)

2023-09-10 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f86d9e2d012 update concurrency group definition (#28373)
 add f2f5bf6a293 Don't improperly filter newly-added elements that overlap 
with a delete.
 new 5e4b9bbae5e Merge pull request #28171: Properly handle in-flight 
deletes followed by adds in OrderedListState

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dataflow/worker/WindmillStateInternals.java|  9 +--
 .../worker/WindmillStateInternalsTest.java | 65 ++
 2 files changed, 70 insertions(+), 4 deletions(-)



[beam] branch master updated: Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing deletes or updates to BigQuery

2023-08-24 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new fef4cf8a028 Merge pull request #28124: Allow using CREATE_IF_NEEDED 
when writing deletes or updates to BigQuery
fef4cf8a028 is described below

commit fef4cf8a0289e060ca05b6808beaf5734707d8a2
Author: Reuven Lax 
AuthorDate: Thu Aug 24 15:45:15 2023 -0700

Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing 
deletes or updates to BigQuery
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 39 
 .../sdk/io/gcp/bigquery/CreateTableHelpers.java|  9 +
 .../beam/sdk/io/gcp/bigquery/CreateTables.java |  5 +++
 .../sdk/io/gcp/bigquery/DynamicDestinations.java   |  9 +
 .../gcp/bigquery/DynamicDestinationsHelpers.java   | 30 
 .../bigquery/StorageApiDynamicDestinations.java| 36 ++-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  1 +
 .../bigquery/StorageApiWritesShardedRecords.java   |  4 ++-
 .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 42 +-
 9 files changed, 100 insertions(+), 75 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 96da67321cb..58d76931244 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -30,6 +30,7 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -510,7 +511,8 @@ import org.slf4j.LoggerFactory;
  *.apply(BigQueryIO.applyRowMutations()
  *   .to(my_project:my_dataset.my_table)
  *   .withSchema(schema)
- *   .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ *   .withPrimaryKey(ImmutableList.of("field1", "field2"))
+ *   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
  * }
  *
  * If writing a type other than TableRow (e.g. using {@link 
BigQueryIO#writeGenericRecords} or
@@ -523,12 +525,17 @@ import org.slf4j.LoggerFactory;
  * cdcEvent.apply(BigQueryIO.write()
  *  .to("my-project:my_dataset.my_table")
  *  .withSchema(schema)
+ *  .withPrimaryKey(ImmutableList.of("field1", "field2"))
  *  .withFormatFunction(CdcEvent::getTableRow)
  *  .withRowMutationInformationFn(cdc -> 
RowMutationInformation.of(cdc.getChangeType(),
  * 
cdc.getSequenceNumber()))
  *  .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
- *  .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ *  .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
  * }
+ *
+ * Note that in order to use inserts or deletes, the table must be set up
with a primary key. If
+ * the table is not previously created and CREATE_IF_NEEDED is used, a primary 
key must be
+ * specified.
  */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20506)
@@ -2318,6 +2325,8 @@ public class BigQueryIO {
 
 abstract @Nullable String getKmsKey();
 
+abstract @Nullable List getPrimaryKey();
+
 abstract Boolean getOptimizeWrites();
 
 abstract Boolean getUseBeamSchema();
@@ -2416,7 +2425,9 @@ public class BigQueryIO {
 
   abstract Builder setIgnoreInsertIds(Boolean ignoreInsertIds);
 
-  abstract Builder setKmsKey(String kmsKey);
+  abstract Builder setKmsKey(@Nullable String kmsKey);
+
+  abstract Builder setPrimaryKey(@Nullable List primaryKey);
 
   abstract Builder setOptimizeWrites(Boolean optimizeWrites);
 
@@ -2947,6 +2958,10 @@ public class BigQueryIO {
   return toBuilder().setKmsKey(kmsKey).build();
 }
 
+public Write withPrimaryKey(List primaryKey) {
+  return toBuilder().setPrimaryKey(primaryKey).build();
+}
+
 /**
  * If true, enables new codepaths that are expected to use less resources 
while writing to
  * BigQuery. Not enabled by default in order to maintain backwards 
compatibility.
@@ -3241,6 +3256,7 @@ public class BigQueryIO {
   LOG.warn("Setting the numb

[beam] branch master updated: Merge pull request #27866: Allow writing protos directly to the storage API without conversion

2023-08-10 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ba04f6098f7 Merge pull request #27866: Allow writing protos directly 
to the storage API without conversion
ba04f6098f7 is described below

commit ba04f6098f7472a123c6cd7b8d4a991ce16d1958
Author: Reuven Lax 
AuthorDate: Thu Aug 10 09:15:58 2023 -0700

Merge pull request #27866: Allow writing protos directly to the storage API 
without conversion
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  32 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  96 +
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   5 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |   8 +-
 .../bigquery/StorageApiDynamicDestinations.java|   3 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   6 +
 ...StorageApiDynamicDestinationsGenericRecord.java |   7 +
 .../StorageApiDynamicDestinationsProto.java| 100 +
 .../StorageApiDynamicDestinationsTableRow.java |   7 +
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  66 +++-
 .../bigquery/StorageApiWritesShardedRecords.java   |  29 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  83 +++-
 .../sdk/io/gcp/testing/FakeDatasetService.java |   7 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 111 +-
 .../bigquery/StorageApiDirectWriteProtosIT.java| 197 ++
 .../bigquery/TableRowToStorageApiProtoTest.java| 418 -
 16 files changed, 1109 insertions(+), 66 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index b8a7fc760bc..46c25d47e7a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -22,9 +22,9 @@ import com.google.auto.value.AutoValue;
 import com.google.auto.value.extension.memoized.Memoized;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -49,7 +49,7 @@ abstract class AppendClientInfo {
 
   abstract @Nullable String getStreamName();
 
-  abstract Descriptors.Descriptor getDescriptor();
+  abstract DescriptorProtos.DescriptorProto getDescriptor();
 
   @AutoValue.Builder
   abstract static class Builder {
@@ -63,7 +63,7 @@ abstract class AppendClientInfo {
 
 abstract Builder 
setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);
 
-abstract Builder setDescriptor(Descriptors.Descriptor value);
+abstract Builder setDescriptor(DescriptorProtos.DescriptorProto value);
 
 abstract Builder setStreamName(@Nullable String name);
 
@@ -74,8 +74,8 @@ abstract class AppendClientInfo {
 
   static AppendClientInfo of(
   TableSchema tableSchema,
-  Consumer closeAppendClient,
-  boolean includeCdcColumns)
+  DescriptorProtos.DescriptorProto descriptor,
+  Consumer closeAppendClient)
   throws Exception {
 return new AutoValue_AppendClientInfo.Builder()
 .setTableSchema(tableSchema)
@@ -83,12 +83,22 @@ abstract class AppendClientInfo {
 
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
 .setSchemaInformation(
 
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
-.setDescriptor(
-TableRowToStorageApiProto.getDescriptorFromTableSchema(
-tableSchema, true, includeCdcColumns))
+.setDescriptor(descriptor)
 .build();
   }
 
+  static AppendClientInfo of(
+  TableSchema tableSchema,
+  Consumer closeAppendClient,
+  boolean includeCdcColumns)
+  throws Exception {
+return of(
+tableSchema,
+TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+tableSchema, true, includeCdcColumns),
+closeAppendClient);
+  }
+
   public AppendClientInfo withNoAppendClient() {
 return toBuilder().setStreamAppendClient(null).build();
   }
@@ -149,8 +159,10 @@ abstract class AppendClientInfo {
   public TableRow toTableRow(ByteString protoBytes) {
 try {
   return TableRowToStorageApiProto.tableRowFromMessage(
-  DynamicMessage.parseFrom(getDescriptor(), protoBytes), true);
-} catch (InvalidProtocolBufferException e
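
The change above stores a serializable DescriptorProtos.DescriptorProto instead
of a live Descriptors.Descriptor, which then has to be rebuilt before
DynamicMessage.parseFrom can be used. A standalone sketch of that rebuild step
with the plain protobuf API (it assumes the DescriptorProto describes a
self-contained message with no cross-file type references):

    import com.google.protobuf.ByteString;
    import com.google.protobuf.DescriptorProtos;
    import com.google.protobuf.Descriptors;
    import com.google.protobuf.DynamicMessage;

    public final class DescriptorRebuild {
      private DescriptorRebuild() {}

      /** Rebuilds a usable Descriptor from its proto form and parses a message payload. */
      public static DynamicMessage parse(
          DescriptorProtos.DescriptorProto descriptorProto, ByteString payload) throws Exception {
        // Wrap the message descriptor in a synthetic file so it can be compiled back
        // into a Descriptors.Descriptor.
        DescriptorProtos.FileDescriptorProto fileProto =
            DescriptorProtos.FileDescriptorProto.newBuilder()
                .setName("generated.proto")
                .addMessageType(descriptorProto)
                .build();
        Descriptors.FileDescriptor file =
            Descriptors.FileDescriptor.buildFrom(fileProto, new Descriptors.FileDescriptor[0]);
        Descriptors.Descriptor descriptor = file.findMessageTypeByName(descriptorProto.getName());
        return DynamicMessage.parseFrom(descriptor, payload);
      }
    }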

[beam] branch master updated (488279c236e -> 1f90261aef4)

2023-08-04 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 488279c236e Bump commons-io to 2.13.0 (#27776)
 add b40ca796d8d propagate fewKeys to hot-key path
 new 1f90261aef4 Merge pull request #27845: propagate fewKeys to hot-key 
path

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/beam/sdk/transforms/Combine.java| 23 +-
 1 file changed, 18 insertions(+), 5 deletions(-)



[beam] 01/01: Merge pull request #27845: propagate fewKeys to hot-key path

2023-08-04 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1f90261aef435d947db5eff30172afa3e7d4fd27
Merge: 488279c236e b40ca796d8d
Author: Reuven Lax 
AuthorDate: Fri Aug 4 00:15:46 2023 -0700

Merge pull request #27845: propagate fewKeys to hot-key path

 .../org/apache/beam/sdk/transforms/Combine.java| 23 +-
 1 file changed, 18 insertions(+), 5 deletions(-)



[beam] branch master updated: Merge pull request #27835: Enable combiner lifting for the Group transform

2023-08-03 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 86426c814db Merge pull request #27835: Enable combiner lifting for the 
Group transform
86426c814db is described below

commit 86426c814db19736f485d7be9a24ac571d4a2b1a
Author: Reuven Lax 
AuthorDate: Thu Aug 3 14:38:38 2023 -0700

Merge pull request #27835: Enable combiner lifting for the Group transform
---
 .../apache/beam/sdk/schemas/transforms/Group.java| 20 +++-
 .../java/org/apache/beam/sdk/transforms/Combine.java |  8 
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
index fe8933d24d5..0115c4900ac 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
@@ -998,6 +998,8 @@ public class Group {
 
 abstract @Nullable Fanout getFanout();
 
+abstract Boolean getFewKeys();
+
 abstract ByFields getByFields();
 
 abstract SchemaAggregateFn.Inner getSchemaAggregateFn();
@@ -1012,6 +1014,8 @@ public class Group {
 abstract static class Builder {
   public abstract Builder setFanout(@Nullable Fanout value);
 
+  public abstract Builder setFewKeys(Boolean fewKeys);
+
   abstract Builder setByFields(ByFields byFields);
 
   abstract Builder setSchemaAggregateFn(SchemaAggregateFn.Inner 
schemaAggregateFn);
@@ -1033,6 +1037,9 @@ public class Group {
   .setSchemaAggregateFn(schemaAggregateFn)
   .setKeyField(keyField)
   .setValueField(valueField)
+  .setFewKeys(
+  true) // We are often selecting only certain fields, so combiner 
lifting usually
+  // helps.
   .build();
 }
 
@@ -1046,6 +1053,17 @@ public class Group {
   return toBuilder().setValueField(valueField).build();
 }
 
+/**
+ * Enable precombining.
+ *
+ * This is on by default. In certain cases (e.g. if there are many 
unique field values and
+ * the combiner's intermediate state is larger than the average row size) 
precombining makes
+ * things worse, in which case it can be turned off.
+ */
+public CombineFieldsByFields withPrecombining(boolean value) {
+  return toBuilder().setFewKeys(value).build();
+}
+
 public CombineFieldsByFields withHotKeyFanout(int n) {
   return toBuilder().setFanout(Fanout.of(n)).build();
 }
@@ -1267,7 +1285,7 @@ public class Group {
 throw new RuntimeException("Unexpected kind: " + fanout.getKind());
 }
   }
-  return Combine.perKey(fn);
+  return getFewKeys() ? Combine.fewKeys(fn) : Combine.perKey(fn);
 }
 
 @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 422450c89d0..ff5a8268ab7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -202,6 +203,13 @@ public class Combine {
 return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
   }
 
+  /** Returns a {@link PerKey Combine.PerKey}, and set fewKeys in {@link 
GroupByKey}. */
+  @Internal
+  public static  PerKey fewKeys(
+  GlobalCombineFn fn) {
+return new PerKey<>(fn, displayDataForFn(fn), true /*fewKeys*/);
+  }
+
   /** Returns a {@link PerKey Combine.PerKey}, and set fewKeys in {@link 
GroupByKey}. */
   private static  PerKey fewKeys(
   GlobalCombineFn fn,
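
Taken together with the new withPrecombining knob above, a short usage sketch of
the schema Group transform (field names are illustrative, and the input
PCollection is assumed to carry a schema):

    import org.apache.beam.sdk.schemas.transforms.Group;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    class GroupPrecombineExample {
      static PCollection<Row> totalsPerUser(PCollection<Row> purchases) {
        return purchases.apply(
            Group.<Row>byFieldNames("userId")
                .aggregateField("cost", Sum.ofDoubles(), "totalCost")
                // Precombining (combiner lifting) is now on by default; turn it off
                // when there are many unique keys and large intermediate combiner
                // state, as the Javadoc above explains.
                .withPrecombining(false));
      }
    }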



[beam] branch master updated (0c15645a561 -> 42cdd61f8a5)

2023-08-02 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 0c15645a561 Explicitly load go license container locally (#27824)
 add 4a8fc73792d override encoding positions for nested schemas
 new 42cdd61f8a5 Merge pull request #27774: Override encoding positions for 
nested schemas

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../util/RowCoderCloudObjectTranslator.java|  7 +---
 .../util/SchemaCoderCloudObjectTranslator.java | 44 --
 2 files changed, 41 insertions(+), 10 deletions(-)



[beam] 01/01: Merge pull request #27774: Override encoding positions for nested schemas

2023-08-02 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 42cdd61f8a522869709f729d4f4b0dcc470263f7
Merge: 0c15645a561 4a8fc73792d
Author: Reuven Lax 
AuthorDate: Wed Aug 2 23:35:19 2023 -0700

Merge pull request #27774: Override encoding positions for nested schemas

 .../util/RowCoderCloudObjectTranslator.java|  7 +---
 .../util/SchemaCoderCloudObjectTranslator.java | 44 --
 2 files changed, 41 insertions(+), 10 deletions(-)



[beam] branch master updated (66f0592f8b4 -> 90f2b3445ff)

2023-07-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 66f0592f8b4 Add flags to build multiarch Go SDK containers and fix Go 
Dataflow ARM github actions test (#27716)
 add 5989a662dc9 populate name map during snapshot restore
 new 90f2b3445ff Merge pull request #27638: Populate name map during 
snapshot restore

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java   | 1 +
 1 file changed, 1 insertion(+)



[beam] 01/01: Merge pull request #27638: Populate name map during snapshot restore

2023-07-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 90f2b3445ff4b0da50ecc022b46f5da90b2183f3
Merge: 66f0592f8b4 5989a662dc9
Author: Reuven Lax 
AuthorDate: Thu Jul 27 19:05:23 2023 -0700

Merge pull request #27638: Populate name map during snapshot restore

 .../src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java   | 1 +
 1 file changed, 1 insertion(+)




[beam] branch master updated (d241eaeeb94 -> 1f4defab42a)

2023-07-26 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from d241eaeeb94 Make all Github Action tests to run at minute 0 of every 6 
hours (#27701)
 add 2e958499e5d lowercase everything
 new 1f4defab42a Merge pull request #27699: Don't make StorageAPI tablerow 
names case-sensitive

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java  | 6 +++---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java| 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)



[beam] 01/01: Merge pull request #27699: Don't make StorageAPI tablerow names case-sensitive

2023-07-26 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1f4defab42a67cba13154b46ad15a87c63ba3ec9
Merge: d241eaeeb94 2e958499e5d
Author: Reuven Lax 
AuthorDate: Wed Jul 26 15:40:04 2023 -0700

Merge pull request #27699: Don't make StorageAPI tablerow names 
case-sensitive

 .../apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java  | 6 +++---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java| 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)



[beam] 01/01: Merge pull request #27590: Properly translate TimestampedValueCoder on runnerv1

2023-07-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cbe2e0f5ad5e45a7f6d380b1b1ac36d5dede8f54
Merge: 05305ede453 52807b6e1b6
Author: Reuven Lax 
AuthorDate: Sat Jul 22 18:59:04 2023 -0700

Merge pull request #27590: Properly translate TimestampedValueCoder on 
runnerv1

 .../dataflow/util/CloudObjectTranslators.java  | 34 ++
 ...DefaultCoderCloudObjectTranslatorRegistrar.java |  1 +
 .../runners/dataflow/util/CloudObjectsTest.java|  2 ++
 3 files changed, 37 insertions(+)



[beam] branch master updated (05305ede453 -> cbe2e0f5ad5)

2023-07-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 05305ede453 Merge pull request #27617: Support withFanout and 
withHotKeyFanout on schema group transform
 add 52807b6e1b6 properly translate TimestampedValueCoder on runnerv1
 new cbe2e0f5ad5 Merge pull request #27590: Properly translate 
TimestampedValueCoder on runnerv1

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dataflow/util/CloudObjectTranslators.java  | 34 ++
 ...DefaultCoderCloudObjectTranslatorRegistrar.java |  1 +
 .../runners/dataflow/util/CloudObjectsTest.java|  2 ++
 3 files changed, 37 insertions(+)



[beam] branch master updated: Merge pull request #27617: Support withFanout and withHotKeyFanout on schema group transform

2023-07-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 05305ede453 Merge pull request #27617: Support withFanout and 
withHotKeyFanout on schema group transform
05305ede453 is described below

commit 05305ede45366f158f27fc2b83b9ce00db4df2ab
Author: Reuven Lax 
AuthorDate: Sat Jul 22 10:40:43 2023 -0700

Merge pull request #27617: Support withFanout and withHotKeyFanout on 
schema group transform
---
 .../apache/beam/sdk/schemas/transforms/Group.java  | 162 -
 .../beam/sdk/schemas/transforms/GroupTest.java |  96 +---
 .../sdk/extensions/sql/impl/rel/BeamWindowRel.java |   5 +-
 3 files changed, 201 insertions(+), 62 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
index fb48ceed311..fe8933d24d5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
+import com.google.auto.value.AutoOneOf;
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
@@ -34,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -42,6 +45,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * A generic grouping transform for schema {@link PCollection}s.
@@ -153,7 +157,7 @@ public class Group {
  */
 public  CombineGlobally aggregate(
 CombineFn combineFn) {
-  return new CombineGlobally<>(combineFn);
+  return new CombineGlobally<>(combineFn, 0);
 }
 
 /**
@@ -169,10 +173,8 @@ public class Group {
   return new CombineFieldsGlobally<>(
   SchemaAggregateFn.create()
   .aggregateFields(
-  FieldAccessDescriptor.withFieldNames(inputFieldName),
-  false,
-  fn,
-  outputFieldName));
+  FieldAccessDescriptor.withFieldNames(inputFieldName), false, 
fn, outputFieldName),
+  0);
 }
 
 public 
@@ -183,7 +185,8 @@ public class Group {
   return new CombineFieldsGlobally<>(
   SchemaAggregateFn.create()
   .aggregateFields(
-  FieldAccessDescriptor.withFieldNames(inputFieldName), true, 
fn, outputFieldName));
+  FieldAccessDescriptor.withFieldNames(inputFieldName), true, 
fn, outputFieldName),
+  0);
 }
 
 /** The same as {@link #aggregateField} but using field id. */
@@ -194,7 +197,8 @@ public class Group {
   return new CombineFieldsGlobally<>(
   SchemaAggregateFn.create()
   .aggregateFields(
-  FieldAccessDescriptor.withFieldIds(inputFieldId), false, fn, 
outputFieldName));
+  FieldAccessDescriptor.withFieldIds(inputFieldId), false, fn, 
outputFieldName),
+  0);
 }
 
 public 
@@ -205,7 +209,8 @@ public class Group {
   return new CombineFieldsGlobally<>(
   SchemaAggregateFn.create()
   .aggregateFields(
-  FieldAccessDescriptor.withFieldIds(inputFieldId), true, fn, 
outputFieldName));
+  FieldAccessDescriptor.withFieldIds(inputFieldId), true, fn, 
outputFieldName),
+  0);
 }
 
 /**
@@ -221,7 +226,8 @@ public class Group {
   return new CombineFieldsGlobally<>(
   SchemaAggregateFn.create()
   .aggregateFields(
-  FieldAccessDescriptor.withFieldNames(inputFieldName), false, 
fn, outputField));
+  FieldAccessDescriptor.withFieldNames(inputFieldName), false, 
fn, outputField),
+  0);
 }
 
 public 
@@ -232,7 +238,8 @@ public class Group {
   return new CombineFieldsGlobally<>(
   SchemaAggregateFn.create()
   .aggregateFields(
-  FieldAccessDescriptor.withFieldNames(inputFieldName), true, 
fn, outputField));
+  

[beam] branch master updated: Merge pull request #26849: add attribute support to writeAvros and writeProtos

2023-06-13 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d998ba3475f Merge pull request #26849: add attribute support to 
writeAvros and writeProtos
d998ba3475f is described below

commit d998ba3475f67f66716d54f784ba46f5efa88cd7
Author: Reuven Lax 
AuthorDate: Tue Jun 13 14:48:35 2023 -0700

Merge pull request #26849: add attribute support to writeAvros and 
writeProtos
---
 .../beam/sdk/io/gcp/pubsub/ExternalWrite.java  | 14 -
 .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java  | 12 ++---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java| 63 ++
 3 files changed, 73 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 4479323d66f..7c63b9023e3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -26,8 +26,10 @@ import 
org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -72,7 +74,7 @@ public final class ExternalWrite implements 
ExternalTransformRegistrar {
 @Override
 public PTransform, PDone> buildExternal(Configuration 
config) {
   PubsubIO.Write.Builder writeBuilder =
-  PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayload());
+  PubsubIO.Write.newBuilder(new 
ParsePubsubMessageProtoAsPayloadFromWindowedValue());
   if (config.topic != null) {
 StaticValueProvider topic = 
StaticValueProvider.of(config.topic);
 writeBuilder.setTopicProvider(NestedValueProvider.of(topic, 
PubsubTopic::fromPath));
@@ -87,4 +89,14 @@ public final class ExternalWrite implements 
ExternalTransformRegistrar {
   return writeBuilder.build();
 }
   }
+
+  public static class ParsePubsubMessageProtoAsPayloadFromWindowedValue
+  implements SerializableFunction, 
PubsubMessage> {
+static final ParsePubsubMessageProtoAsPayload INNER = new 
ParsePubsubMessageProtoAsPayload();
+
+@Override
+public PubsubMessage apply(ValueInSingleWindow input) {
+  return INNER.apply(input.getValue());
+}
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
index c082b2007aa..26f4fb5d076 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
@@ -38,7 +38,7 @@ public class PreparePubsubWriteDoFn extends 
DoFn
   private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 
6;
   private int maxPublishBatchSize;
 
-  private SerializableFunction formatFunction;
+  private SerializableFunction, PubsubMessage> 
formatFunction;
   @Nullable SerializableFunction, 
PubsubIO.PubsubTopic> topicFunction;
 
   static int validatePubsubMessageSize(PubsubMessage message, int 
maxPublishBatchSize)
@@ -110,7 +110,7 @@ public class PreparePubsubWriteDoFn extends 
DoFn
   }
 
   PreparePubsubWriteDoFn(
-  SerializableFunction formatFunction,
+  SerializableFunction, PubsubMessage> 
formatFunction,
   @Nullable
   SerializableFunction, 
PubsubIO.PubsubTopic> topicFunction,
   int maxPublishBatchSize) {
@@ -126,11 +126,11 @@ public class PreparePubsubWriteDoFn extends 
DoFn
   BoundedWindow window,
   PaneInfo paneInfo,
   OutputReceiver o) {
-PubsubMessage message = formatFunction.apply(element);
+ValueInSingleWindow valueInSingleWindow =
+ValueInSingleWindow.of(element, ts, window, paneInfo);
+PubsubMessage message = formatFunction.apply(valueInSingleWindow);
 if (topicFunction != null) {
-  message =
-  message.withTopic(
-  topicFunction.apply(ValueInSingleWindow.of(element, ts, window, 
paneInfo)).asPath());
+  message = 
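
The refactor above changes the write path's format function to receive a
ValueInSingleWindow, which is what makes element timestamps, windows, and panes
available when building the outgoing message with attributes. A hedged sketch of
such a format function (attribute names are illustrative):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

    class EventToPubsubMessage
        implements SerializableFunction<ValueInSingleWindow<String>, PubsubMessage> {
      @Override
      public PubsubMessage apply(ValueInSingleWindow<String> input) {
        byte[] payload = input.getValue().getBytes(StandardCharsets.UTF_8);
        // Attach the element timestamp as a message attribute.
        Map<String, String> attributes =
            ImmutableMap.of("event_time", input.getTimestamp().toString());
        return new PubsubMessage(payload, attributes);
      }
    }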

[beam] branch master updated (92e33e8e6a8 -> 1e49c512ac1)

2023-06-13 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 92e33e8e6a8 Bump google.golang.org/api from 0.126.0 to 0.127.0 in 
/sdks (#27112)
 add 1e49c512ac1 Merge pull request #26975: Add upsert and delete support 
to BigQueryIO

No new revisions were added by this update.

Summary of changes:
 ...DefaultCoderCloudObjectTranslatorRegistrar.java |   2 +
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  17 +-
 .../AvroGenericRecordToStorageApiProto.java|  24 +-
 .../io/gcp/bigquery/BeamRowToStorageApiProto.java  |  18 +-
 .../bigquery/BigQueryCoderProviderRegistrar.java   |   4 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 111 ++-
 .../beam/sdk/io/gcp/bigquery/RowMutation.java  |  69 
 .../io/gcp/bigquery/RowMutationInformation.java|  46 +++
 .../beam/sdk/io/gcp/bigquery/StorageApiCDC.java}   |  26 +-
 .../io/gcp/bigquery/StorageApiConvertMessages.java |  26 +-
 .../bigquery/StorageApiDynamicDestinations.java|   7 +-
 .../StorageApiDynamicDestinationsBeamRow.java  |  39 ++-
 ...StorageApiDynamicDestinationsGenericRecord.java |  38 ++-
 .../StorageApiDynamicDestinationsTableRow.java |  40 ++-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  26 +-
 .../StorageApiWriteRecordsInconsistent.java|   8 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  62 +++-
 .../bigquery/StorageApiWritesShardedRecords.java   |   7 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 106 +--
 .../beam/sdk/io/gcp/testing/BigqueryClient.java|   4 +
 .../sdk/io/gcp/testing/FakeDatasetService.java | 100 +-
 .../beam/sdk/io/gcp/testing/TableContainer.java| 120 ++-
 .../AvroGenericRecordToStorageApiProtoTest.java|  43 ++-
 .../gcp/bigquery/BeamRowToStorageApiProtoTest.java |  34 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 353 +
 .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 161 ++
 .../bigquery/TableRowToStorageApiProtoTest.java|  86 +++--
 27 files changed, 1407 insertions(+), 170 deletions(-)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
 copy 
sdks/java/{extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryPerfTableProvider.java
 => 
io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java}
 (70%)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java



[beam] branch master updated: Merge pull request #27047: Enable pubsub dynamic destinations by default

2023-06-07 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new c3b330c66c6 Merge pull request #27047: Enable pubsub dynamic 
destinations by default
c3b330c66c6 is described below

commit c3b330c66c6e098a3a3a9505d3b4eaddc9ccb9c0
Author: Reuven Lax 
AuthorDate: Wed Jun 7 14:45:18 2023 -0700

Merge pull request #27047: Enable pubsub dynamic destinations by default
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java| 9 +
 .../beam/runners/dataflow/options/DataflowPipelineOptions.java   | 7 ---
 .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 1 -
 3 files changed, 1 insertion(+), 16 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2951b48bab4..60ebe8d8846 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1916,14 +1916,7 @@ public class DataflowRunner extends 
PipelineRunner {
   ((NestedValueProvider) 
overriddenTransform.getTopicProvider()).propertyName());
 }
   } else {
-DataflowPipelineOptions options =
-input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
-if (options.getEnableDynamicPubsubDestinations()) {
-  stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, 
true);
-} else {
-  throw new RuntimeException(
-  "Dynamic Pubsub destinations not yet supported. Topic must be 
set.");
-}
+stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
   }
   if (overriddenTransform.getTimestampAttribute() != null) {
 stepContext.addInput(
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index bf1027cdd3a..f87af28ca61 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -153,13 +153,6 @@ public interface DataflowPipelineOptions
 
   void setDataflowWorkerJar(String dataflowWorkerJafr);
 
-  // Disable this support for now until the Dataflow backend fully supports 
this option.
-  @Description("Whether to allow dynamic pubsub destinations. Temporary 
option: will be removed.")
-  @Default.Boolean(false)
-  Boolean getEnableDynamicPubsubDestinations();
-
-  void setEnableDynamicPubsubDestinations(Boolean enable);
-
   /** Set of available Flexible Resource Scheduling goals. */
   enum FlexResourceSchedulingGoal {
 /** No goal specified. */
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d16afeb91e0..84f1069f592 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -2505,7 +2505,6 @@ public class DataflowRunnerTest implements Serializable {
 PipelineOptions options = buildPipelineOptions();
 DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
 dataflowOptions.setStreaming(true);
-dataflowOptions.setEnableDynamicPubsubDestinations(true);
 Pipeline p = Pipeline.create(options);
 
 List testValues =



[beam] branch master updated: Merge pull request #26919: warn if BigQuery failed rows collection is not processed

2023-05-28 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new fbeae980e93 Merge pull request #26919: warn if BigQuery failed rows 
collection is not processed
fbeae980e93 is described below

commit fbeae980e93ea64fc8cc3ad074cbc8aebd157691
Author: Reuven Lax 
AuthorDate: Sun May 28 20:06:27 2023 -0700

Merge pull request #26919: warn if BigQuery failed rows collection is not 
processed
---
 .../beam/runners/dataflow/DataflowRunner.java  | 56 ++
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 89 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  4 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  5 ++
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |  8 +-
 5 files changed, 157 insertions(+), 5 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a2f6298ab10..2951b48bab4 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -105,6 +105,8 @@ import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.WriteFilesResult;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.bigquery.StorageApiLoads;
+import org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteTables;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -169,6 +171,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
@@ -1069,6 +1072,7 @@ public class DataflowRunner extends 
PipelineRunner {
 }
 
 logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+logWarningIfBigqueryDLQUnused(pipeline);
 if (shouldActAsStreaming(pipeline)) {
   options.setStreaming(true);
 
@@ -1569,6 +1573,58 @@ public class DataflowRunner extends 
PipelineRunner {
 
   /
 
+  private void logWarningIfBigqueryDLQUnused(Pipeline pipeline) {
+Map, String> unconsumedDLQ = Maps.newHashMap();
+pipeline.traverseTopologically(
+new PipelineVisitor.Defaults() {
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+PTransform transform = node.getTransform();
+if (transform != null) {
+  TupleTag failedTag = null;
+  String rootBigQueryTransform = "";
+  if (transform.getClass().equals(StorageApiLoads.class)) {
+StorageApiLoads storageLoads = (StorageApiLoads) 
transform;
+failedTag = storageLoads.getFailedRowsTag();
+// For storage API the transform that outputs failed rows is 
nested one layer below
+// BigQueryIO.
+rootBigQueryTransform = node.getEnclosingNode().getFullName();
+  } else if 
(transform.getClass().equals(StreamingWriteTables.class)) {
+StreamingWriteTables streamingInserts = 
(StreamingWriteTables) transform;
+failedTag = streamingInserts.getFailedRowsTupleTag();
+// For streaming inserts the transform that outputs failed 
rows is nested two layers
+// below BigQueryIO.
+rootBigQueryTransform = 
node.getEnclosingNode().getEnclosingNode().getFullName();
+  }
+  if (failedTag != null) {
+PCollection dlq = node.getOutputs().get(failedTag);
+if (dlq != null) {
+  unconsumedDLQ.put(dlq, rootBigQueryTransform);
+}
+  }
+}
+
+for (PCollection input : node.getInputs().values()) {
+  unconsumedDLQ.remove(input);
+}
+return CompositeBehavior.EN
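
The warning added above fires when a pipeline never consumes the failed-rows
output of a BigQuery write. A hedged sketch of what consuming that dead-letter
output looks like from user code (where the rows are routed afterwards is up to
the pipeline author):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    class ConsumeBigQueryDlq {
      static void attachDlqHandling(WriteResult result) {
        // Touching this output is what keeps the new warning from firing.
        PCollection<TableRow> failedRows =
            result
                .getFailedStorageApiInserts()
                .apply(
                    MapElements.into(TypeDescriptor.of(TableRow.class))
                        .via((BigQueryStorageApiInsertError e) -> e.getRow()));
        // Route failedRows somewhere durable, e.g. a dead-letter table or files.
      }
    }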

[beam] branch master updated (05c5b1b5bb7 -> d91ea8e4491)

2023-05-24 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 05c5b1b5bb7 minor fix introduction (#25587)
 add d91ea8e4491 Merge pull request #26878: make sure that we capture all 
writes to the failed records in a metric

No new revisions were added by this update.

Summary of changes:
 .../io/gcp/bigquery/StorageApiWriteUnshardedRecords.java|  2 ++
 .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java | 13 -
 2 files changed, 10 insertions(+), 5 deletions(-)



[beam] branch master updated: Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas

2023-05-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 2ee12027627 Merge pull request #25113: #25112 No longer use GetTable 
to implement CREATE_IF_NEEDED to avoid low quotas
2ee12027627 is described below

commit 2ee12027627dd842c52053bec833e0c56d9ebf51
Author: Reuven Lax 
AuthorDate: Mon May 22 15:34:53 2023 -0700

Merge pull request #25113: #25112 No longer use GetTable to implement 
CREATE_IF_NEEDED to avoid low quotas
---
 .../io/gcp/bigquery/CreateTableDestinations.java   | 128 
 .../sdk/io/gcp/bigquery/CreateTableHelpers.java|  50 ++-
 .../beam/sdk/io/gcp/bigquery/CreateTables.java |   2 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  77 +-
 .../StorageApiWriteRecordsInconsistent.java|  12 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 165 ++---
 .../bigquery/StorageApiWritesShardedRecords.java   |  47 +-
 .../sdk/io/gcp/testing/FakeDatasetService.java |  42 --
 .../bigquery/StorageApiSinkCreateIfNeededIT.java   | 140 +
 9 files changed, 404 insertions(+), 259 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java
deleted file mode 100644
index 0c2babdeef8..000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.bigquery.model.TableSchema;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-/**
- * Creates any tables needed before performing writes to the tables. This is a 
side-effect {@link
- * DoFn}, and returns the original collection unchanged.
- */
-public class CreateTableDestinations
-extends PTransform<
-PCollection>, PCollection>> {
-  private final CreateDisposition createDisposition;
-  private final BigQueryServices bqServices;
-  private final DynamicDestinations dynamicDestinations;
-  private final @Nullable String kmsKey;
-
-  public CreateTableDestinations(
-  CreateDisposition createDisposition,
-  DynamicDestinations dynamicDestinations) {
-this(createDisposition, new BigQueryServicesImpl(), dynamicDestinations, 
null);
-  }
-
-  public CreateTableDestinations(
-  CreateDisposition createDisposition,
-  BigQueryServices bqServices,
-  DynamicDestinations dynamicDestinations,
-  @Nullable String kmsKey) {
-this.createDisposition = createDisposition;
-this.bqServices = bqServices;
-this.dynamicDestinations = dynamicDestinations;
-this.kmsKey = kmsKey;
-  }
-
-  CreateTableDestinations withKmsKey(String kmsKey) {
-return new CreateTableDestinations<>(
-createDisposition, bqServices, dynamicDestinations, kmsKey);
-  }
-
-  CreateTableDestinations 
withTestServices(BigQueryServices bqServices) {
-return new CreateTableDestinations<>(
-createDisposition, bqServices, dynam

[beam] branch master updated: Merge pull request #26794: #26789 Fix auto schema update when schema order has changed.

2023-05-21 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ff166391e2b Merge pull request #26794: #26789 Fix auto schema update 
when schema order has changed.
ff166391e2b is described below

commit ff166391e2bcada4d846b5d74b1168dd248ab4ff
Author: Reuven Lax 
AuthorDate: Sun May 21 14:18:46 2023 -0700

Merge pull request #26794: #26789 Fix auto schema update when schema order 
has changed.
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  19 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |  22 +-
 .../io/gcp/bigquery/TableSchemaUpdateUtils.java| 122 ++
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |   8 +-
 .../gcp/bigquery/TableSchemaUpdateUtilsTest.java   | 253 +
 5 files changed, 409 insertions(+), 15 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f9f9b1b0b92..46b542a84a5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -685,16 +686,22 @@ public class 
StorageApiWriteUnshardedRecords
 
   void postFlush() {
 // If we got a response indicating an updated schema, recreate the 
client.
-if (this.appendClientInfo != null) {
+if (this.appendClientInfo != null && autoUpdateSchema) {
   @Nullable
   StreamAppendClient streamAppendClient = 
appendClientInfo.getStreamAppendClient();
   @Nullable
-  TableSchema updatedTableSchema =
+  TableSchema updatedTableSchemaReturned =
   (streamAppendClient != null) ? 
streamAppendClient.getUpdatedSchema() : null;
-  if (updatedTableSchema != null && autoUpdateSchema) {
-invalidateWriteStream();
-appendClientInfo =
-Preconditions.checkStateNotNull(getAppendClientInfo(false, 
updatedTableSchema));
+  if (updatedTableSchemaReturned != null) {
+Optional updatedTableSchema =
+TableSchemaUpdateUtils.getUpdatedSchema(
+this.initialTableSchema, updatedTableSchemaReturned);
+if (updatedTableSchema.isPresent()) {
+  invalidateWriteStream();
+  appendClientInfo =
+  Preconditions.checkStateNotNull(
+  getAppendClientInfo(false, updatedTableSchema.get()));
+}
   }
 }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index cd23b7be2c5..e0353bf9a90 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -36,6 +36,7 @@ import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -751,16 +752,23 @@ public class StorageApiWritesShardedRecords newSchema =
+TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, 
updatedSchemaReturned);
+if (newSchema.isPresent()) {
+  appendClientInfo.set(
+  AppendClientInfo.of(
+  newSchema.get(), 
appendClientInfo.get().getCloseAppendClient()));
+  APPEND_CLIENTS.invalidate(element.getKey());
+  APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
+  updatedSchema.write(newSchema.get());
+}
   }
 }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java
new file mode 100644
index 000..cba394abc89
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk

[beam] 01/01: Merge pull request #26752: Only use updated schema if auto update is enabled

2023-05-18 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 809d4b3efcc84babeb8b9d206bf1032ccfa070f2
Merge: f2400c828c3 11de4424aa2
Author: Reuven Lax 
AuthorDate: Thu May 18 22:42:07 2023 -0700

Merge pull request #26752: Only use updated schema if auto update is enabled

 .../beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[beam] branch master updated (f2400c828c3 -> 809d4b3efcc)

2023-05-18 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f2400c828c3 Don't fail if creating a venv didn't succeed. Also provide 
a way to disable creating separate venvs. (#26753)
 add 11de4424aa2 Only use updated schema if auto update is enabled
 new 809d4b3efcc Merge pull request #26752: Only use updated schema if auto 
update is enabled

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[beam] branch master updated (14c24f92d95 -> 85d90fbd757)

2023-05-16 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 14c24f92d95 Update cloud profile download to use curl instead of wget 
(#26720)
 add 85d90fbd757 Merge pull request #26707: Properly tag test so it is 
excluded on unsupported runners

No new revisions were added by this update.

Summary of changes:
 runners/portability/java/build.gradle  | 1 +
 .../core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java| 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)



[beam] branch master updated (69367347a41 -> 9b1448bcce4)

2023-05-12 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 69367347a41 Capture build scans on ge.apache.org to benefit from deep 
build insights (#26351)
 add 9b1448bcce4 Merge pull request #25940: Fix triggered side inputs

No new revisions were added by this update.

Summary of changes:
 .../runners/direct/DirectWriteViewVisitor.java |  28 +++--
 runners/flink/flink_runner.gradle  |   1 +
 runners/flink/job-server/flink_job_server.gradle   |   1 +
 .../runners/dataflow/StreamingViewOverrides.java   |  22 +++-
 runners/jet/build.gradle   |   3 +
 runners/samza/build.gradle |   1 +
 runners/samza/job-server/build.gradle  |   3 +-
 runners/spark/job-server/spark_job_server.gradle   |   1 +
 runners/spark/spark_runner.gradle  |   3 +
 runners/twister2/build.gradle  |   1 +
 .../org/apache/beam/sdk/io/GenerateSequence.java   |   2 +-
 ...reMessage.java => UsesTriggeredSideInputs.java} |   6 +-
 .../java/org/apache/beam/sdk/transforms/View.java  |   1 -
 .../apache/beam/sdk/values/PCollectionViews.java   |   8 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |   8 +-
 .../org/apache/beam/sdk/transforms/ViewTest.java   | 114 +
 16 files changed, 164 insertions(+), 39 deletions(-)
 copy 
sdks/java/core/src/main/java/org/apache/beam/sdk/testing/{UsesFailureMessage.java
 => UsesTriggeredSideInputs.java} (80%)



[beam] branch master updated: Merge pull request #26503: fix dataloss bug in batch Storage API sink.

2023-05-02 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 57ee20902dd Merge pull request #26503: fix dataloss bug in batch 
Storage API sink.
57ee20902dd is described below

commit 57ee20902dd1be275dc4cf7a752dff3e59db1b31
Author: Reuven Lax 
AuthorDate: Tue May 2 15:31:24 2023 -0700

Merge pull request #26503: fix dataloss bug in batch Storage API sink.
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 67 ++
 1 file changed, 55 insertions(+), 12 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f0015cddc38..3c082752449 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -31,11 +31,13 @@ import 
com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
+import io.grpc.Status;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -224,11 +226,14 @@ public class 
StorageApiWriteUnshardedRecords
   ProtoRows protoRows;
   List timestamps;
 
+  int failureCount;
+
   public AppendRowsContext(
   long offset, ProtoRows protoRows, List 
timestamps) {
 this.offset = offset;
 this.protoRows = protoRows;
 this.timestamps = timestamps;
+this.failureCount = 0;
   }
 }
 
@@ -301,17 +306,20 @@ public class 
StorageApiWriteUnshardedRecords
   }
 
   String getOrCreateStreamName() {
-try {
-  if (!useDefaultStream) {
-this.streamName =
-Preconditions.checkStateNotNull(maybeDatasetService)
-.createWriteStream(tableUrn, Type.PENDING)
-.getName();
-  } else {
-this.streamName = getDefaultStreamName();
+if (Strings.isNullOrEmpty(this.streamName)) {
+  try {
+if (!useDefaultStream) {
+  this.streamName =
+  Preconditions.checkStateNotNull(maybeDatasetService)
+  .createWriteStream(tableUrn, Type.PENDING)
+  .getName();
+  this.currentOffset = 0;
+} else {
+  this.streamName = getDefaultStreamName();
+}
+  } catch (Exception e) {
+throw new RuntimeException(e);
   }
-} catch (Exception e) {
-  throw new RuntimeException(e);
 }
 return this.streamName;
   }
@@ -376,7 +384,6 @@ public class StorageApiWriteUnshardedRecords
   // This pin is "owned" by the current DoFn.
   
Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin();
 }
-this.currentOffset = 0;
 nextCacheTickle = 
Instant.now().plus(java.time.Duration.ofMinutes(1));
 this.appendClientInfo = newAppendClientInfo;
   }
@@ -507,6 +514,7 @@ public class StorageApiWriteUnshardedRecords
 
 long offset = -1;
 if (!this.useDefaultStream) {
+  getOrCreateStreamName(); // Force creation of the stream before we 
get offsets.
   offset = this.currentOffset;
   this.currentOffset += inserts.getSerializedRowsCount();
 }
@@ -598,7 +606,42 @@ public class StorageApiWriteUnshardedRecords
   streamName,
   clientNumber,
   retrieveErrorDetails(contexts));
+  failedContext.failureCount += 1;
+
+  // Maximum number of times we retry before we fail the work item.
+  if (failedContext.failureCount > 5) {
+throw new RuntimeException("More than 5 attempts to call 
AppendRows failed.");
+  }
+
+  // The following errors are known to be persistent, so always 
fail the work item in
+  // this case.
+  Throwable error = 
Preconditions.checkStateNotNull(failedContext.getError());
+  Status.Code statusCode = Status.fromThrowable(error).getCode();
+  if (statusCode.equals(Status.Code.OUT_OF_RANGE)
+  || statusCode.equals(Status.Code.ALREADY_EXISTS)) {
+  

[beam] branch master updated: Merge pull request #26063: #21431 Pubsub dynamic topic destinations

2023-04-21 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f549fd33abd Merge pull request #26063: #21431 Pubsub dynamic topic 
destinations
f549fd33abd is described below

commit f549fd33abdc672143ccbe3f0f66104995d30fe6
Author: Reuven Lax 
AuthorDate: Fri Apr 21 21:01:11 2023 -0700

Merge pull request #26063: #21431 Pubsub dynamic topic destinations
---
 .../core/construction/PTransformTranslation.java   |   2 +
 .../beam/runners/dataflow/DataflowRunner.java  |  26 +-
 .../dataflow/options/DataflowPipelineOptions.java  |   9 +-
 .../beam/runners/dataflow/util/PropertyNames.java  |   4 +
 .../beam/runners/dataflow/DataflowRunnerTest.java  |  74 +
 .../runners/dataflow/worker/PubsubDynamicSink.java | 163 ++
 .../dataflow/worker/PubsubDynamicSinkTest.java | 163 ++
 .../beam/sdk/io/gcp/pubsub/ExternalWrite.java  |   1 +
 .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java  | 142 
 .../io/gcp/pubsub/PubSubPayloadTranslation.java|  39 ++-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java   |  19 +-
 .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java   |   4 +-
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java| 357 -
 .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java   |  13 +-
 .../beam/sdk/io/gcp/pubsub/PubsubMessage.java  |  27 +-
 .../io/gcp/pubsub/PubsubMessageWithTopicCoder.java |  72 +
 .../beam/sdk/io/gcp/pubsub/PubsubTestClient.java   |  27 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 194 ---
 .../io/gcp/pubsub/PreparePubsubWriteDoFnTest.java  | 135 
 .../pubsub/PubSubWritePayloadTranslationTest.java  |  41 +++
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java|   3 +-
 .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java|   4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java   | 183 +--
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java|   9 +-
 .../sdk/io/gcp/pubsub/PubsubTestClientTest.java|   9 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java |  82 -
 26 files changed, 1472 insertions(+), 330 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index e701ae60bb5..485350715c9 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -107,6 +107,8 @@ public class PTransformTranslation {
   public static final String PUBSUB_READ = "beam:transform:pubsub_read:v1";
   public static final String PUBSUB_WRITE = "beam:transform:pubsub_write:v1";
 
+  public static final String PUBSUB_WRITE_DYNAMIC = 
"beam:transform:pubsub_write:v2";
+
   // CombineComponents
   public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
   "beam:transform:combine_per_key_precombine:v1";
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index dfa3d37a400..2c24df1852b 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1797,8 +1797,7 @@ public class DataflowRunner extends 
PipelineRunner {
* Suppress application of {@link PubsubUnboundedSink#expand} in streaming 
mode so that we can
* instead defer to Windmill's implementation.
*/
-  private static class StreamingPubsubIOWrite
-  extends PTransform, PDone> {
+  static class StreamingPubsubIOWrite extends 
PTransform, PDone> {
 
 private final PubsubUnboundedSink transform;
 
@@ -1850,13 +1849,24 @@ public class DataflowRunner extends 
PipelineRunner {
 StepTranslationContext stepContext,
 PCollection input) {
   stepContext.addInput(PropertyNames.FORMAT, "pubsub");
-  if (overriddenTransform.getTopicProvider().isAccessible()) {
-stepContext.addInput(
-PropertyNames.PUBSUB_TOPIC, 
overriddenTransform.getTopic().getFullPath());
+  if (overriddenTransform.getTopicProvider() != null) {
+if (overriddenTransform.getTopicProvider().isAccessible()) {
+  stepContext.addInput(
+  PropertyNames.PUBSUB_TOPIC, 
overriddenTransform.getTopic().getFullPath());
+} else {
+  stepContext.addInput(
+  PropertyName

[beam] branch master updated: Merge pull request #26284: Fix GroupIntoBatches hold

2023-04-14 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f5f7a471321 Merge pull request #26284: Fix GroupIntoBatches hold
f5f7a471321 is described below

commit f5f7a471321c903174b452a1982c5183a79ac6bc
Author: Reuven Lax 
AuthorDate: Fri Apr 14 19:24:25 2023 -0700

Merge pull request #26284: Fix GroupIntoBatches hold
---
 .../java/org/apache/beam/sdk/transforms/GroupIntoBatches.java | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 311a3dac6ca..3616cc2e59f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -580,6 +580,7 @@ public class GroupIntoBatches
 timerTs,
 minBufferedTs);
 bufferingTimer.clear();
+holdTimer.clear();
   }
 }
 
@@ -593,13 +594,18 @@ public class GroupIntoBatches
 @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState 
storedBatchSizeBytes,
 @StateId(TIMER_TIMESTAMP) ValueState timerTs,
 @StateId(MIN_BUFFERED_TS) CombiningState 
minBufferedTs,
-@TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer) {
+@TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer,
+@TimerId(TIMER_HOLD_ID) Timer holdTimer) {
   LOG.debug(
   "*** END OF BUFFERING *** for timer timestamp {} with buffering 
duration {}",
   timestamp,
   maxBufferingDuration);
   flushBatch(
   receiver, key, batch, storedBatchSize, storedBatchSizeBytes, 
timerTs, minBufferedTs);
+  // Generally this is a noop, since holdTimer is not set if 
bufferingTimer is set. However we
+  // delete the holdTimer
+  // here in order to allow users to modify this policy on pipeline update.
+  holdTimer.clear();
 }
 
 @OnWindowExpiration



[beam] branch master updated (5a9ab685d9b -> 241e40f2e9e)

2023-04-13 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 5a9ab685d9b Extract BundleManager to an Interface in SamzaRunner 
(#26268)
 add b6d8ac3d3a3 Optimize counters by reducing allocations:
 - use forEach instead of requiring Iterator/UnmodifiableIterator/Iterables.concat overhead
 - optimize shortId cache hashing by just using MetricName as stepName is fixed
 - reduce MetricKey objects
 - avoid MetricsUpdate construction in getMonitoringData by directly processing maps
 new 241e40f2e9e Merge pull request #25930: Optimize counters by reducing 
allocations

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runners/core/metrics/MetricsContainerImpl.java | 129 ++---
 .../core/metrics/MetricsContainerStepMap.java  |  57 +
 .../beam/runners/core/metrics/MetricsMap.java  |  15 +++
 3 files changed, 102 insertions(+), 99 deletions(-)
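
As a rough illustration of the allocation-reduction idea above (a sketch only; the class and method names below are made up and are not the actual MetricsContainerImpl code), pushing a consumer into the backing maps with forEach avoids materializing a concatenated Iterable and its iterators on every metrics poll:

  // Hypothetical sketch: instead of
  //   for (MetricUpdate m : Iterables.concat(counters.values(), gauges.values())) { ... }
  // which allocates an Iterable plus two Iterators per call, hand the consumer
  // directly to each map so no intermediate iteration objects are created.
  import java.util.Map;
  import java.util.function.Consumer;

  final class MetricsSnapshotSketch {
    static <M> void forEachMetric(
        Map<String, M> counters, Map<String, M> gauges, Consumer<M> consumer) {
      counters.forEach((name, metric) -> consumer.accept(metric));
      gauges.forEach((name, metric) -> consumer.accept(metric));
    }
  }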



[beam] 01/01: Merge pull request #25930: Optimize counters by reducing allocations

2023-04-13 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 241e40f2e9e53470f675e33eb6511e67d506e1e8
Merge: 5a9ab685d9b b6d8ac3d3a3
Author: Reuven Lax 
AuthorDate: Thu Apr 13 17:48:17 2023 -0700

Merge pull request #25930: Optimize counters by reducing allocations

 .../runners/core/metrics/MetricsContainerImpl.java | 129 ++---
 .../core/metrics/MetricsContainerStepMap.java  |  57 +
 .../beam/runners/core/metrics/MetricsMap.java  |  15 +++
 3 files changed, 102 insertions(+), 99 deletions(-)



[beam] branch master updated: Merge pull request #25723: #25722 Add option to propagate successful storage-api writes

2023-03-21 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ca0787642a6 Merge pull request #25723: #25722 Add option to propagate 
successful storage-api writes
ca0787642a6 is described below

commit ca0787642a6b3804a742326147281c99ae8d08d2
Author: Reuven Lax 
AuthorDate: Tue Mar 21 15:57:38 2023 -0700

Merge pull request #25723: #25722 Add option to propagate successful 
storage-api writes
---
 .../beam/sdk/transforms/GroupIntoBatches.java  |  64 ---
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java   |   2 +
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  26 -
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  46 ++--
 .../io/gcp/bigquery/StorageApiConvertMessages.java |   5 +-
 .../StorageApiDynamicDestinationsBeamRow.java  |   4 +-
 ...StorageApiDynamicDestinationsGenericRecord.java |   4 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  78 +++--
 .../io/gcp/bigquery/StorageApiWritePayload.java|  26 -
 .../StorageApiWriteRecordsInconsistent.java|  18 ++-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 123 +
 .../bigquery/StorageApiWritesShardedRecords.java   |  73 +---
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |   2 +
 .../beam/sdk/io/gcp/bigquery/WriteResult.java  |  40 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |   6 +
 15 files changed, 434 insertions(+), 83 deletions(-)
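
A minimal usage sketch of the option this pull request adds (the builder and accessor names below are assumptions inferred from the feature description, not quoted from this email):

  import com.google.api.services.bigquery.model.TableRow;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
  import org.apache.beam.sdk.values.PCollection;

  class PropagateSuccessfulWritesSketch {
    // Writes rows via the Storage Write API and returns the rows reported as
    // successfully written; the two method names marked below are assumed,
    // not authoritative.
    static PCollection<TableRow> writeAndGetSuccessfulRows(PCollection<TableRow> rows) {
      WriteResult result =
          rows.apply(
              BigQueryIO.writeTableRows()
                  .to("my-project:my_dataset.my_table") // hypothetical destination
                  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                  .withPropagateSuccessfulStorageApiWrites(true)); // assumed option name
      return result.getSuccessfulStorageApiInserts(); // assumed accessor name
    }
  }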

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 78ce549c3b0..311a3dac6ca 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -329,7 +329,7 @@ public class GroupIntoBatches
 
   @Override
   public PCollection>> expand(PCollection> input) {
-
+Duration allowedLateness = 
input.getWindowingStrategy().getAllowedLateness();
 checkArgument(
 input.getCoder() instanceof KvCoder,
 "coder specified in the input PCollection is not a KvCoder");
@@ -344,6 +344,7 @@ public class GroupIntoBatches
 params.getBatchSizeBytes(),
 weigher,
 params.getMaxBufferingDuration(),
+allowedLateness,
 valueCoder)));
   }
 
@@ -357,12 +358,20 @@ public class GroupIntoBatches
 @Nullable private final SerializableFunction weigher;
 private final Duration maxBufferingDuration;
 
+private final Duration allowedLateness;
+
 // The following timer is no longer set. We maintain the spec for update 
compatibility.
 private static final String END_OF_WINDOW_ID = "endOFWindow";
 
 @TimerId(END_OF_WINDOW_ID)
 private final TimerSpec windowTimer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
+// This timer manages the watermark hold if there is no buffering timer.
+private static final String TIMER_HOLD_ID = "watermarkHold";
+
+@TimerId(TIMER_HOLD_ID)
+private final TimerSpec holdTimerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
 // This timer expires when it's time to batch and output the buffered data.
 private static final String END_OF_BUFFERING_ID = "endOfBuffering";
 
@@ -410,11 +419,13 @@ public class GroupIntoBatches
 long batchSizeBytes,
 @Nullable SerializableFunction weigher,
 Duration maxBufferingDuration,
+Duration allowedLateness,
 Coder inputValueCoder) {
   this.batchSize = batchSize;
   this.batchSizeBytes = batchSizeBytes;
   this.weigher = weigher;
   this.maxBufferingDuration = maxBufferingDuration;
+  this.allowedLateness = allowedLateness;
   this.batchSpec = StateSpecs.bag(inputValueCoder);
 
   Combine.BinaryCombineLongFn sumCombineFn =
@@ -452,9 +463,18 @@ public class GroupIntoBatches
   this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : 
(batchSize / 5);
 }
 
+@Override
+public Duration getAllowedTimestampSkew() {
+  // This is required since flush is sometimes called from processElement. 
This is safe because
+  // a watermark hold
+  // will always be set using timer.withOutputTimestamp.
+  return Duration.millis(Long.MAX_VALUE);
+}
+
 @ProcessElement
 public void processElement(
 @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer,
+@TimerId(TIMER_HOLD_ID) Timer holdTimer,
 @StateId(BATCH_ID) BagState batch,
 @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState 
storedBatchSize,
 @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState 
storedBatchSize

[beam] branch master updated (04c2de61e5d -> 4189320e49e)

2023-03-14 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 04c2de61e5d Fix OutputSampler's coder. (#25805)
 add dafc0e0 [BEAM-14307] Re-Fix Slow Side input pattern bug in sample
 add 4189320e49e Merge pull request #25829: [BEAM-14307] Re-Fix Slow Side 
input pattern bug in sample

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/beam/examples/snippets/Snippets.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[beam] branch master updated: Merge pull request #25804: Reuse client when constructing StreamWriter

2023-03-11 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 227a846dc2d Merge pull request #25804: Reuse client when constructing 
StreamWriter
227a846dc2d is described below

commit 227a846dc2dd0b78129739d0501d3dae31b17484
Author: Reuven Lax 
AuthorDate: Sat Mar 11 09:55:13 2023 -0800

Merge pull request #25804: Reuse client when constructing StreamWriter
---
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3687a163d76..4e2e7aec0ec 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1338,7 +1338,7 @@ class BigQueryServicesImpl implements BigQueryServices {
   .build();
 
   StreamWriter streamWriter =
-  StreamWriter.newBuilder(streamName)
+  StreamWriter.newBuilder(streamName, newWriteClient)
   .setExecutorProvider(
   FixedExecutorProvider.create(
   
options.as(ExecutorOptions.class).getScheduledExecutorService()))
@@ -1514,9 +1514,18 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions 
options) {
 try {
+  TransportChannelProvider transportChannelProvider =
+  BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+  .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+  .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+  .setKeepAliveWithoutCalls(true)
+  .setChannelsPerCpu(2)
+  .build();
+
   return BigQueryWriteClient.create(
   BigQueryWriteSettings.newBuilder()
   .setCredentialsProvider(() -> 
options.as(GcpOptions.class).getGcpCredential())
+  .setTransportChannelProvider(transportChannelProvider)
   .setBackgroundExecutorProvider(
   FixedExecutorProvider.create(
   
options.as(ExecutorOptions.class).getScheduledExecutorService()))



[beam] branch master updated: Merge pull request #25790: update to new BOM

2023-03-11 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new fb040ff9c10 Merge pull request #25790: update to new BOM
fb040ff9c10 is described below

commit fb040ff9c1059f97a29096597b42e864d8c87a82
Author: Reuven Lax 
AuthorDate: Sat Mar 11 08:53:21 2023 -0800

Merge pull request #25790: update to new BOM
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +++
 .../container/license_scripts/dep_urls_java.yaml   |  2 +-
 .../io/gcp/pubsublite/internal/PubsubLiteSink.java |  3 +--
 .../pubsublite/internal/SubscribeTransform.java|  6 +
 .../pubsublite/internal/PubsubLiteSinkTest.java| 31 --
 5 files changed, 25 insertions(+), 27 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 8c4071b70c7..7e8ef20953e 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -466,15 +466,15 @@ class BeamModulePlugin implements Plugin {
 def dbcp2_version = "2.8.0"
 def errorprone_version = "2.10.0"
 // Try to keep gax_version consistent with gax-grpc version in 
google_cloud_platform_libraries_bom
-def gax_version = "2.23.0"
+def gax_version = "2.23.2"
 def google_clients_version = "2.0.0"
 def google_cloud_bigdataoss_version = "2.2.6"
 // Try to keep google_cloud_spanner_version consistent with 
google_cloud_spanner_bom in google_cloud_platform_libraries_bom
-def google_cloud_spanner_version = "6.36.0"
+def google_cloud_spanner_version = "6.37.0"
 def google_code_gson_version = "2.9.1"
 def google_oauth_clients_version = "1.34.1"
 // Try to keep grpc_version consistent with gRPC version in 
google_cloud_platform_libraries_bom
-def grpc_version = "1.52.1"
+def grpc_version = "1.53.0"
 def guava_version = "31.1-jre"
 def hadoop_version = "2.10.2"
 def hamcrest_version = "2.1"
@@ -609,9 +609,9 @@ class BeamModulePlugin implements Plugin {
 google_cloud_pubsub : 
"com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_pubsublite : 
"com.google.cloud:google-cloud-pubsublite",  // 
google_cloud_platform_libraries_bom sets version
 // The release notes shows the versions set by the BOM:
-// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.8.0
+// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.10.0
 // Update libraries-bom version on 
sdks/java/container/license_scripts/dep_urls_java.yaml
-google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.8.0",
+google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.10.0",
 google_cloud_spanner: 
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_spanner_test   : 
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
 google_code_gson: 
"com.google.code.gson:gson:$google_code_gson_version",
diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml 
b/sdks/java/container/license_scripts/dep_urls_java.yaml
index 2a46fd84477..acf8a6bef23 100644
--- a/sdks/java/container/license_scripts/dep_urls_java.yaml
+++ b/sdks/java/container/license_scripts/dep_urls_java.yaml
@@ -42,7 +42,7 @@ jaxen:
   '1.1.6':
 type: "3-Clause BSD"
 libraries-bom:
-  '26.8.0':
+  '26.10.0':
 license: 
"https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE";
 type: "Apache License 2.0"
 paranamer:
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
index f22a3204021..dbf312ae5a4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
@@ -22,7 +22,6 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import com.goo

[beam] branch master updated (4bbd7b89944 -> e4ddb861756)

2023-03-07 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 4bbd7b89944 Bump golang.org/x/net from 0.7.0 to 0.8.0 in /sdks (#25729)
 add be4cbba4b1d add logging
 add 237c088cf9a add logging
 add e4ddb861756 Merge pull request #25744: Add more logging to Storage API 
writes

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  4 ++--
 .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java   |  8 +++-
 .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java| 10 +-
 3 files changed, 18 insertions(+), 4 deletions(-)



[beam] 01/01: Merge pull request #25639: Fix paginated windmill sorted list state

2023-02-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 62e9a1239bb6f9ef2f1469d0c1af192697f8b10c
Merge: 170c4184ce1 1087abeb734
Author: Reuven Lax 
AuthorDate: Mon Feb 27 14:39:32 2023 -0800

Merge pull request #25639: Fix paginated windmill sorted list state

 .../dataflow/worker/WindmillStateReader.java   | 15 --
 .../dataflow/worker/WindmillStateReaderTest.java   | 58 ++
 2 files changed, 58 insertions(+), 15 deletions(-)



[beam] branch master updated (170c4184ce1 -> 62e9a1239bb)

2023-02-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 170c4184ce1 Merge pull request #25633 from andreykot/patch-1
 add 1087abeb734 Fix paginated windmill sorted list state.
 new 62e9a1239bb Merge pull request #25639: Fix paginated windmill sorted 
list state

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dataflow/worker/WindmillStateReader.java   | 15 --
 .../dataflow/worker/WindmillStateReaderTest.java   | 58 ++
 2 files changed, 58 insertions(+), 15 deletions(-)



[beam] branch master updated (fabd1f6d7a6 -> a3d82e64f31)

2023-02-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from fabd1f6d7a6 Add ReadFrom/WriteTo Csv/Json as top-level transforms to 
the Python SDK. (#25614)
 add a3d82e64f31 Merge pull request #25642: #25626 Revert to old 
constructor.

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)



[beam] branch master updated: Merge pull request #25627: #25626 Use the correct constructor when creating StreamWriter

2023-02-24 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 124e1d6a5f1 Merge pull request #25627: #25626 Use the correct 
constructor when creating StreamWriter
124e1d6a5f1 is described below

commit 124e1d6a5f14dd7b194a98d43dbc771cbad05065
Author: Reuven Lax 
AuthorDate: Fri Feb 24 14:52:47 2023 -0800

Merge pull request #25627: #25626 Use the correct constructor when creating 
StreamWriter
---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index bc798c47573..040346bb910 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1338,7 +1338,9 @@ class BigQueryServicesImpl implements BigQueryServices {
   .build();
 
   StreamWriter streamWriter =
-  StreamWriter.newBuilder(streamName)
+  StreamWriter.newBuilder(
+  streamName,
+  
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(newWriteClient))
   .setWriterSchema(protoSchema)
   .setChannelProvider(transportChannelProvider)
   .setEnableConnectionPool(useConnectionPool)



[beam] branch master updated (957301519bb -> bc2895c99a2)

2023-01-31 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 957301519bb Fix Debezium expansion service fail to start (#25243)
 add bc2895c99a2 Merge pull request #25094: Externalizing the StreamWriter 
parameters for StorageWrites

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java| 13 +
 .../beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 10 ++
 2 files changed, 23 insertions(+)



[beam] branch release-2.45.0 updated (7d07ffe68ba -> ade83b355ee)

2023-01-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch release-2.45.0
in repository https://gitbox.apache.org/repos/asf/beam.git


from 7d07ffe68ba update cloudpickle version (#25143)
 add ab5cebf5525 Handle schema updates in Storage API writes.
 add 7f6c93ea039 Fix inconsistent cache bug
 new ade83b355ee Merge pull request #25205: Cherry pick/fix bigquery 
inconsistent cache

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 120 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  18 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   8 +
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  11 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  29 ---
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  61 +-
 .../bigquery/StorageApiDynamicDestinations.java|   2 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   8 +-
 .../StorageApiDynamicDestinationsTableRow.java |  24 ++-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  20 +-
 .../io/gcp/bigquery/StorageApiWritePayload.java|  25 ++-
 .../StorageApiWriteRecordsInconsistent.java|  12 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 146 +++---
 .../bigquery/StorageApiWritesShardedRecords.java   | 173 +++--
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +--
 .../sdk/io/gcp/testing/FakeDatasetService.java |  63 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 216 +
 .../bigquery/TableRowToStorageApiProtoTest.java|  95 -
 18 files changed, 968 insertions(+), 173 deletions(-)



[beam] 01/01: Merge pull request #25205: Cherry pick/fix bigquery inconsistent cache

2023-01-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch release-2.45.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ade83b355eed496c7e23606ab63e86eddd1d3988
Merge: 7d07ffe68ba 7f6c93ea039
Author: Reuven Lax 
AuthorDate: Fri Jan 27 08:41:58 2023 -0800

Merge pull request #25205: Cherry pick/fix bigquery inconsistent cache

 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 120 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  18 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   8 +
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  11 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  29 ---
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  61 +-
 .../bigquery/StorageApiDynamicDestinations.java|   2 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   8 +-
 .../StorageApiDynamicDestinationsTableRow.java |  24 ++-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  20 +-
 .../io/gcp/bigquery/StorageApiWritePayload.java|  25 ++-
 .../StorageApiWriteRecordsInconsistent.java|  12 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 146 +++---
 .../bigquery/StorageApiWritesShardedRecords.java   | 173 +++--
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +--
 .../sdk/io/gcp/testing/FakeDatasetService.java |  63 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 216 +
 .../bigquery/TableRowToStorageApiProtoTest.java|  95 -
 18 files changed, 968 insertions(+), 173 deletions(-)



[beam] 01/01: Merge pull request #25194: update GCP cloud libraries BOM to 26.5.0

2023-01-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3bf17574c6f18942d1015d5a837af23ae83c4d1a
Merge: 61045b670bf 40243ffc284
Author: Reuven Lax 
AuthorDate: Fri Jan 27 06:00:57 2023 -0800

Merge pull request #25194: update GCP cloud libraries BOM to 26.5.0

 .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 12 ++--
 sdks/java/container/license_scripts/dep_urls_java.yaml   |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)



[beam] branch master updated (61045b670bf -> 3bf17574c6f)

2023-01-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 61045b670bf Update Dataflow container versions (#25192)
 add 40243ffc284 update GCP cloud libraries BOM to 26.5.0
 new 3bf17574c6f Merge pull request #25194: update GCP cloud libraries BOM 
to 26.5.0

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 12 ++--
 sdks/java/container/license_scripts/dep_urls_java.yaml   |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)



[beam] branch master updated (91efbd3a933 -> 1ecb4ef6d6f)

2023-01-25 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 91efbd3a933 Fix intruction -> instruction typo
 add e8484a790e3 Fix inconsistent cache bug
 add 1ecb4ef6d6f Merge pull request #25159: Fix inconsistent cache bug

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  9 ++-
 .../bigquery/StorageApiWritesShardedRecords.java   | 82 --
 2 files changed, 53 insertions(+), 38 deletions(-)



[beam] 01/01: Merge pull request #24145: Handle updates to table schema when using Storage API writes.

2023-01-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f5020e7ac2bdd3363619aafbf257f2ebf8e3fe2b
Merge: 428ec97e30c 7ad44c84585
Author: Reuven Lax 
AuthorDate: Thu Jan 19 23:53:33 2023 -0800

Merge pull request #24145: Handle updates to table schema when using 
Storage API writes.

 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 115 +--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  18 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   8 +
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  11 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  29 ---
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  61 +-
 .../bigquery/StorageApiDynamicDestinations.java|   2 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   8 +-
 .../StorageApiDynamicDestinationsTableRow.java |  24 ++-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  20 +-
 .../io/gcp/bigquery/StorageApiWritePayload.java|  25 ++-
 .../StorageApiWriteRecordsInconsistent.java|  12 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 146 +++---
 .../bigquery/StorageApiWritesShardedRecords.java   | 173 +++--
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +--
 .../sdk/io/gcp/testing/FakeDatasetService.java |  63 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 216 +
 .../bigquery/TableRowToStorageApiProtoTest.java|  95 -
 18 files changed, 958 insertions(+), 178 deletions(-)



[beam] branch master updated (428ec97e30c -> f5020e7ac2b)

2023-01-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 428ec97e30c improve error message for mismatched pipelines (#24834)
 add 7ad44c84585 Handle schema updates in Storage API writes.
 new f5020e7ac2b Merge pull request #24145: Handle updates to table schema 
when using Storage API writes.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 115 +--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  18 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   8 +
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  11 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  29 ---
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  61 +-
 .../bigquery/StorageApiDynamicDestinations.java|   2 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   8 +-
 .../StorageApiDynamicDestinationsTableRow.java |  24 ++-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  20 +-
 .../io/gcp/bigquery/StorageApiWritePayload.java|  25 ++-
 .../StorageApiWriteRecordsInconsistent.java|  12 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 146 +++---
 .../bigquery/StorageApiWritesShardedRecords.java   | 173 +++--
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +--
 .../sdk/io/gcp/testing/FakeDatasetService.java |  63 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 216 +
 .../bigquery/TableRowToStorageApiProtoTest.java|  95 -
 18 files changed, 958 insertions(+), 178 deletions(-)



[beam] branch master updated (1c4e4241c2a -> 876ce5bcafc)

2023-01-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1c4e4241c2a [Playground][Frontend] Run timer fix; CodeRunner and 
OutputType extraction. (#24871)
 add 876ce5bcafc Merge pull request #25073: Improves StorageWrite API error 
logging

No new revisions were added by this update.

Summary of changes:
 .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java  | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)



[beam] branch master updated (f40057b7fb6 -> 482c3107674)

2023-01-17 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f40057b7fb6 Bump json5 from 1.0.1 to 1.0.2 in 
/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel
 (#24895)
 add e22afb090fc Properly synchronize pins
 add 6847255514d typo
 new 482c3107674 Merge pull request #25047: Properly synchronize pins

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java   | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)



[beam] 01/01: Merge pull request #25047: Properly synchronize pins

2023-01-17 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 482c3107674749b8362130ac9df331c5f9062f99
Merge: f40057b7fb6 6847255514d
Author: Reuven Lax 
AuthorDate: Tue Jan 17 17:54:51 2023 -0800

Merge pull request #25047: Properly synchronize pins

 .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java   | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)



[beam] 01/01: Merge pull request #25044: Add public qualifier to FillGaps methods

2023-01-17 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5a2a3a4dfb26b7e8a7f2b132e1913fb4504216b2
Merge: 3dcb08510fd 120558cb9f7
Author: Reuven Lax 
AuthorDate: Tue Jan 17 12:08:51 2023 -0800

Merge pull request #25044: Add public qualifier to FillGaps methods

 .../java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)



[beam] branch master updated (3dcb08510fd -> 5a2a3a4dfb2)

2023-01-17 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 3dcb08510fd Merge pull request #24951: Update website metadata to 
2.44.0 and blog post
 add 120558cb9f7 add public qualifier
 new 5a2a3a4dfb2 Merge pull request #25044: Add public qualifier to 
FillGaps methods

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)



[beam] branch master updated (92223e3ea2f -> c0e689331c2)

2023-01-10 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 92223e3ea2f [WebSite] Add new Go quickstart (#24885)
 add 5a57d3f75ce fix pinning bug in storage-api writes
 new c0e689331c2 Merge pull request #24968: Fix pinning bug in storage-api 
writes

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 19 ++-
 .../gcp/bigquery/StorageApiWritesShardedRecords.java  | 13 +++--
 2 files changed, 25 insertions(+), 7 deletions(-)



[beam] 01/01: Merge pull request #24968: Fix pinning bug in storage-api writes

2023-01-10 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c0e689331c2a6573ecf267b9bef133a85ea8a36c
Merge: 92223e3ea2f 5a57d3f75ce
Author: Reuven Lax 
AuthorDate: Tue Jan 10 16:00:21 2023 -0800

Merge pull request #24968: Fix pinning bug in storage-api writes

 .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 19 ++-
 .../gcp/bigquery/StorageApiWritesShardedRecords.java  | 13 +++--
 2 files changed, 25 insertions(+), 7 deletions(-)



[beam] 01/01: Merge pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

2023-01-06 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c162f4c13de69d4c62eb017169b104b57d4e212a
Merge: f9d5de34ae1 de0cf60eb28
Author: Reuven Lax 
AuthorDate: Fri Jan 6 22:25:04 2023 -0800

Merge pull request #24853: Modify windmill DirectStreamObserver to call 
isReady only every 10 messages by default

 .../options/StreamingDataflowWorkerOptions.java|  9 +++
 .../worker/windmill/DirectStreamObserver.java  | 77 ++
 .../worker/windmill/GrpcWindmillServer.java|  3 +-
 .../worker/windmill/StreamObserverFactory.java | 12 ++--
 4 files changed, 68 insertions(+), 33 deletions(-)



[beam] branch master updated (f9d5de34ae1 -> c162f4c13de)

2023-01-06 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f9d5de34ae1 update Java version from 11 to 17 (#24839)
 add de0cf60eb28 Modify windmill DirectStreamObserver to call isReady only
every 10 messages by default. This provides more output buffering and ensures
that output is not throttled on synchronization when message sizes exceed the
32KB gRPC isReady limit.
 new c162f4c13de Merge pull request #24853: Modify windmill 
DirectStreamObserver to call isReady only every 10 messages by default

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../options/StreamingDataflowWorkerOptions.java|  9 +++
 .../worker/windmill/DirectStreamObserver.java  | 77 ++
 .../worker/windmill/GrpcWindmillServer.java|  3 +-
 .../worker/windmill/StreamObserverFactory.java | 12 ++--
 4 files changed, 68 insertions(+), 33 deletions(-)
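
The change described in this push checks isReady() only once every N messages instead of on every onNext() call. A minimal sketch of the idea, assuming a plain gRPC CallStreamObserver; the class and field names are illustrative and the real DirectStreamObserver coordinates with a Phaser rather than sleeping:

import io.grpc.stub.CallStreamObserver;

/** Illustrative only: consult isReady() every N messages instead of per message. */
class BatchedReadyCheckObserver<T> {
  private final CallStreamObserver<T> outbound;   // assumed underlying gRPC observer
  private final int messagesBetweenIsReadyChecks; // e.g. 10, the new default above
  private int sinceLastCheck = 0;

  BatchedReadyCheckObserver(CallStreamObserver<T> outbound, int messagesBetweenIsReadyChecks) {
    this.outbound = outbound;
    this.messagesBetweenIsReadyChecks = messagesBetweenIsReadyChecks;
  }

  synchronized void onNext(T value) throws InterruptedException {
    if (++sinceLastCheck >= messagesBetweenIsReadyChecks) {
      sinceLastCheck = 0;
      // Synchronize on flow control only periodically; smaller messages stay buffered in between.
      while (!outbound.isReady()) {
        Thread.sleep(1); // crude polling for the sketch; not how the worker actually waits
      }
    }
    outbound.onNext(value);
  }
}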



[beam] branch master updated (6ad75405c97 -> e54871d76f0)

2023-01-06 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6ad75405c97 TFT Criteo benchmarks (#24382)
 add db1f9d23b5f Add thread prefix for StreamingDataflowWorker work executor
 new e54871d76f0 Merge pull request #24646: Add thread prefix for 
StreamingDataflowWorker work executor

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +---
 1 file changed, 5 insertions(+), 7 deletions(-)



[beam] 01/01: Merge pull request #24646: Add thread prefix for StreamingDataflowWorker work executor

2023-01-06 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e54871d76f09292f8a61a2639b10568fd901f5b5
Merge: 6ad75405c97 db1f9d23b5f
Author: Reuven Lax 
AuthorDate: Fri Jan 6 11:39:04 2023 -0800

Merge pull request #24646: Add thread prefix for StreamingDataflowWorker 
work executor

 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +---
 1 file changed, 5 insertions(+), 7 deletions(-)



[beam] 01/01: Merge pull request #24546: Fix null pointer exception caused by clearing member variable

2022-12-06 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ed29ceb90fa384d8daf0f9604643c031539b5fc2
Merge: 8ec0568d12d 40aca90f8d0
Author: Reuven Lax 
AuthorDate: Tue Dec 6 11:09:16 2022 -0800

Merge pull request #24546: Fix null pointer exception caused by clearing 
member variable

 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 12 ++---
 .../bigquery/StorageApiWritesShardedRecords.java   | 62 +-
 2 files changed, 41 insertions(+), 33 deletions(-)



[beam] branch master updated (8ec0568d12d -> ed29ceb90fa)

2022-12-06 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 8ec0568d12d Disallow using the JRH with Python streaming pipelines 
(#24513)
 add 40aca90f8d0 fix null pointer exception caused by clearing member 
variable
 new ed29ceb90fa Merge pull request #24546: Fix null pointer exception 
caused by clearing member variable

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 12 ++---
 .../bigquery/StorageApiWritesShardedRecords.java   | 62 +-
 2 files changed, 41 insertions(+), 33 deletions(-)



[beam] branch master updated (32673a9d553 -> d979d88741a)

2022-12-02 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 32673a9d553 Bump cloud.google.com/go/profiler from 0.3.0 to 0.3.1 in 
/sdks (#24498)
 add 1743dc59073 fix exception
 new d979d88741a Merge pull request #24512:Fix exception caused by null 
StreamAppendClient

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 25 +-
 1 file changed, 10 insertions(+), 15 deletions(-)



[beam] 01/01: Merge pull request #24512:Fix exception caused by null StreamAppendClient

2022-12-02 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d979d88741abf4a4a6b79b2240460bd2969f465a
Merge: 32673a9d553 1743dc59073
Author: Reuven Lax 
AuthorDate: Fri Dec 2 21:16:09 2022 -0800

Merge pull request #24512:Fix exception caused by null StreamAppendClient

 .../bigquery/StorageApiWriteUnshardedRecords.java  | 25 +-
 1 file changed, 10 insertions(+), 15 deletions(-)



[beam] branch master updated: Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395

2022-11-28 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5bb13fa35b9 Merge pull request #24147: First step in adding schema 
update to Storage API sink. Refactor code #21395
5bb13fa35b9 is described below

commit 5bb13fa35b9bc36764895c57f23d3890f0f1b567
Author: Reuven Lax 
AuthorDate: Mon Nov 28 20:00:52 2022 -0800

Merge pull request #24147: First step in adding schema update to Storage 
API sink. Refactor code #21395
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  68 +
 .../io/gcp/bigquery/BeamRowToStorageApiProto.java  | 144 -
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  12 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |   6 -
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|   2 +-
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  31 +-
 .../bigquery/StorageApiDynamicDestinations.java|  21 +-
 .../StorageApiDynamicDestinationsBeamRow.java  |  50 ++-
 .../StorageApiDynamicDestinationsTableRow.java | 201 +---
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  12 +-
 .../io/gcp/bigquery/StorageApiWritePayload.java|   5 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 155 +-
 .../bigquery/StorageApiWritesShardedRecords.java   | 214 +++--
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 337 -
 .../gcp/bigquery/BeamRowToStorageApiProtoTest.java |  28 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 273 -
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java |  26 +-
 .../bigquery/TableRowToStorageApiProtoTest.java|  82 -
 18 files changed, 849 insertions(+), 818 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
new file mode 100644
index 000..1680ef48e4d
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
+/**
+ * Container class used by {@link StorageApiWriteUnshardedRecords} and {@link
+ * StorageApiWritesShardedRecords} to encapsulate a destination {@link 
TableSchema} along with a
+ * {@link BigQueryServices.StreamAppendClient} and other objects needed to 
write records.
+ */
+class AppendClientInfo {
+  @Nullable BigQueryServices.StreamAppendClient streamAppendClient;
+  @Nullable TableSchema tableSchema;
+  Consumer closeAppendClient;
+  Descriptors.Descriptor descriptor;
+
+  public AppendClientInfo(
+  TableSchema tableSchema, Consumer 
closeAppendClient)
+  throws Exception {
+this.tableSchema = tableSchema;
+this.descriptor = 
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
+this.closeAppendClient = closeAppendClient;
+  }
+
+  public AppendClientInfo clearAppendClient() {
+if (streamAppendClient != null) {
+  closeAppendClient.accept(streamAppendClient);
+  this.streamAppendClient = null;
+}
+return this;
+  }
+
+  public AppendClientInfo createAppendClient(
+  BigQueryServices.DatasetService datasetService,
+  Supplier getStreamName,
+  boolean useConnectionPool)
+  throws Exception {
+if (streamAppendClient == null) {
+  this.streamAppendClient =
+  datasetService.getStreamAppendClient(getStreamName.get(), 
descriptor, useConnectionPool);
+}
+return this;
+  }
+
+  public void close() {
+clearAppendClient();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
 
b/sdks/java/io/googl

[beam] branch master updated: Merge pull request #24320: update bom to the latest one

2022-11-28 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ec6d163cf36 Merge pull request #24320: update bom to the latest one
ec6d163cf36 is described below

commit ec6d163cf36a5d9951764b951e8fccd7b298aa2d
Author: Reuven Lax 
AuthorDate: Mon Nov 28 18:42:49 2022 -0800

Merge pull request #24320: update bom to the latest one
---
 .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 12 ++--
 sdks/java/container/license_scripts/dep_urls_java.yaml   |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index abb32698ddb..107fbefd164 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -466,15 +466,15 @@ class BeamModulePlugin implements Plugin {
 def dbcp2_version = "2.8.0"
 def errorprone_version = "2.10.0"
 // Try to keep gax_version consistent with gax-grpc version in 
google_cloud_platform_libraries_bom
-def gax_version = "2.19.2"
+def gax_version = "2.19.5"
 def google_clients_version = "2.0.0"
 def google_cloud_bigdataoss_version = "2.2.6"
 // Try to keep google_cloud_spanner_version consistent with 
google_cloud_spanner_bom in google_cloud_platform_libraries_bom
-def google_cloud_spanner_version = "6.31.2"
+def google_cloud_spanner_version = "6.33.0"
 def google_code_gson_version = "2.9.1"
 def google_oauth_clients_version = "1.34.1"
 // Try to keep grpc_version consistent with gRPC version in 
google_cloud_platform_libraries_bom
-def grpc_version = "1.49.2"
+def grpc_version = "1.50.2"
 def guava_version = "31.1-jre"
 def hadoop_version = "2.10.2"
 def hamcrest_version = "2.1"
@@ -490,7 +490,7 @@ class BeamModulePlugin implements Plugin {
 def postgres_version = "42.2.16"
 def powermock_version = "2.0.9"
 // Try to keep protobuf_version consistent with the protobuf version in 
google_cloud_platform_libraries_bom
-def protobuf_version = "3.21.7"
+def protobuf_version = "3.21.9"
 def quickcheck_version = "1.0"
 def sbe_tool_version = "1.25.1"
 def singlestore_jdbc_version = "1.1.4"
@@ -607,9 +607,9 @@ class BeamModulePlugin implements Plugin {
 google_cloud_pubsub : 
"com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_pubsublite : 
"com.google.cloud:google-cloud-pubsublite",  // 
google_cloud_platform_libraries_bom sets version
 // The GCP Libraries BOM dashboard shows the versions set by the BOM:
-// 
https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.1.3/artifact_details.html
+// 
https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.1.5/artifact_details.html
 // Update libraries-bom version on 
sdks/java/container/license_scripts/dep_urls_java.yaml
-google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.1.3",
+google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.1.5",
 google_cloud_spanner: 
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_spanner_test   : 
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
 google_code_gson: 
"com.google.code.gson:gson:$google_code_gson_version",
diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml 
b/sdks/java/container/license_scripts/dep_urls_java.yaml
index 54ee423..0fcf580320a 100644
--- a/sdks/java/container/license_scripts/dep_urls_java.yaml
+++ b/sdks/java/container/license_scripts/dep_urls_java.yaml
@@ -42,7 +42,7 @@ jaxen:
   '1.1.6':
 type: "3-Clause BSD"
 libraries-bom:
-  '26.1.3':
+  '26.1.5':
 license: 
"https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE";
 type: "Apache License 2.0"
 paranamer:



[beam] 01/01: Merge pull request #24215: Fix OrderedListState for Dataflow Streaming pipelines on SE.

2022-11-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a04682a71a957ec35f876b345caa5dd37cd52ab5
Merge: 36b79851359 10a9dc22318
Author: Reuven Lax 
AuthorDate: Sat Nov 19 21:59:24 2022 -0800

Merge pull request #24215: Fix OrderedListState for Dataflow Streaming 
pipelines on SE.

 .../beam/runners/dataflow/worker/WindmillStateInternals.java  | 11 ---
 .../runners/dataflow/worker/WindmillStateInternalsTest.java   | 10 +-
 2 files changed, 13 insertions(+), 8 deletions(-)



[beam] branch master updated (36b79851359 -> a04682a71a9)

2022-11-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 36b79851359 Merge pull request #24270: Update Java Multi-lang 
quickstart after the Beam 2.43.0 release
 add 10a9dc22318 Fix OrderedListState for Dataflow Streaming pipelines on 
SE. The id field is unsigned so setting LONG.MIN resulted in a larger id than 
supported.
 new a04682a71a9 Merge pull request #24215: Fix OrderedListState for 
Dataflow Streaming pipelines on SE.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/runners/dataflow/worker/WindmillStateInternals.java  | 11 ---
 .../runners/dataflow/worker/WindmillStateInternalsTest.java   | 10 +-
 2 files changed, 13 insertions(+), 8 deletions(-)
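
The fix above relies on the id being read as an unsigned 64-bit value, so Long.MIN_VALUE sorts above every supported id instead of below it. A small self-contained illustration (not Beam code):

public class UnsignedIdDemo {
  public static void main(String[] args) {
    long minId = Long.MIN_VALUE;
    // Reinterpreted as unsigned, Long.MIN_VALUE is 2^63 = 9223372036854775808,
    // which is larger than any id the service supports.
    System.out.println(Long.toUnsignedString(minId));
    // Under unsigned comparison, 0 is the smallest id, not Long.MIN_VALUE.
    System.out.println(Long.compareUnsigned(0L, minId) < 0); // prints true
  }
}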



[beam] branch master updated (9c83de646ab -> 48c70cc3074)

2022-11-14 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 9c83de646ab Add more tests for S3 filesystem (#24138)
 add 48c70cc3074 Merge pull request #2: Track time on Cloud Dataflow 
streaming data reads and export via heartbeats

No new revisions were added by this update.

Summary of changes:
 .../dataflow/worker/StreamingDataflowWorker.java   |  237 ++--
 .../dataflow/worker/WindmillStateReader.java   |   25 +-
 .../dataflow/worker/FakeWindmillServer.java|  122 ++-
 .../worker/StreamingDataflowWorkerTest.java| 1147 ++--
 4 files changed, 1100 insertions(+), 431 deletions(-)



[beam] branch master updated: Merge pull request #23556: Forward failed storage-api row inserts to the failedStorageApiInserts PCollection addresses #23628

2022-10-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 8df6f67c65b Merge pull request #23556: Forward failed storage-api row 
inserts to the failedStorageApiInserts PCollection addresses #23628
8df6f67c65b is described below

commit 8df6f67c65b4888c45c31e088fb463972c4ec76b
Author: Reuven Lax 
AuthorDate: Sat Oct 22 10:37:18 2022 -0700

Merge pull request #23556: Forward failed storage-api row inserts to the 
failedStorageApiInserts PCollection addresses #23628
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |   6 +
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  | 100 +++---
 .../StorageApiWriteRecordsInconsistent.java|  50 +--
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 277 +
 .../bigquery/StorageApiWritesShardedRecords.java   | 342 ++---
 .../beam/sdk/io/gcp/testing/BigqueryClient.java|   4 +-
 .../sdk/io/gcp/testing/FakeDatasetService.java |  32 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  21 +-
 .../io/gcp/bigquery/BigQueryNestedRecordsIT.java   |   5 +-
 .../gcp/bigquery/StorageApiSinkFailedRowsIT.java   | 266 
 .../gcp/bigquery/TableRowToStorageApiProtoIT.java  |   8 +-
 12 files changed, 865 insertions(+), 248 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 1f1fe4589ff..7f6ac755d6b 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -603,7 +603,7 @@ class BeamModulePlugin implements Plugin {
 google_cloud_pubsub : 
"com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_pubsublite : 
"com.google.cloud:google-cloud-pubsublite",  // 
google_cloud_platform_libraries_bom sets version
 // The GCP Libraries BOM dashboard shows the versions set by the BOM:
-// 
https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/25.2.0/artifact_details.html
+// 
https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.1.3/artifact_details.html
 // Update libraries-bom version on 
sdks/java/container/license_scripts/dep_urls_java.yaml
 google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.1.3",
 google_cloud_spanner: 
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom 
sets version
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 953d1237d9c..53cb2713641 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -150,4 +150,10 @@ public interface BigQueryOptions
   Integer getStorageApiAppendThresholdRecordCount();
 
   void setStorageApiAppendThresholdRecordCount(Integer value);
+
+  @Description("Maximum request size allowed by the storage write API. ")
+  @Default.Long(10 * 1000 * 1000)
+  Long getStorageWriteApiMaxRequestSize();
+
+  void setStorageWriteApiMaxRequestSize(Long value);
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index e48b9a19690..20ab251c9c0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.ShardedKey;
 import org.apache.beam.sdk.values.KV;
 impo
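
A hedged usage sketch of the dead-letter output added by this change. It assumes the WriteResult exposes the failed rows via getFailedStorageApiInserts() and that BigQueryStorageApiInsertError carries getErrorMessage(); the table name and the upstream PCollection are placeholders:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class FailedStorageApiInsertsExample {
  static void logFailedRows(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder table
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
    // Failed rows are forwarded here instead of failing the bundle; map them to their error text.
    result
        .getFailedStorageApiInserts()
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((BigQueryStorageApiInsertError err) -> err.getErrorMessage()));
  }
}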

[beam] branch master updated: Merge pull request #23795: Revert 23234: issue #23794

2022-10-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d38f577624e Merge pull request #23795: Revert 23234: issue #23794
d38f577624e is described below

commit d38f577624eaf8b5f4e31fec43ca8cfba132a132
Author: Reuven Lax 
AuthorDate: Sat Oct 22 00:09:20 2022 -0700

Merge pull request #23795: Revert 23234: issue #23794
---
 .../apache/beam/sdk/options/ExecutorOptions.java   | 59 ++
 .../sdk/extensions/gcp/options/GcsOptions.java | 29 +--
 2 files changed, 71 insertions(+), 17 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
new file mode 100644
index 000..2037d217422
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
+
+/**
+ * Options for configuring the {@link ScheduledExecutorService} used 
throughout the Java runtime.
+ */
+public interface ExecutorOptions extends PipelineOptions {
+
+  /**
+   * The {@link ScheduledExecutorService} instance to use to create threads, 
can be overridden to
+   * specify a {@link ScheduledExecutorService} that is compatible with the 
user's environment. If
+   * unset, the default is to create an {@link 
UnboundedScheduledExecutorService}.
+   */
+  @JsonIgnore
+  @Description(
+  "The ScheduledExecutorService instance to use to create threads, can be 
overridden to specify "
+  + "a ScheduledExecutorService that is compatible with the user's 
environment. If unset, "
+  + "the default is to create an UnboundedScheduledExecutorService.")
+  @Default.InstanceFactory(ScheduledExecutorServiceFactory.class)
+  @Hidden
+  ScheduledExecutorService getScheduledExecutorService();
+
+  void setScheduledExecutorService(ScheduledExecutorService value);
+
+  /** Returns the default {@link ScheduledExecutorService} to use within the 
Apache Beam SDK. */
+  class ScheduledExecutorServiceFactory implements 
DefaultValueFactory {
+@Override
+public ScheduledExecutorService create(PipelineOptions options) {
+  /* The SDK requires an unbounded thread pool because a step may create X 
writers
+   * each requiring their own thread to perform the writes otherwise a 
writer may
+   * block causing deadlock for the step because the writers buffer is 
full.
+   * Also, the MapTaskExecutor launches the steps in reverse order and 
completes
+   * them in forward order thus requiring enough threads so that each 
step's writers
+   * can be active.
+   */
+  return new UnboundedScheduledExecutorService();
+}
+  }
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 0b14b244da5..fea7be7f5c7 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExecutorOptions;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import or

[beam] branch master updated: Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793)

2022-10-21 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 01da3fcb3e3 Revert "Update BQIO to a single scheduled executor service 
reduce threads (#23234)" (#23793)
01da3fcb3e3 is described below

commit 01da3fcb3e312ea0b5a62bd67b1b221074105a70
Author: Reuven Lax 
AuthorDate: Fri Oct 21 19:40:56 2022 -0700

Revert "Update BQIO to a single scheduled executor service reduce threads 
(#23234)" (#23793)

This reverts commit 8e2431c0e55237af4bd00a9786e4c150e20d4e14.
---
 .../apache/beam/sdk/options/ExecutorOptions.java   | 59 --
 .../sdk/extensions/gcp/options/GcsOptions.java | 29 ++-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 27 --
 3 files changed, 17 insertions(+), 98 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
deleted file mode 100644
index 2037d217422..000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.concurrent.ScheduledExecutorService;
-import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
-
-/**
- * Options for configuring the {@link ScheduledExecutorService} used 
throughout the Java runtime.
- */
-public interface ExecutorOptions extends PipelineOptions {
-
-  /**
-   * The {@link ScheduledExecutorService} instance to use to create threads, 
can be overridden to
-   * specify a {@link ScheduledExecutorService} that is compatible with the 
user's environment. If
-   * unset, the default is to create an {@link 
UnboundedScheduledExecutorService}.
-   */
-  @JsonIgnore
-  @Description(
-  "The ScheduledExecutorService instance to use to create threads, can be 
overridden to specify "
-  + "a ScheduledExecutorService that is compatible with the user's 
environment. If unset, "
-  + "the default is to create an UnboundedScheduledExecutorService.")
-  @Default.InstanceFactory(ScheduledExecutorServiceFactory.class)
-  @Hidden
-  ScheduledExecutorService getScheduledExecutorService();
-
-  void setScheduledExecutorService(ScheduledExecutorService value);
-
-  /** Returns the default {@link ScheduledExecutorService} to use within the 
Apache Beam SDK. */
-  class ScheduledExecutorServiceFactory implements 
DefaultValueFactory {
-@Override
-public ScheduledExecutorService create(PipelineOptions options) {
-  /* The SDK requires an unbounded thread pool because a step may create X 
writers
-   * each requiring their own thread to perform the writes otherwise a 
writer may
-   * block causing deadlock for the step because the writers buffer is 
full.
-   * Also, the MapTaskExecutor launches the steps in reverse order and 
completes
-   * them in forward order thus requiring enough threads so that each 
step's writers
-   * can be active.
-   */
-  return new UnboundedScheduledExecutorService();
-}
-  }
-}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index fea7be7f5c7..0b14b244da5 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.

[beam] branch master updated: Merge pull request #23510: Vortex multiplexing streams

2022-10-18 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 171d3e3d9d7 Merge pull request #23510: Vortex multiplexing streams
171d3e3d9d7 is described below

commit 171d3e3d9d700aa90c498719a0a925916c1cb56e
Author: Reuven Lax 
AuthorDate: Tue Oct 18 21:12:15 2022 -0700

Merge pull request #23510: Vortex multiplexing streams
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  6 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |  5 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  4 +--
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  5 +--
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 36 --
 .../bigquery/StorageApiWritesShardedRecords.java   |  4 +--
 .../sdk/io/gcp/testing/FakeDatasetService.java |  3 +-
 7 files changed, 43 insertions(+), 20 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a81dd6b9055..26a14e86012 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -576,12 +576,12 @@ class BeamModulePlugin implements Plugin {
 google_api_client_jackson2  : 
"com.google.api-client:google-api-client-jackson2:$google_clients_version",
 google_api_client_java6 : 
"com.google.api-client:google-api-client-java6:$google_clients_version",
 google_api_common   : 
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
-google_api_services_bigquery: 
"com.google.apis:google-api-services-bigquery:v2-rev20220827-$google_clients_version",
+google_api_services_bigquery: 
"com.google.apis:google-api-services-bigquery:v2-rev20220924-$google_clients_version",
 google_api_services_clouddebugger   : 
"com.google.apis:google-api-services-clouddebugger:v2-rev20220318-$google_clients_version",
 google_api_services_cloudresourcemanager: 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20220828-$google_clients_version",
-google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev20220812-$google_clients_version",
+google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version",
 google_api_services_healthcare  : 
"com.google.apis:google-api-services-healthcare:v1-rev20220818-$google_clients_version",
-google_api_services_pubsub  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220829-$google_clients_version",
+google_api_services_pubsub  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
 google_api_services_storage : 
"com.google.apis:google-api-services-storage:v1-rev20220705-$google_clients_version",
 google_auth_library_credentials : 
"com.google.auth:google-auth-library-credentials", // 
google_cloud_platform_libraries_bom sets version
 google_auth_library_oauth2_http : 
"com.google.auth:google-auth-library-oauth2-http", // 
google_cloud_platform_libraries_bom sets version
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index cfebb754216..953d1237d9c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -109,6 +109,11 @@ public interface BigQueryOptions
 
   void setNumStorageWriteApiStreamAppendClients(Integer value);
 
+  @Default.Boolean(false)
+  Boolean getUseStorageApiConnectionPool();
+
+  void setUseStorageApiConnectionPool(Boolean value);
+
   @Description(
   "If set, then BigQueryIO.Write will default to triggering the Storage 
Write API writes this often.")
   Integer getStorageWriteApiTriggeringFrequencySec();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index a0484ccf972..adf02ed31a6 100644
--- 
a/sdks/java/io/googl
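
The new useStorageApiConnectionPool flag is an ordinary pipeline option; a minimal sketch of turning it on, assuming everything else is left at its default (the append-clients setting is the existing knob shown in the same diff):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class ConnectionPoolOptionExample {
  public static void main(String[] args) {
    // Equivalent to passing --useStorageApiConnectionPool=true on the command line.
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    options.setUseStorageApiConnectionPool(true);
    options.setNumStorageWriteApiStreamAppendClients(2); // existing option referenced above
  }
}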

[beam] branch master updated: Merge pull request #23547: update bom to the latest one.

2022-10-08 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b94cff209cc Merge pull request #23547: update bom to the latest one.
b94cff209cc is described below

commit b94cff209cc8d1ae61cc916ff6b0b68561dc34c8
Author: Reuven Lax 
AuthorDate: Sat Oct 8 08:19:56 2022 -0700

Merge pull request #23547: update bom to the latest one.
---
 .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +-
 sdks/java/container/license_scripts/dep_urls_java.yaml |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index f27888fb441..8541cf75ae6 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -464,15 +464,15 @@ class BeamModulePlugin implements Plugin {
 def classgraph_version = "4.8.104"
 def errorprone_version = "2.10.0"
 // Try to keep gax_version consistent with gax-grpc version in 
google_cloud_platform_libraries_bom
-def gax_version = "2.18.7"
+def gax_version = "2.19.2"
 def google_clients_version = "2.0.0"
 def google_cloud_bigdataoss_version = "2.2.8"
 // Try to keep google_cloud_spanner_version consistent with 
google_cloud_spanner_bom in google_cloud_platform_libraries_bom
-def google_cloud_spanner_version = "6.29.0"
+def google_cloud_spanner_version = "6.31.2"
 def google_code_gson_version = "2.9.1"
 def google_oauth_clients_version = "1.34.1"
 // Try to keep grpc_version consistent with gRPC version in 
google_cloud_platform_libraries_bom
-def grpc_version = "1.48.0"
+def grpc_version = "1.49.2"
 def guava_version = "31.1-jre"
 def hadoop_version = "2.10.2"
 def hamcrest_version = "2.1"
@@ -488,7 +488,7 @@ class BeamModulePlugin implements Plugin {
 def postgres_version = "42.2.16"
 def powermock_version = "2.0.9"
 // Try to keep protobuf_version consistent with the protobuf version in 
google_cloud_platform_libraries_bom
-def protobuf_version = "3.21.4"
+def protobuf_version = "3.21.7"
 def quickcheck_version = "1.0"
 def sbe_tool_version = "1.25.1"
 def slf4j_version = "1.7.30"
@@ -600,7 +600,7 @@ class BeamModulePlugin implements Plugin {
 // The GCP Libraries BOM dashboard shows the versions set by the BOM:
 // 
https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/25.2.0/artifact_details.html
 // Update libraries-bom version on 
sdks/java/container/license_scripts/dep_urls_java.yaml
-google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.1.1",
+google_cloud_platform_libraries_bom : 
"com.google.cloud:libraries-bom:26.1.3",
 google_cloud_spanner: 
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_spanner_test   : 
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
 google_code_gson: 
"com.google.code.gson:gson:$google_code_gson_version",
diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml 
b/sdks/java/container/license_scripts/dep_urls_java.yaml
index 0765594e154..54ee423 100644
--- a/sdks/java/container/license_scripts/dep_urls_java.yaml
+++ b/sdks/java/container/license_scripts/dep_urls_java.yaml
@@ -42,7 +42,7 @@ jaxen:
   '1.1.6':
 type: "3-Clause BSD"
 libraries-bom:
-  '26.1.1':
+  '26.1.3':
 license: 
"https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE";
 type: "Apache License 2.0"
 paranamer:



[beam] branch master updated: Merge pull request #23505: opt in for schema update. addresses #23504

2022-10-05 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7980cb9a35c Merge pull request #23505: opt in for schema update. 
addresses #23504
7980cb9a35c is described below

commit 7980cb9a35cdb37ffd57a43ec9a1bfa5e204f5d7
Author: Reuven Lax 
AuthorDate: Wed Oct 5 14:03:12 2022 -0700

Merge pull request #23505: opt in for schema update. addresses #23504
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   | 25 +-
 .../StorageApiDynamicDestinationsTableRow.java |  7 --
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  1 +
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3a6280c3038..024dcb053b5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1804,6 +1804,7 @@ public class BigQueryIO {
 .setUseBeamSchema(false)
 .setAutoSharding(false)
 .setPropagateSuccessful(true)
+.setAutoSchemaUpdate(false)
 .setDeterministicRecordIdFn(null)
 .build();
   }
@@ -1947,6 +1948,8 @@ public class BigQueryIO {
 
 abstract Boolean getPropagateSuccessful();
 
+abstract Boolean getAutoSchemaUpdate();
+
 @Experimental
 abstract @Nullable SerializableFunction 
getDeterministicRecordIdFn();
 
@@ -2038,6 +2041,8 @@ public class BigQueryIO {
 
   abstract Builder setPropagateSuccessful(Boolean propagateSuccessful);
 
+  abstract Builder setAutoSchemaUpdate(Boolean autoSchemaUpdate);
+
   @Experimental
   abstract Builder setDeterministicRecordIdFn(
   SerializableFunction toUniqueIdFunction);
@@ -2555,6 +2560,17 @@ public class BigQueryIO {
 }
 
 /**
+ * If true, enables automatically detecting BigQuery table schema updates. 
If a message with
+ * unknown fields is processed, the BigQuery table is queried to see if the 
schema has been
+ * updated. This is intended for scenarios in which unknown fields are 
rare; otherwise calls to
+ * BigQuery will throttle the pipeline. Only supported when using one of 
the STORAGE_API insert
+ * methods.
+ */
+public Write withAutoSchemaUpdate(boolean autoSchemaUpdate) {
+  return toBuilder().setAutoSchemaUpdate(autoSchemaUpdate).build();
+}
+
+/*
  * Provides a function which can serve as a source of deterministic unique 
ids for each record
  * to be written, replacing the unique ids generated with the default 
scheme. When used with
  * {@link Method#STREAMING_INSERTS} This also elides the re-shuffle from 
the BigQueryIO Write by
@@ -2750,6 +2766,12 @@ public class BigQueryIO {
 method);
   }
 
+  if (method != Method.STORAGE_WRITE_API && method != 
Method.STORAGE_API_AT_LEAST_ONCE) {
+checkArgument(
+!getAutoSchemaUpdate(),
+"withAutoSchemaUpdate only supported when using storage-api 
writes.");
+  }
+
   if (method != Write.Method.FILE_LOADS) {
 // we only support writing avro for FILE_LOADS
 checkArgument(
@@ -3045,7 +3067,8 @@ public class BigQueryIO {
   tableRowWriterFactory.getToRowFn(),
   getCreateDisposition(),
   getIgnoreUnknownValues(),
-  bqOptions.getSchemaUpdateRetries());
+  bqOptions.getSchemaUpdateRetries(),
+  getAutoSchemaUpdate());
 }
 
 StorageApiLoads storageApiLoads =
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index b33b220de5a..b025d01f02b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -40,6 +40,7 @@ public class StorageApiDynamicDestinationsTableRow formatFunction,
   CreateDisposition createDisposition,
   boolean ignoreUnknownValues,
-  int schemaUpdateRetries) {
+  int schemaUpdateRetries,
+  boolean autoSchemaUpdates) {
 super(inner);
 this.formatFunction = formatFunction;
 this.createDisposition = createDisposition;
 this.ignoreUnknownValues = ignoreUnknownValues;
 this.schemaUpdateRetries = s
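
A hedged usage sketch of the new withAutoSchemaUpdate option; per the validation added above it is only accepted together with the STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE methods. The table name and the upstream PCollection are placeholders:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.values.PCollection;

class AutoSchemaUpdateExample {
  static void write(PCollection<TableRow> rows) {
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // placeholder table
            .withMethod(Method.STORAGE_WRITE_API) // required for withAutoSchemaUpdate
            .withAutoSchemaUpdate(true)
            .withIgnoreUnknownValues()            // unknown fields trigger the schema re-check
            .withCreateDisposition(CreateDisposition.CREATE_NEVER));
  }
}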

[beam] branch master updated: Merge pull request #22347: [22188]Set allowed timestamp skew

2022-08-04 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new afb7dc93221 Merge pull request #22347: [22188]Set allowed timestamp 
skew
afb7dc93221 is described below

commit afb7dc93221c1988af03fb01b2db267240a9dd4e
Author: Reuven Lax 
AuthorDate: Thu Aug 4 10:32:20 2022 -0700

Merge pull request #22347: [22188]Set allowed timestamp skew
---
 .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java| 10 ++
 1 file changed, 10 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 1ccd527497b..afecc966955 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -553,6 +553,11 @@ public class StorageApiWritesShardedRecords> o,
 BoundedWindow window) {
   // Stream is idle - clear it.
+  // Note: this is best effort. We are explicitly emitting a timestamp that 
is before
+  // the default output timestamp, which means that in some cases (usually 
when draining
+  // a pipeline) this finalize element will be dropped as late. This is 
usually ok as
+  // BigQuery will eventually garbage collect the stream. We attempt to 
finalize idle streams
+  // merely to remove the pressure of large numbers of orphaned streams 
from BigQuery.
   finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
   streamsIdle.inc();
 }
@@ -567,5 +572,10 @@ public class StorageApiWritesShardedRecords

[beam] 01/01: Merge pull request #22461: Fixes #22438. Ensure that WindmillStateReader completes all batched read futures on exceptions

2022-07-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c581b4dc6bd33360af890c9342e5239b8fcc7561
Merge: 505b5b723b0 9e7cc7cfd76
Author: Reuven Lax 
AuthorDate: Wed Jul 27 16:09:35 2022 -0700

Merge pull request #22461: Fixes #22438. Ensure that WindmillStateReader 
completes all batched read futures on exceptions

 .../dataflow/worker/WindmillStateReader.java   | 109 --
 .../dataflow/worker/WindmillStateReaderTest.java   | 161 -
 2 files changed, 222 insertions(+), 48 deletions(-)



[beam] branch master updated (505b5b723b0 -> c581b4dc6bd)

2022-07-27 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 505b5b723b0 Restrict google-api-core and regenerate container 
dependencies. #22481
 add 9e7cc7cfd76 Fixes #22438. Ensure that WindmillStateReader completes 
all batched read futures on exceptions.  Also improve decoder exceptions to 
only affect the corresponding state not the rest of batched reads.
 new c581b4dc6bd Merge pull request #22461: Fixes #22438. Ensure that 
WindmillStateReader completes all batched read futures on exceptions

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dataflow/worker/WindmillStateReader.java   | 109 --
 .../dataflow/worker/WindmillStateReaderTest.java   | 161 -
 2 files changed, 222 insertions(+), 48 deletions(-)



[beam] branch master updated: Merge pull request #15786: Add gap-filling transform for timeseries

2022-07-13 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b78a080e932 Merge pull request #15786: Add gap-filling transform for 
timeseries
b78a080e932 is described below

commit b78a080e93282c83b56a788459090530069033f2
Author: Reuven Lax 
AuthorDate: Wed Jul 13 13:11:34 2022 -0700

Merge pull request #15786: Add gap-filling transform for timeseries
---
 .github/autolabeler.yml|   1 +
 build.gradle.kts   |   1 +
 .../org/apache/beam/sdk/coders/SortedMapCoder.java | 197 
 .../beam/sdk/schemas/transforms/WithKeys.java  |  79 +++
 sdks/java/extensions/timeseries/build.gradle   |  32 ++
 .../beam/sdk/extensions/timeseries/FillGaps.java   | 535 +
 .../sdk/extensions/timeseries/package-info.java|  20 +
 .../sdk/extensions/timeseries/FillGapsTest.java| 355 ++
 settings.gradle.kts|   1 +
 9 files changed, 1221 insertions(+)

diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml
index 715abeddecf..888c5d4f3a4 100644
--- a/.github/autolabeler.yml
+++ b/.github/autolabeler.yml
@@ -42,6 +42,7 @@ extensions: ["sdks/java/extensions/**/*", 
"runners/extensions-java/**/*"]
 "sketching": ["sdks/java/extensions/sketching/**/*"]
 "sorter": ["sdks/java/extensions/sorter/**/*"]
 "sql": ["sdks/java/extensions/sql/**/*"]
+"timeseries": ["sdks/java/extensions/timeseries/*"]
 "zetasketch": ["sdks/java/extensions/zetasketch/**/*"]
 
 # IO
diff --git a/build.gradle.kts b/build.gradle.kts
index aa5f8949fa5..7ea18895f77 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -195,6 +195,7 @@ tasks.register("javaPreCommitPortabilityApi") {
 
 tasks.register("javaPostCommit") {
   dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit")
+  dependsOn(":sdks:java:extensions:timeseries:postCommit")
   dependsOn(":sdks:java:extensions:zetasketch:postCommit")
   dependsOn(":sdks:java:extensions:ml:postCommit")
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java
new file mode 100644
index 000..8448c915654
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+
+/**
+ * A {@link Coder} for {@link SortedMap Maps} that encodes them according to 
provided coders for
+ * keys and values.
+ *
+ * @param  the type of the keys of the KVs being transcoded
+ * @param  the type of the values of the KVs being transcoded
+ */
+public class SortedMapCoder, V>
+extends StructuredCoder> {
+  /** Produces a MapCoder with the given keyCoder and valueCoder. */
+  public static , V> SortedMapCoder of(
+  Coder keyCoder, Coder valueCoder) {
+return new SortedMapCoder<>(keyCoder, valueCoder);
+  }
+
+  public Coder getKeyCoder() {
+return keyCoder;
+  }
+
+  public Coder getValueCoder() {
+return valueCoder;
+  }
+
+  /
+
+  private Coder keyCoder;
+  private Coder valueCoder;
+
+  priv
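
The new SortedMapCoder mirrors the existing MapCoder but preserves key order; a short usage sketch with arbitrary example types:

import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.beam.sdk.coders.SortedMapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.CoderUtils;

class SortedMapCoderExample {
  public static void main(String[] args) throws Exception {
    SortedMapCoder<String, Long> coder =
        SortedMapCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
    SortedMap<String, Long> counts = new TreeMap<>();
    counts.put("a", 1L);
    counts.put("b", 2L);
    // Round-trip through the coder to show encode/decode of the whole map.
    byte[] encoded = CoderUtils.encodeToByteArray(coder, counts);
    System.out.println(CoderUtils.decodeFromByteArray(coder, encoded));
  }
}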

[beam] branch master updated (c7e77f92248 -> 029a2e3d5aa)

2022-07-12 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from c7e77f92248 Adding VladMatyunin as collaborator (#22239)
 add d69b217f0a3 add new pubsub urn
 new 029a2e3d5aa Merge pull request #21966: add new pubsub urn

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto | 4 
 1 file changed, 4 insertions(+)



[beam] 01/01: Merge pull request #21966: add new pubsub urn

2022-07-12 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 029a2e3d5aa8a1d71f855fab3bdecf46533d5d57
Merge: c7e77f92248 d69b217f0a3
Author: Reuven Lax 
AuthorDate: Tue Jul 12 10:25:26 2022 -0700

Merge pull request #21966: add new pubsub urn

 .../proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto | 4 
 1 file changed, 4 insertions(+)



[beam] branch master updated (ad25b8774b5 -> a23eca67718)

2022-07-07 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ad25b8774b5 Merge pull request #22158 from Declarative theming, Remove 
duplicate PlaygroundState for embedded page, Do not re-create CodeController on 
language change (#22147)
 add 493b8866cbe set timestamp when outputting finalize element
 add a23eca67718 Merge pull request #22119: [22188] Set timestamp when 
outputting finalize element

No new revisions were added by this update.

Summary of changes:
 .../io/gcp/bigquery/StorageApiWritesShardedRecords.java | 17 +++--
 1 file changed, 11 insertions(+), 6 deletions(-)



[beam] branch master updated: Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.

2022-06-14 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ba01293ba31 Merge pull request #21793: [21794 ] Fix output timestamp 
in Dataflow.
ba01293ba31 is described below

commit ba01293ba31fdac351f93521443dd91eef56a2ff
Author: Reuven Lax 
AuthorDate: Tue Jun 14 21:32:12 2022 -0700

Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
---
 .../dataflow/worker/WindmillTimerInternals.java| 55 --
 .../worker/StreamingDataflowWorkerTest.java|  1 +
 .../worker/windmill/src/main/proto/windmill.proto  |  2 +
 3 files changed, 34 insertions(+), 24 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index 658e74ee6c1..c1f89c679d5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -210,22 +210,31 @@ class WindmillTimerInternals implements TimerInternals {
 // Setting the timer. If it is a user timer, set a hold.
 
 // Only set a hold if it's needed and if the hold is before the end of 
the global window.
-if (needsWatermarkHold(timerData)
-&& timerData
-.getOutputTimestamp()
-.isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
-  // Setting a timer, clear any prior hold and set to the new value
-  outputBuilder
-  .addWatermarkHoldsBuilder()
-  .setTag(timerHoldTag(prefix, timerData))
-  .setStateFamily(stateFamily)
-  .setReset(true)
-  .addTimestamps(
-  
WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+if (needsWatermarkHold(timerData)) {
+  if (timerData
+  .getOutputTimestamp()
+  .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
+// Setting a timer, clear any prior hold and set to the new value
+outputBuilder
+.addWatermarkHoldsBuilder()
+.setTag(timerHoldTag(prefix, timerData))
+.setStateFamily(stateFamily)
+.setReset(true)
+.addTimestamps(
+
WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+  } else {
+// Clear the hold in case a previous iteration of this timer set 
one.
+outputBuilder
+.addWatermarkHoldsBuilder()
+.setTag(timerHoldTag(prefix, timerData))
+.setStateFamily(stateFamily)
+.setReset(true);
+  }
 }
   } else {
 // Deleting a timer. If it is a user timer, clear the hold
 timer.clearTimestamp();
+timer.clearMetadataTimestamp();
 // Clear the hold even if it's the end of the global window in order 
to maintain update
 // compatibility.
 if (needsWatermarkHold(timerData)) {
@@ -276,6 +285,9 @@ class WindmillTimerInternals implements TimerInternals {
 
 
builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
 
+// Store the output timestamp in the metadata timestamp.
+builder.setMetadataTimestamp(
+
WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
 return builder;
   }
 
@@ -344,8 +356,10 @@ class WindmillTimerInternals implements TimerInternals {
 ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
 : "";
 
-// Parse the output timestamp. Not using '+' as a terminator because the 
output timestamp is the
-// last segment in the tag and the timestamp encoding itself may contain 
'+'.
+// For backwards compatibility, parse the output timestamp from the tag. 
Not using '+' as a
+// terminator because the
+// output timestamp is the last segment in the tag and the timestamp 
encoding itself may contain
+// '+'.
 int outputTimestampStart = timerFamilyEnd + 1;
 int outputTimestampEnd = tag.size();
 
@@ -360,6 +374,8 @@ class WindmillTimerInternals implements TimerInternals {
   } catch (IOException e) {
 throw new RuntimeException(e);
   }
+} else if (timer.hasMetadataTimestamp()) {
+  outputTimestamp = 
WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
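
The change above concerns how Dataflow persists a timer's output timestamp (in the new metadata timestamp field). For context, a minimal sketch of the user-facing feature involved: a stateful DoFn that sets an event-time timer with an explicit output timestamp via Timer#withOutputTimestamp. Names such as ExampleTimerFn and "flush" are illustrative; this is not code from the commit itself.

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    class ExampleTimerFn extends DoFn<KV<String, Long>, String> {
      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(ProcessContext c, @TimerId("flush") Timer flushTimer) {
        Instant elementTs = c.timestamp();
        // Fire one minute after the element, but hold the output watermark at the
        // element's timestamp so whatever the timer emits is not late.
        flushTimer.withOutputTimestamp(elementTs).set(elementTs.plus(Duration.standardMinutes(1)));
      }

      @OnTimer("flush")
      public void onFlush(OnTimerContext c) {
        // Emitted at the output timestamp held above.
        c.output("flushed");
      }
    }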

[beam] branch master updated (6c6b5f74f23 -> b0d964c4309)

2022-06-08 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6c6b5f74f23 Merge pull request #21736 from 
damccorm/users/damccorm/metrics
 add b0d964c4309 Merge pull request: [Beam-14528]: Add ISO time format 
support for Timestamp, Date, DateTime, Time field.

No new revisions were added by this update.

Summary of changes:
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 33 -
 .../bigquery/TableRowToStorageApiProtoTest.java| 86 +-
 2 files changed, 113 insertions(+), 6 deletions(-)
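
For illustration, a minimal sketch of a TableRow carrying ISO-formatted string values that the conversion above now accepts for TIMESTAMP, DATE, DATETIME and TIME columns. The field names are illustrative, not taken from the commit.

    import com.google.api.services.bigquery.model.TableRow;

    public class IsoTimeRowExample {
      public static TableRow makeRow() {
        return new TableRow()
            .set("eventTimestamp", "2022-06-08T12:34:56.789Z") // TIMESTAMP column
            .set("eventDate", "2022-06-08")                    // DATE column
            .set("eventDateTime", "2022-06-08T12:34:56")       // DATETIME column
            .set("eventTime", "12:34:56");                     // TIME column
      }
    }

Such rows would be handed to BigQueryIO.writeTableRows() with the Storage Write API method, which routes them through TableRowToStorageApiProto.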



[beam] branch master updated: Merge pull request #17779: [BEAM-14529] Add integer to float64 conversion support

2022-06-03 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 871a98b97fb Merge pull request #17779: [BEAM-14529] Add integer to 
float64 conversion support
871a98b97fb is described below

commit 871a98b97fb52169a245a8800b92586a0026a565
Author: Yiru Tang 
AuthorDate: Fri Jun 3 15:58:08 2022 -0700

Merge pull request #17779: [BEAM-14529] Add integer to float64 conversion 
support
---
 .../beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java   | 2 +-
 .../beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java   | 8 
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 7c1aefd4b85..442bc57eb0d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -403,7 +403,7 @@ public class TableRowToStorageApiProto {
   case "FLOAT":
 if (value instanceof String) {
   return Double.valueOf((String) value);
-} else if (value instanceof Double || value instanceof Float) {
+} else if (value instanceof Number) {
   return ((Number) value).doubleValue();
 }
 break;
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 459fed3a168..c2d6cf23984 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -457,7 +457,7 @@ public class TableRowToStorageApiProtoTest {
   new TableCell().setV("42"),
   new TableCell().setV("43"),
   new TableCell().setV("2.8168"),
-  new TableCell().setV("2.817"),
+  new TableCell().setV("2"),
   new TableCell().setV("true"),
   new TableCell().setV("true"),
   new TableCell().setV("1970-01-01T00:00:00.43Z"),
@@ -476,7 +476,7 @@ public class TableRowToStorageApiProtoTest {
   .set("int64Value", "42")
   .set("intValue", "43")
   .set("float64Value", "2.8168")
-  .set("floatValue", "2.817")
+  .set("floatValue", "2")
   .set("boolValue", "true")
   .set("booleanValue", "true")
   .set("timestampValue", "1970-01-01T00:00:00.43Z")
@@ -495,7 +495,7 @@ public class TableRowToStorageApiProtoTest {
   .put("int64value", (long) 42)
   .put("intvalue", (long) 43)
   .put("float64value", (double) 2.8168)
-  .put("floatvalue", (double) 2.817)
+  .put("floatvalue", (double) 2)
   .put("boolvalue", true)
   .put("booleanvalue", true)
   .put("timestampvalue", 43L)
@@ -518,7 +518,7 @@ public class TableRowToStorageApiProtoTest {
   .put("int64value", (long) 42)
   .put("intvalue", (long) 43)
   .put("float64value", (double) 2.8168)
-  .put("floatvalue", (double) 2.817)
+  .put("floatvalue", (double) 2)
   .put("boolvalue", true)
   .put("booleanvalue", true)
   .put("timestampvalue", 43L)
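
In short, a FLOAT64 column now accepts any java.lang.Number rather than only Double or Float. A minimal sketch (field names follow the test above; the class name is illustrative):

    import com.google.api.services.bigquery.model.TableRow;

    public class FloatCoercionExample {
      public static TableRow makeRow() {
        return new TableRow()
            .set("float64Value", 2.8168) // already a Double, unchanged behavior
            .set("floatValue", 2);       // an Integer, now coerced via Number.doubleValue()
      }
    }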



[beam] 01/01: Merge pull request #17423: Handle invalid rows in the Storage Api sink

2022-05-26 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 25039a8aea8ad549e8ba9fdbfcd67c9f8fc2f9bf
Merge: 92b8dc75286 c4af119545c
Author: Reuven Lax 
AuthorDate: Thu May 26 17:38:53 2022 -0700

Merge pull request #17423: Handle invalid rows in the Storage Api sink

 .../beam/sdk/io/gcp/bigquery/BatchLoads.java   |   4 +-
 .../bigquery/BigQueryStorageApiInsertError.java|  56 ++
 .../BigQueryStorageApiInsertErrorCoder.java|  53 +
 .../io/gcp/bigquery/StorageApiConvertMessages.java |  64 ---
 .../bigquery/StorageApiDynamicDestinations.java|   3 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   6 +
 .../StorageApiDynamicDestinationsTableRow.java |   6 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  | 124 +++--
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |   2 +
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 107 ++
 .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java |   1 +
 .../beam/sdk/io/gcp/bigquery/WriteResult.java  | 103 +++--
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  94 +++-
 .../gcp/bigquery/TableRowToStorageApiProtoIT.java  |   1 -
 14 files changed, 513 insertions(+), 111 deletions(-)



[beam] branch master updated (92b8dc75286 -> 25039a8aea8)

2022-05-26 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 92b8dc75286 [BEAM-14505] Add Dataflow streaming pipeline update 
support to the Go SDK (#17747)
 add c4af119545c DLQ for BQ Storage Api writes
 new 25039a8aea8 Merge pull request #17423: Handle invalid rows in the 
Storage Api sink

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java   |   4 +-
 ...nfo.java => BigQueryStorageApiInsertError.java} |  39 +--
 .../BigQueryStorageApiInsertErrorCoder.java}   |  38 +++
 .../io/gcp/bigquery/StorageApiConvertMessages.java |  64 ---
 .../bigquery/StorageApiDynamicDestinations.java|   3 +
 .../StorageApiDynamicDestinationsBeamRow.java  |   6 +
 .../StorageApiDynamicDestinationsTableRow.java |   6 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  | 124 +++--
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |   2 +
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 107 ++
 .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java |   1 +
 .../beam/sdk/io/gcp/bigquery/WriteResult.java  | 103 +++--
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  94 +++-
 .../gcp/bigquery/TableRowToStorageApiProtoIT.java  |   1 -
 14 files changed, 454 insertions(+), 138 deletions(-)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/{TableRowInfo.java
 => BigQueryStorageApiInsertError.java} (56%)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{bigtable/BigtableWriteResultCoder.java
 => bigquery/BigQueryStorageApiInsertErrorCoder.java} (51%)



[beam] branch master updated: Merge pull request #17359: [BEAM-14303] Add a way to exclude output timestamp watermark holds

2022-05-05 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new fa01615e520 Merge pull request #17359: [BEAM-14303] Add a way to 
exclude output timestamp watermark holds
fa01615e520 is described below

commit fa01615e5207b53a7f73ff6a5ff55c336717c88f
Author: Reuven Lax 
AuthorDate: Thu May 5 14:23:48 2022 -0700

Merge pull request #17359: [BEAM-14303] Add a way to exclude output 
timestamp watermark holds
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 170 +
 .../beam/runners/core/SimpleDoFnRunnerTest.java|  10 +-
 .../dataflow/worker/WindmillTimerInternals.java|  10 +-
 .../main/java/org/apache/beam/sdk/state/Timer.java |   6 +
 .../apache/beam/sdk/transforms/Deduplicate.java|   8 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  50 ++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java|  65 +---
 .../bigquery/StorageApiWritesShardedRecords.java   |  11 +-
 8 files changed, 186 insertions(+), 144 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a73dd521f86..a7d67bf7526 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -246,6 +246,30 @@ public class SimpleDoFnRunner implements 
DoFnRunner= Integer.MAX_VALUE
+  ? fn.getAllowedTimestampSkew()
+  : 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+  BoundedWindow.TIMESTAMP_MAX_VALUE));
+}
+  }
+
   private  void outputWindowedValue(TupleTag tag, WindowedValue 
windowedElem) {
 checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag);
 outputManager.output(tag, windowedElem);
@@ -389,7 +413,7 @@ public class SimpleDoFnRunner implements 
DoFnRunner implements 
DoFnRunner void outputWithTimestamp(TupleTag tag, T output, Instant 
timestamp) {
   checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
-  checkTimestamp(timestamp);
+  checkTimestamp(elem.getTimestamp(), timestamp);
   outputWindowedValue(
   tag, WindowedValue.of(output, timestamp, elem.getWindows(), 
elem.getPane()));
 }
@@ -416,30 +440,6 @@ public class SimpleDoFnRunner implements 
DoFnRunner= Integer.MAX_VALUE
-? fn.getAllowedTimestampSkew()
-: 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
-BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-}
-
 @Override
 public BoundedWindow window() {
   return Iterables.getOnlyElement(elem.getWindows());
@@ -834,18 +834,19 @@ public class SimpleDoFnRunner implements 
DoFnRunner void output(TupleTag tag, T output) {
+  checkTimestamp(timestamp(), timestamp);
   outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
 }
 
 @Override
 public  void outputWithTimestamp(TupleTag tag, T output, Instant 
timestamp) {
-  checkTimestamp(timestamp);
+  checkTimestamp(timestamp(), timestamp);
   outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
 }
 
@@ -854,30 +855,6 @@ public class SimpleDoFnRunner implements 
DoFnRunner= Integer.MAX_VALUE
-? fn.getAllowedTimestampSkew()
-: 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
-BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-}
   }
 
   /**
@@ -1064,17 +1041,19 @@ public class SimpleDoFnRunner 
implements DoFnRunner void output(TupleTag tag, T output) {
+  checkTimestamp(this.timestamp, timestamp);
   outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
 }
 
 @Override
 public  void outputWithTimestamp(TupleTag tag, T output, Instant 
timestamp) {
-  checkTimestamp(timestamp);
+  checkTimestamp(this.timestamp, timestamp);
   outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
 }
 
@@ -1083,30 +1062,6 @@ public class SimpleDoFnRunner 
implements DoFnRunner= Integer.MAX_VALUE
-? fn.getAllowedTimestampSkew()
-: 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
-BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-}
   }
 
   private class TimerInternalsTimer implements Timer {
@@ -1121,7 +1076,8 @@ public class SimpleDoFnRunner implements 
DoFnRunner implements 
DoFnRunner 
implements DoFnRunner 
implements DoFnRunner element,
-
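
A minimal sketch of the user-facing side of this change, assuming the method added to Timer is withNoOutputTimestamp() (names such as CleanupTimerFn are illustrative): a periodic cleanup timer that deliberately does not hold the output watermark while it waits, which appears to be the kind of use the Deduplicate change in the file list addresses.

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class CleanupTimerFn extends DoFn<KV<String, String>, String> {
      @TimerId("cleanup")
      private final TimerSpec cleanupSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(ProcessContext c, @TimerId("cleanup") Timer cleanup) {
        c.output(c.element().getValue());
        // Fire ten minutes from now; withNoOutputTimestamp() means the pending timer
        // does not pin the output watermark to the element's timestamp in the meantime.
        cleanup.withNoOutputTimestamp().offset(Duration.standardMinutes(10)).setRelative();
      }

      @OnTimer("cleanup")
      public void onCleanup(OnTimerContext c) {
        // State cleanup only; nothing is emitted here.
      }
    }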

[beam] branch master updated: Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API

2022-05-04 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 52c3f07aa56 Merge pull request #17417: [BEAM-14388] Address some 
performance problems with the storage API
52c3f07aa56 is described below

commit 52c3f07aa56de738db3130283716d38206de42b7
Author: Reuven Lax 
AuthorDate: Wed May 4 13:14:07 2022 -0700

Merge pull request #17417: [BEAM-14388] Address some performance problems 
with the storage API
---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |  6 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  8 +++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 15 
 .../StorageApiWriteRecordsInconsistent.java|  7 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 84 ++
 5 files changed, 106 insertions(+), 14 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 85437f267d0..a9beb5cbd7c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -123,4 +123,10 @@ public interface BigQueryOptions
   Integer getSchemaUpdateRetries();
 
   void setSchemaUpdateRetries(Integer value);
+
+  @Description("Maximum (best effort) size of a single append to the storage 
API.")
+  @Default.Integer(2 * 1024 * 1024)
+  Integer getStorageApiAppendThresholdBytes();
+
+  void setStorageApiAppendThresholdBytes(Integer value);
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 8a7eff378c6..23fce8ba6ff 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -206,6 +206,14 @@ public interface BigQueryServices extends Serializable {
 /** Append rows to a Storage API write stream at the given offset. */
 ApiFuture appendRows(long offset, ProtoRows rows) 
throws Exception;
 
+/**
+ * If the previous call to appendRows blocked due to flow control, returns 
how long the call
+ * blocked for.
+ */
+default long getInflightWaitSeconds() {
+  return 0;
+}
+
 /**
  * Pin this object. If close() is called before all pins are removed, the 
underlying resources
  * will not be freed until all pins are removed.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 85a53487f88..2949150c9ee 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -33,6 +33,7 @@ import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.api.gax.rpc.UnaryCallSettings;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.Bigquery.Tables;
@@ -1223,9 +1224,18 @@ class BigQueryServicesImpl implements BigQueryServices {
   ProtoSchema protoSchema =
   
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
 
+  TransportChannelProvider transportChannelProvider =
+  BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+  .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+  .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+  .setKeepAliveWithoutCalls(true)
+  .setChannelsPerCpu(2)
+  .build();
+
   StreamWriter streamWriter =
   StreamWriter.newBuilder(streamName)
   .setWriterSchema(protoSchema)
+  .setChannelProvider(transportChannelProvider)
   .setTraceId(
   "Dataflow:"
   + (bqIOMetadata.getBeamJobId() != null
@@ -1275,6 +1285,11 @@ class BigQueryServicesImpl implements BigQueryServices {
 throws Exception {
   return streamWriter.append(rows, offset);
 }
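
The new knob is an ordinary pipeline option. A minimal sketch of raising it from its 2 MB default (the 4 MB value is purely illustrative):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class AppendThresholdExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Batch appends up to ~4 MB before flushing to the Storage API.
        options.as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(4 * 1024 * 1024);
        // Pipeline.create(options) would then pick this up in the unsharded and
        // inconsistent Storage API write transforms touched above.
      }
    }

Equivalently, --storageApiAppendThresholdBytes=4194304 on the command line.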

[beam] branch master updated: Merge pull request #17404: [BEAM-13990] support date and timestamp fields

2022-04-29 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 58b4d762eec Merge pull request #17404: [BEAM-13990] support date and 
timestamp fields
58b4d762eec is described below

commit 58b4d762eece66774a5df6ca54e6f91c49057c9b
Author: Reuven Lax 
AuthorDate: Fri Apr 29 22:02:56 2022 -0700

Merge pull request #17404: [BEAM-13990] support date and timestamp fields
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/java/io/google-cloud-platform/build.gradle|   2 +
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |   2 +
 .../StorageApiDynamicDestinationsTableRow.java |  13 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |   4 +
 .../bigquery/StorageApiWritesShardedRecords.java   |   4 +
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java |   8 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 339 -
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |   8 -
 .../gcp/bigquery/TableRowToStorageApiProtoIT.java  | 407 +
 .../bigquery/TableRowToStorageApiProtoTest.java| 140 ---
 .../beam/sdk/io/gcp/spanner/StructUtilsTest.java   |   3 +-
 12 files changed, 777 insertions(+), 154 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5d5b1e9377f..0f0c11dc2cb 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -631,6 +631,7 @@ class BeamModulePlugin implements Plugin {
 jackson_dataformat_xml  : 
"com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson_version",
 jackson_dataformat_yaml : 
"com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version",
 jackson_datatype_joda   : 
"com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version",
+jackson_datatype_jsr310 : 
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version",
 jackson_module_scala_2_11   : 
"com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
 jackson_module_scala_2_12   : 
"com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version",
 // Swap to use the officially published version of 0.4.x once available
diff --git a/sdks/java/io/google-cloud-platform/build.gradle 
b/sdks/java/io/google-cloud-platform/build.gradle
index 8d1d0a1776d..cd71fc35e21 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -116,6 +116,8 @@ dependencies {
   implementation library.java.http_core
   implementation library.java.jackson_core
   implementation library.java.jackson_databind
+  implementation library.java.jackson_datatype_joda
+  implementation library.java.jackson_datatype_jsr310
   implementation library.java.joda_time
   implementation library.java.junit
   implementation library.java.netty_handler
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 54f485a9d71..2a3e595adc1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3085,6 +3085,8 @@ public class BigQueryIO {
 CreateTables.clearCreatedTables();
 TwoLevelMessageConverterCache.clear();
 StorageApiDynamicDestinationsTableRow.clearSchemaCache();
+StorageApiWriteUnshardedRecords.clearCache();
+StorageApiWritesShardedRecords.clearCache();
   }
 
   /
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index c48dbe0dedb..3c9f676cfad 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -70,6 +70,7 @@ public class StorageApiDynamicDestinationsTableRow
   DestinationT destination, DatasetService datasetService) throws 
Exception {
 re
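
For illustration, a minimal sketch of a TableSchema declaring the civil-time column types whose handling the change above adds to the Storage API write path (field names are illustrative):

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Arrays;

    public class TimeSchemaExample {
      public static TableSchema schema() {
        return new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("eventTimestamp").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("eventDate").setType("DATE"),
                    new TableFieldSchema().setName("eventDateTime").setType("DATETIME"),
                    new TableFieldSchema().setName("eventTime").setType("TIME")));
      }
    }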

[beam] branch master updated: Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness

2022-04-22 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 00320aa7793 Merge pull request #17428: [BEAM-14326] Make sure BigQuery 
daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
00320aa7793 is described below

commit 00320aa7793ed4322a3fd029864dd300ac8bec00
Author: Reuven Lax 
AuthorDate: Fri Apr 22 10:14:34 2022 -0700

Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread 
doesn't exit suddenly, as this leads to pipeline stuckness
---
 .../StorageApiDynamicDestinationsTableRow.java | 10 +-
 .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 42 ++
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 281a7adb529..c48dbe0dedb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -27,6 +27,7 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import 
org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,10 +75,10 @@ public class StorageApiDynamicDestinationsTableRow
 
   {
 tableSchema = getSchema(destination);
+TableReference tableReference = 
getTable(destination).getTableReference();
 if (tableSchema == null) {
   // If the table already exists, then try and fetch the schema from 
the existing
   // table.
-  TableReference tableReference = 
getTable(destination).getTableReference();
   tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
   if (tableSchema == null) {
 if (createDisposition == CreateDisposition.CREATE_NEVER) {
@@ -95,7 +96,14 @@ public class StorageApiDynamicDestinationsTableRow
   + "using a create disposition of CREATE_IF_NEEDED.");
 }
   }
+} else {
+  // Make sure we register this schema with the cache, unless there's 
already a more
+  // up-to-date schema.
+  tableSchema =
+  MoreObjects.firstNonNull(
+  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), 
tableSchema);
 }
+
 descriptor = 
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
 descriptorHash = 
BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index 83246b26de2..e1775af2289 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -178,6 +178,21 @@ public class TableSchemaCache {
 return schemaHolder.map(SchemaHolder::getTableSchema).orElse(null);
   }
 
+  /**
+   * Registers schema for a table if one is not already present. If a schema 
is already in the
+   * cache, returns the existing schema, otherwise returns null.
+   */
+  @Nullable
+  public TableSchema putSchemaIfAbsent(TableReference tableReference, 
TableSchema tableSchema) {
+final String key = tableKey(tableReference);
+Optional existing =
+runUnderMonitor(
+() ->
+Optional.ofNullable(
+this.cachedSchemas.putIfAbsent(key, 
SchemaHolder.of(tableSchema, 0;
+return existing.map(SchemaHolder::getTableSchema).orElse(null);
+  }
+
   public void refreshSchema(TableReference tableReference, DatasetService 
datasetService) {
 int targetVersion =
 runUnderMonitor(
@@ -187,13 +202,11 @@ public class TableSchemaCache {
 "Cannot call refreshSchema after the object has been 
stopped!");
   }
   String key = tableKey(tableReference);
-  SchemaHolder schemaHolder = cachedSchemas.get

[beam] 01/01: Merge pull request #17401: [BEAM-14326] mark static thread as a daemon thread

2022-04-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 374566d80e96f6163ebeb4a893f6193d42ebec1d
Merge: 1808a343b36 54d3b27a898
Author: Reuven Lax 
AuthorDate: Tue Apr 19 14:51:43 2022 -0700

Merge pull request #17401: [BEAM-14326] mark static thread as a daemon 
thread

 .../main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java  | 1 +
 1 file changed, 1 insertion(+)



[beam] branch master updated (1808a343b36 -> 374566d80e9)

2022-04-19 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1808a343b36 Populate actual dataflow job id to bigquery write trace id 
(#17130)
 add 54d3b27a898 mark static thread as a daemon thread
 new 374566d80e9 Merge pull request #17401: [BEAM-14326] mark static thread 
as a daemon thread

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java  | 1 +
 1 file changed, 1 insertion(+)
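
For context, a minimal sketch (not the Beam code itself) of the pattern being applied: a background refresh thread marked as a daemon so it can never keep the JVM alive or hang worker shutdown.

    public class DaemonRefreshExample {
      public static Thread startRefreshLoop(Runnable refreshOnce) {
        Thread t =
            new Thread(
                () -> {
                  while (!Thread.currentThread().isInterrupted()) {
                    refreshOnce.run();
                    try {
                      Thread.sleep(60_000); // refresh roughly once a minute
                    } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                    }
                  }
                });
        t.setDaemon(true); // the one-line fix: daemon threads do not block JVM exit
        t.setName("table-schema-refresh");
        t.start();
        return t;
      }
    }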



[beam] branch master updated: Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables

2022-04-18 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4b709d5456b Merge pull request #17382: [BEAM-12356] Close 
DatasetService leak as local variables
4b709d5456b is described below

commit 4b709d5456b105ffcc251da7a0a4a0b560491b1c
Author: Minbo Bae <49642083+baemi...@users.noreply.github.com>
AuthorDate: Tue Apr 19 11:50:24 2022 +0900

Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local 
variables
---
 .../sdk/io/gcp/bigquery/BigQueryQueryHelper.java   | 195 +++--
 .../io/gcp/bigquery/BigQueryQuerySourceDef.java|  17 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java|  55 +++---
 3 files changed, 138 insertions(+), 129 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
index b78afc514ae..a42eea20052 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
@@ -96,105 +96,112 @@ class BigQueryQueryHelper {
   throws InterruptedException, IOException {
 // Step 1: Find the effective location of the query.
 String effectiveLocation = location;
-DatasetService tableService = bqServices.getDatasetService(options);
-if (effectiveLocation == null) {
-  List referencedTables =
-  dryRunQueryIfNeeded(
-  bqServices,
-  options,
-  dryRunJobStats,
-  query,
-  flattenResults,
-  useLegacySql,
-  location)
-  .getQuery()
-  .getReferencedTables();
-  if (referencedTables != null && !referencedTables.isEmpty()) {
-TableReference referencedTable = referencedTables.get(0);
-effectiveLocation =
-tableService
-.getDataset(referencedTable.getProjectId(), 
referencedTable.getDatasetId())
-.getLocation();
+try (DatasetService tableService = bqServices.getDatasetService(options)) {
+  if (effectiveLocation == null) {
+List referencedTables =
+dryRunQueryIfNeeded(
+bqServices,
+options,
+dryRunJobStats,
+query,
+flattenResults,
+useLegacySql,
+location)
+.getQuery()
+.getReferencedTables();
+if (referencedTables != null && !referencedTables.isEmpty()) {
+  TableReference referencedTable = referencedTables.get(0);
+  effectiveLocation =
+  tableService
+  .getDataset(referencedTable.getProjectId(), 
referencedTable.getDatasetId())
+  .getLocation();
+}
   }
-}
 
-// Step 2: Create a temporary dataset in the query location only if the 
user has not specified a
-// temp dataset.
-String queryJobId =
-BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), 
stepUuid, JobType.QUERY);
-Optional queryTempDatasetOpt = 
Optional.ofNullable(queryTempDatasetId);
-TableReference queryResultTable =
-createTempTableReference(
-options.getBigQueryProject() == null
-? options.getProject()
-: options.getBigQueryProject(),
-queryJobId,
-queryTempDatasetOpt);
-
-boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
-// Create dataset only if it has not been set by the user
-if (beamToCreateTempDataset) {
-  LOG.info("Creating temporary dataset {} for query results", 
queryResultTable.getDatasetId());
-
-  tableService.createDataset(
-  queryResultTable.getProjectId(),
-  queryResultTable.getDatasetId(),
-  effectiveLocation,
-  "Temporary tables for query results of job " + options.getJobName(),
-  TimeUnit.DAYS.toMillis(1));
-} else { // If the user specified a temp dataset, check that the 
destination table does not
-  // exist
-  Table destTable = tableService.getTable(queryResultTable);
-  checkArgument(
-  destTable == null,
-  "Refusing to write on existing table {} in the specified temp 
dataset {}",
-  queryResultTable.getTableId(),
-  queryResultTable.getDatasetId());
-}
+  // Step 2: Create a temporary dataset in the query location only if the 
user has not specified
+  // a temp dataset.
+  String queryJobId =

[beam] branch master updated (fbf16e5412d -> 1dd98718ab5)

2022-04-13 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from fbf16e5412d [BEAM-14287] Clean up staticcheck warnings in graph/coder 
(#17337)
 add 227fca623ad handle changing schemas in Storage API sink
 add 1dd98718ab5 Merge pull request #17070: Handle changing schemas in 
Storage API sink

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java   |  13 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |   6 +
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java|  29 ++
 ...ateTables.java => CreateTableDestinations.java} | 101 ---
 .../sdk/io/gcp/bigquery/CreateTableHelpers.java|  10 +-
 .../sdk/io/gcp/bigquery/SplittingIterable.java |  37 ++-
 .../io/gcp/bigquery/StorageApiConvertMessages.java |  13 +-
 .../bigquery/StorageApiDynamicDestinations.java|  23 +-
 .../StorageApiDynamicDestinationsBeamRow.java  |  27 +-
 .../StorageApiDynamicDestinationsTableRow.java | 162 ---
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  73 +++--
 .../io/gcp/bigquery/StorageApiWritePayload.java}   |  16 +-
 .../StorageApiWriteRecordsInconsistent.java|  26 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 138 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |  58 ++--
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  88 --
 .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 296 +
 .../bigquery/TwoLevelMessageConverterCache.java|   4 +
 .../sdk/io/gcp/testing/FakeDatasetService.java |  80 --
 .../beam/sdk/io/gcp/testing/FakeJobService.java|  18 +-
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java|   5 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 126 -
 .../bigquery/TableRowToStorageApiProtoTest.java|  11 +-
 23 files changed, 1027 insertions(+), 333 deletions(-)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/{CreateTables.java
 => CreateTableDestinations.java} (52%)
 copy 
sdks/java/io/{jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteResult.java
 => 
google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java}
 (66%)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java


