(beam) branch master updated (a85b0a636dc -> 7d5c97320b9)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from a85b0a636dc Filter out old actions runs from dashboard (#32347) add 7d5c97320b9 Add predicate to control which columns are propagated when propagating successful rows (#32312) No new revisions were added by this update. Summary of changes: .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 6 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 31 ++- .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 11 +++ .../StorageApiDynamicDestinationsProto.java| 4 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 7 ++ .../StorageApiWriteRecordsInconsistent.java| 7 ++ .../bigquery/StorageApiWriteUnshardedRecords.java | 24 - .../bigquery/StorageApiWritesShardedRecords.java | 17 +++- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 30 -- .../sdk/io/gcp/testing/FakeDatasetService.java | 5 +- .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 3 + .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 103 + 12 files changed, 229 insertions(+), 19 deletions(-)
(beam) branch master updated: Merge pull request #28050: Don't invalidate streams on quota errors
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8afdda64983 Merge pull request #28050: Don't invalidate streams on quota errors 8afdda64983 is described below commit 8afdda64983d92e78370ee2bd08176519e63150a Author: Reuven Lax AuthorDate: Thu Nov 2 13:13:56 2023 -0700 Merge pull request #28050: Don't invalidate streams on quota errors --- .../src/main/java/org/apache/beam/sdk/io/FileIO.java | 2 +- .../io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 16 +--- .../io/gcp/bigquery/StorageApiWritesShardedRecords.java | 12 +--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 2d28279f90b..76fc1a70b78 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -227,7 +227,7 @@ import org.slf4j.LoggerFactory; * {@link Sink}, e.g. write different elements to Avro files in different directories with different * schemas. * - * This feature is supported by {@link #writeDynamic}. Use {@link Write#by} to specify how to + * This feature is supported by {@link #writeDynamic}. Use {@link Write#by} to specify how too * partition the elements into groups ("destinations"). Then elements will be grouped by * destination, and {@link Write#withNaming(Contextful)} and {@link Write#via(Contextful)} will be * applied separately within each group, i.e. 
different groups will be written using the file naming diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index df79ece207f..21c2a485c27 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -711,7 +711,19 @@ public class StorageApiWriteUnshardedRecords retrieveErrorDetails(contexts)); failedContext.failureCount += 1; - invalidateWriteStream(); + boolean quotaError = false; + Throwable error = failedContext.getError(); + Status.Code statusCode = Status.Code.OK; + if (error != null) { +statusCode = Status.fromThrowable(error).getCode(); +quotaError = statusCode.equals(Status.Code.RESOURCE_EXHAUSTED); + } + + if (!quotaError) { +// This forces us to close and reopen all gRPC connections to Storage API on error, +// which empirically fixes random stuckness issues. +invalidateWriteStream(); + } // Maximum number of times we retry before we fail the work item. if (failedContext.failureCount > 5) { @@ -720,8 +732,6 @@ public class StorageApiWriteUnshardedRecords // The following errors are known to be persistent, so always fail the work item in // this case. 
- Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); - Status.Code statusCode = Status.fromThrowable(error).getCode(); if (statusCode.equals(Status.Code.OUT_OF_RANGE) || statusCode.equals(Status.Code.ALREADY_EXISTS)) { throw new RuntimeException( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 77d9e8023a0..f4982396e9d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -689,7 +689,7 @@ public class StorageApiWritesShardedRecords
(beam) branch master updated (2e52a6559a8 -> 2a96cedca15)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 2e52a6559a8 Fix bigtable_it_test instance naming (#29249) add 97354ee83c2 Change WindmillStateReader to not batch OrderedListFetches for the same family and tag. Fix issue with MultimapState delayed fetches due to batching. new 2a96cedca15 Merge pull request #28371: Change WindmillStateReader to not batch OrderedListFetches for the same family and tag. The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../worker/windmill/state/WindmillStateReader.java | 112 ++--- .../worker/windmill/state/WrappedFuture.java | 4 +- .../windmill/state/WindmillStateReaderTest.java| 96 +++--- 3 files changed, 137 insertions(+), 75 deletions(-)
(beam) 01/01: Merge pull request #28371: Change WindmillStateReader to not batch OrderedListFetches for the same family and tag.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 2a96cedca1516d11d3faa3a10387cd6c34a85b8c Merge: 2e52a6559a8 97354ee83c2 Author: Reuven Lax AuthorDate: Wed Nov 1 13:38:07 2023 -0700 Merge pull request #28371: Change WindmillStateReader to not batch OrderedListFetches for the same family and tag. .../worker/windmill/state/WindmillStateReader.java | 112 ++--- .../worker/windmill/state/WrappedFuture.java | 4 +- .../windmill/state/WindmillStateReaderTest.java| 96 +++--- 3 files changed, 137 insertions(+), 75 deletions(-)
(beam) branch master updated (d5fc02479c2 -> 8648e583c84)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from d5fc02479c2 [YAML] Add a basic aggregating transform to Beam Yaml. (#29167) add 8648e583c84 Merge pull request #29114: Support default values in storage-api sink No new revisions were added by this update. Summary of changes: .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 7 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 26 +- .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 6 +- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 7 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 17 +- .../StorageApiWriteRecordsInconsistent.java| 9 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 17 +- .../bigquery/StorageApiWritesShardedRecords.java | 24 +- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 23 +- .../sdk/io/gcp/testing/FakeDatasetService.java | 6 +- .../bigquery/StorageApiSinkDefaultValuesIT.java| 317 + 11 files changed, 432 insertions(+), 27 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkDefaultValuesIT.java
[beam] 01/01: Merge pull request #28171: Properly handle in-flight deletes followed by adds in OrderedListState
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 5e4b9bbae5ea508c9298fdfd644d1fe3b6adfbd3 Merge: f86d9e2d012 f2f5bf6a293 Author: Reuven Lax AuthorDate: Sun Sep 10 17:24:08 2023 -0700 Merge pull request #28171: Properly handle in-flight deletes followed by adds in OrderedListState .../dataflow/worker/WindmillStateInternals.java| 9 +-- .../worker/WindmillStateInternalsTest.java | 65 ++ 2 files changed, 70 insertions(+), 4 deletions(-)
[beam] branch master updated (f86d9e2d012 -> 5e4b9bbae5e)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from f86d9e2d012 update concurrency group definition (#28373) add f2f5bf6a293 Don't improperly filter newly-added elements that overlap with a delete. new 5e4b9bbae5e Merge pull request #28171: Properly handle in-flight deletes followed by adds in OrderedListState The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../dataflow/worker/WindmillStateInternals.java| 9 +-- .../worker/WindmillStateInternalsTest.java | 65 ++ 2 files changed, 70 insertions(+), 4 deletions(-)
[beam] branch master updated: Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing deletes or updates to BigQuery
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fef4cf8a028 Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing deletes or updates to BigQuery fef4cf8a028 is described below commit fef4cf8a0289e060ca05b6808beaf5734707d8a2 Author: Reuven Lax AuthorDate: Thu Aug 24 15:45:15 2023 -0700 Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing deletes or updates to BigQuery --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 39 .../sdk/io/gcp/bigquery/CreateTableHelpers.java| 9 + .../beam/sdk/io/gcp/bigquery/CreateTables.java | 5 +++ .../sdk/io/gcp/bigquery/DynamicDestinations.java | 9 + .../gcp/bigquery/DynamicDestinationsHelpers.java | 30 .../bigquery/StorageApiDynamicDestinations.java| 36 ++- .../bigquery/StorageApiWriteUnshardedRecords.java | 1 + .../bigquery/StorageApiWritesShardedRecords.java | 4 ++- .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 42 +- 9 files changed, 100 insertions(+), 75 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 96da67321cb..58d76931244 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -30,6 +30,7 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableFieldSchema; 
import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; @@ -510,7 +511,8 @@ import org.slf4j.LoggerFactory; *.apply(BigQueryIO.applyRowMutations() * .to(my_project:my_dataset.my_table) * .withSchema(schema) - * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)); + * .withPrimaryKey(ImmutableList.of("field1", "field2")) + * .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)); * } * * If writing a type other than TableRow (e.g. using {@link BigQueryIO#writeGenericRecords} or @@ -523,12 +525,17 @@ import org.slf4j.LoggerFactory; * cdcEvent.apply(BigQueryIO.write() * .to("my-project:my_dataset.my_table") * .withSchema(schema) + * .withPrimaryKey(ImmutableList.of("field1", "field2")) * .withFormatFunction(CdcEvent::getTableRow) * .withRowMutationInformationFn(cdc -> RowMutationInformation.of(cdc.getChangeType(), * cdc.getSequenceNumber())) * .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE) - * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)); + * .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)); * } + * + * Note that in order to use inserts or deletes, the table must be set up with a primary key. If + * the table is not previously created and CREATE_IF_NEEDED is used, a primary key must be + * specified. 
*/ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20506) @@ -2318,6 +2325,8 @@ public class BigQueryIO { abstract @Nullable String getKmsKey(); +abstract @Nullable List getPrimaryKey(); + abstract Boolean getOptimizeWrites(); abstract Boolean getUseBeamSchema(); @@ -2416,7 +2425,9 @@ public class BigQueryIO { abstract Builder setIgnoreInsertIds(Boolean ignoreInsertIds); - abstract Builder setKmsKey(String kmsKey); + abstract Builder setKmsKey(@Nullable String kmsKey); + + abstract Builder setPrimaryKey(@Nullable List primaryKey); abstract Builder setOptimizeWrites(Boolean optimizeWrites); @@ -2947,6 +2958,10 @@ public class BigQueryIO { return toBuilder().setKmsKey(kmsKey).build(); } +public Write withPrimaryKey(List primaryKey) { + return toBuilder().setPrimaryKey(primaryKey).build(); +} + /** * If true, enables new codepaths that are expected to use less resources while writing to * BigQuery. Not enabled by default in order to maintain backwards compatibility. @@ -3241,6 +3256,7 @@ public class BigQueryIO { LOG.warn("Setting the numb
[beam] branch master updated: Merge pull request #27866: Allow writing protos directly to the storage API without conversion
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ba04f6098f7 Merge pull request #27866: Allow writing protos directly to the storage API without conversion ba04f6098f7 is described below commit ba04f6098f7472a123c6cd7b8d4a991ce16d1958 Author: Reuven Lax AuthorDate: Thu Aug 10 09:15:58 2023 -0700 Merge pull request #27866: Allow writing protos directly to the storage API without conversion --- .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 32 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 96 + .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 5 +- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 8 +- .../bigquery/StorageApiDynamicDestinations.java| 3 + .../StorageApiDynamicDestinationsBeamRow.java | 6 + ...StorageApiDynamicDestinationsGenericRecord.java | 7 + .../StorageApiDynamicDestinationsProto.java| 100 + .../StorageApiDynamicDestinationsTableRow.java | 7 + .../bigquery/StorageApiWriteUnshardedRecords.java | 66 +++- .../bigquery/StorageApiWritesShardedRecords.java | 29 +- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 83 +++- .../sdk/io/gcp/testing/FakeDatasetService.java | 7 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 111 +- .../bigquery/StorageApiDirectWriteProtosIT.java| 197 ++ .../bigquery/TableRowToStorageApiProtoTest.java| 418 - 16 files changed, 1109 insertions(+), 66 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index b8a7fc760bc..46c25d47e7a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -22,9 +22,9 @@ import com.google.auto.value.AutoValue; import com.google.auto.value.extension.memoized.Memoized; import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.ByteString; +import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import java.util.function.Consumer; import java.util.function.Supplier; @@ -49,7 +49,7 @@ abstract class AppendClientInfo { abstract @Nullable String getStreamName(); - abstract Descriptors.Descriptor getDescriptor(); + abstract DescriptorProtos.DescriptorProto getDescriptor(); @AutoValue.Builder abstract static class Builder { @@ -63,7 +63,7 @@ abstract class AppendClientInfo { abstract Builder setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value); -abstract Builder setDescriptor(Descriptors.Descriptor value); +abstract Builder setDescriptor(DescriptorProtos.DescriptorProto value); abstract Builder setStreamName(@Nullable String name); @@ -74,8 +74,8 @@ abstract class AppendClientInfo { static AppendClientInfo of( TableSchema tableSchema, - Consumer closeAppendClient, - boolean includeCdcColumns) + DescriptorProtos.DescriptorProto descriptor, + Consumer closeAppendClient) throws Exception { return new AutoValue_AppendClientInfo.Builder() .setTableSchema(tableSchema) @@ -83,12 +83,22 @@ abstract class AppendClientInfo { .setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema)) .setSchemaInformation( TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema)) -.setDescriptor( -TableRowToStorageApiProto.getDescriptorFromTableSchema( -tableSchema, true, includeCdcColumns)) +.setDescriptor(descriptor) .build(); } + static AppendClientInfo of( + TableSchema tableSchema, + 
Consumer closeAppendClient, + boolean includeCdcColumns) + throws Exception { +return of( +tableSchema, +TableRowToStorageApiProto.descriptorSchemaFromTableSchema( +tableSchema, true, includeCdcColumns), +closeAppendClient); + } + public AppendClientInfo withNoAppendClient() { return toBuilder().setStreamAppendClient(null).build(); } @@ -149,8 +159,10 @@ abstract class AppendClientInfo { public TableRow toTableRow(ByteString protoBytes) { try { return TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(getDescriptor(), protoBytes), true); -} catch (InvalidProtocolBufferException e
[beam] branch master updated (488279c236e -> 1f90261aef4)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 488279c236e Bump commons-io to 2.13.0 (#27776) add b40ca796d8d propagate fewKeys to hot-key path new 1f90261aef4 Merge pull request #27845: propagate fewKeys to hot-key path The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/beam/sdk/transforms/Combine.java| 23 +- 1 file changed, 18 insertions(+), 5 deletions(-)
[beam] 01/01: Merge pull request #27845: propagate fewKeys to hot-key path
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 1f90261aef435d947db5eff30172afa3e7d4fd27 Merge: 488279c236e b40ca796d8d Author: Reuven Lax AuthorDate: Fri Aug 4 00:15:46 2023 -0700 Merge pull request #27845: propagate fewKeys to hot-key path .../org/apache/beam/sdk/transforms/Combine.java| 23 +- 1 file changed, 18 insertions(+), 5 deletions(-)
[beam] branch master updated: Merge pull request #27835: Enable combiner lifting for the Group transform
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 86426c814db Merge pull request #27835: Enable combiner lifting for the Group transform 86426c814db is described below commit 86426c814db19736f485d7be9a24ac571d4a2b1a Author: Reuven Lax AuthorDate: Thu Aug 3 14:38:38 2023 -0700 Merge pull request #27835: Enable combiner lifting for the Group transform --- .../apache/beam/sdk/schemas/transforms/Group.java| 20 +++- .../java/org/apache/beam/sdk/transforms/Combine.java | 8 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java index fe8933d24d5..0115c4900ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java @@ -998,6 +998,8 @@ public class Group { abstract @Nullable Fanout getFanout(); +abstract Boolean getFewKeys(); + abstract ByFields getByFields(); abstract SchemaAggregateFn.Inner getSchemaAggregateFn(); @@ -1012,6 +1014,8 @@ public class Group { abstract static class Builder { public abstract Builder setFanout(@Nullable Fanout value); + public abstract Builder setFewKeys(Boolean fewKeys); + abstract Builder setByFields(ByFields byFields); abstract Builder setSchemaAggregateFn(SchemaAggregateFn.Inner schemaAggregateFn); @@ -1033,6 +1037,9 @@ public class Group { .setSchemaAggregateFn(schemaAggregateFn) .setKeyField(keyField) .setValueField(valueField) + .setFewKeys( + true) // We are often selecting only certain fields, so combiner lifting usually + // helps. 
.build(); } @@ -1046,6 +1053,17 @@ public class Group { return toBuilder().setValueField(valueField).build(); } +/** + * Enable precombining. + * + * This is on by default. In certain cases (e.g. if there are many unique field values and + * the combiner's intermediate state is larger than the average row size) precombining makes + * things worse, in which case it can be turned off. + */ +public CombineFieldsByFields withPrecombining(boolean value) { + return toBuilder().setFewKeys(value).build(); +} + public CombineFieldsByFields withHotKeyFanout(int n) { return toBuilder().setFanout(Fanout.of(n)).build(); } @@ -1267,7 +1285,7 @@ public class Group { throw new RuntimeException("Unexpected kind: " + fanout.getKind()); } } - return Combine.perKey(fn); + return getFewKeys() ? Combine.fewKeys(fn) : Combine.perKey(fn); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 422450c89d0..ff5a8268ab7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -202,6 +203,13 @@ public class Combine { return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/); } + /** Returns a {@link PerKey Combine.PerKey}, and set fewKeys in {@link GroupByKey}. */ + @Internal + public static PerKey fewKeys( + GlobalCombineFn fn) { +return new PerKey<>(fn, displayDataForFn(fn), true /*fewKeys*/); + } + /** Returns a {@link PerKey Combine.PerKey}, and set fewKeys in {@link GroupByKey}. 
*/ private static PerKey fewKeys( GlobalCombineFn fn,
[beam] branch master updated (0c15645a561 -> 42cdd61f8a5)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 0c15645a561 Explicitly load go license container locally (#27824) add 4a8fc73792d override encoding positions for nested schemas new 42cdd61f8a5 Merge pull request #27774: Override encoding positions for nested schemas The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../util/RowCoderCloudObjectTranslator.java| 7 +--- .../util/SchemaCoderCloudObjectTranslator.java | 44 -- 2 files changed, 41 insertions(+), 10 deletions(-)
[beam] 01/01: Merge pull request #27774: Override encoding positions for nested schemas
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 42cdd61f8a522869709f729d4f4b0dcc470263f7 Merge: 0c15645a561 4a8fc73792d Author: Reuven Lax AuthorDate: Wed Aug 2 23:35:19 2023 -0700 Merge pull request #27774: Override encoding positions for nested schemas .../util/RowCoderCloudObjectTranslator.java| 7 +--- .../util/SchemaCoderCloudObjectTranslator.java | 44 -- 2 files changed, 41 insertions(+), 10 deletions(-)
[beam] branch master updated (66f0592f8b4 -> 90f2b3445ff)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 66f0592f8b4 Add flags to build multiarch Go SDK containers and fix Go Dataflow ARM github actions test (#27716) add 5989a662dc9 populate name map during snapshot restore new 90f2b3445ff Merge pull request #27638: Populate name map during snapshot restore The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 1 + 1 file changed, 1 insertion(+)
[beam] 01/01: Merge pull request #27638: Populate name map during snapshot restore
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 90f2b3445ff4b0da50ecc022b46f5da90b2183f3 Merge: 66f0592f8b4 5989a662dc9 Author: Reuven Lax AuthorDate: Thu Jul 27 19:05:23 2023 -0700 Merge pull request #27638: Populate name map during snapshot restore .../src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 1 + 1 file changed, 1 insertion(+)
[beam] branch master updated (d241eaeeb94 -> 1f4defab42a)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from d241eaeeb94 Make all Github Action tests to run at minute 0 of every 6 hours (#27701) add 2e958499e5d lowercase everything new 1f4defab42a Merge pull request #27699: Don't make StorageAPI tablerow names case-sensitive The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java | 6 +++--- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java| 2 +- 2 files changed, 4 insertions(+), 4 deletions(-)
[beam] 01/01: Merge pull request #27699: Don't make StorageAPI tablerow names case-sensitive
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 1f4defab42a67cba13154b46ad15a87c63ba3ec9 Merge: d241eaeeb94 2e958499e5d Author: Reuven Lax AuthorDate: Wed Jul 26 15:40:04 2023 -0700 Merge pull request #27699: Don't make StorageAPI tablerow names case-sensitive .../apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java | 6 +++--- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java| 2 +- 2 files changed, 4 insertions(+), 4 deletions(-)
[beam] 01/01: Merge pull request #27590: Properly translate TimestampedValueCoder on runnerv1
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit cbe2e0f5ad5e45a7f6d380b1b1ac36d5dede8f54 Merge: 05305ede453 52807b6e1b6 Author: Reuven Lax AuthorDate: Sat Jul 22 18:59:04 2023 -0700 Merge pull request #27590: Properly translate TimestampedValueCoder on runnerv1 .../dataflow/util/CloudObjectTranslators.java | 34 ++ ...DefaultCoderCloudObjectTranslatorRegistrar.java | 1 + .../runners/dataflow/util/CloudObjectsTest.java| 2 ++ 3 files changed, 37 insertions(+)
[beam] branch master updated (05305ede453 -> cbe2e0f5ad5)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 05305ede453 Merge pull request #27617: Support withFanout and withHotKeyFanout on schema group transform add 52807b6e1b6 properly translate TimestampedValueCoder on runnerv1 new cbe2e0f5ad5 Merge pull request #27590: Properly translate TimestampedValueCoder on runnerv1 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../dataflow/util/CloudObjectTranslators.java | 34 ++ ...DefaultCoderCloudObjectTranslatorRegistrar.java | 1 + .../runners/dataflow/util/CloudObjectsTest.java| 2 ++ 3 files changed, 37 insertions(+)
[beam] branch master updated: Merge pull request #27617: Support withFanout and withHotKeyFanout on schema group transform
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 05305ede453 Merge pull request #27617: Support withFanout and withHotKeyFanout on schema group transform 05305ede453 is described below commit 05305ede45366f158f27fc2b83b9ce00db4df2ab Author: Reuven Lax AuthorDate: Sat Jul 22 10:40:43 2023 -0700 Merge pull request #27617: Support withFanout and withHotKeyFanout on schema group transform --- .../apache/beam/sdk/schemas/transforms/Group.java | 162 - .../beam/sdk/schemas/transforms/GroupTest.java | 96 +--- .../sdk/extensions/sql/impl/rel/BeamWindowRel.java | 5 +- 3 files changed, 201 insertions(+), 62 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java index fb48ceed311..fe8933d24d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.schemas.transforms; +import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; @@ -34,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -42,6 +45,7 @@ import org.apache.beam.sdk.values.PCollection; import 
org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A generic grouping transform for schema {@link PCollection}s. @@ -153,7 +157,7 @@ public class Group { */ public CombineGlobally aggregate( CombineFn combineFn) { - return new CombineGlobally<>(combineFn); + return new CombineGlobally<>(combineFn, 0); } /** @@ -169,10 +173,8 @@ public class Group { return new CombineFieldsGlobally<>( SchemaAggregateFn.create() .aggregateFields( - FieldAccessDescriptor.withFieldNames(inputFieldName), - false, - fn, - outputFieldName)); + FieldAccessDescriptor.withFieldNames(inputFieldName), false, fn, outputFieldName), + 0); } public @@ -183,7 +185,8 @@ public class Group { return new CombineFieldsGlobally<>( SchemaAggregateFn.create() .aggregateFields( - FieldAccessDescriptor.withFieldNames(inputFieldName), true, fn, outputFieldName)); + FieldAccessDescriptor.withFieldNames(inputFieldName), true, fn, outputFieldName), + 0); } /** The same as {@link #aggregateField} but using field id. 
*/ @@ -194,7 +197,8 @@ public class Group { return new CombineFieldsGlobally<>( SchemaAggregateFn.create() .aggregateFields( - FieldAccessDescriptor.withFieldIds(inputFieldId), false, fn, outputFieldName)); + FieldAccessDescriptor.withFieldIds(inputFieldId), false, fn, outputFieldName), + 0); } public @@ -205,7 +209,8 @@ public class Group { return new CombineFieldsGlobally<>( SchemaAggregateFn.create() .aggregateFields( - FieldAccessDescriptor.withFieldIds(inputFieldId), true, fn, outputFieldName)); + FieldAccessDescriptor.withFieldIds(inputFieldId), true, fn, outputFieldName), + 0); } /** @@ -221,7 +226,8 @@ public class Group { return new CombineFieldsGlobally<>( SchemaAggregateFn.create() .aggregateFields( - FieldAccessDescriptor.withFieldNames(inputFieldName), false, fn, outputField)); + FieldAccessDescriptor.withFieldNames(inputFieldName), false, fn, outputField), + 0); } public @@ -232,7 +238,8 @@ public class Group { return new CombineFieldsGlobally<>( SchemaAggregateFn.create() .aggregateFields( - FieldAccessDescriptor.withFieldNames(inputFieldName), true, fn, outputField)); +
[beam] branch master updated: Merge pull request #26849: add attribute support to writeAvros and writeProtos
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new d998ba3475f Merge pull request #26849: add attribute support to writeAvros and writeProtos d998ba3475f is described below commit d998ba3475f67f66716d54f784ba46f5efa88cd7 Author: Reuven Lax AuthorDate: Tue Jun 13 14:48:35 2023 -0700 Merge pull request #26849: add attribute support to writeAvros and writeProtos --- .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 14 - .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java | 12 ++--- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java| 63 ++ 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java index 4479323d66f..7c63b9023e3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -26,8 +26,10 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; @@ -72,7 +74,7 @@ public final class ExternalWrite implements ExternalTransformRegistrar { @Override public PTransform, 
PDone> buildExternal(Configuration config) { PubsubIO.Write.Builder writeBuilder = - PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayload()); + PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayloadFromWindowedValue()); if (config.topic != null) { StaticValueProvider topic = StaticValueProvider.of(config.topic); writeBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)); @@ -87,4 +89,14 @@ public final class ExternalWrite implements ExternalTransformRegistrar { return writeBuilder.build(); } } + + public static class ParsePubsubMessageProtoAsPayloadFromWindowedValue + implements SerializableFunction, PubsubMessage> { +static final ParsePubsubMessageProtoAsPayload INNER = new ParsePubsubMessageProtoAsPayload(); + +@Override +public PubsubMessage apply(ValueInSingleWindow input) { + return INNER.apply(input.getValue()); +} + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index c082b2007aa..26f4fb5d076 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -38,7 +38,7 @@ public class PreparePubsubWriteDoFn extends DoFn private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6; private int maxPublishBatchSize; - private SerializableFunction formatFunction; + private SerializableFunction, PubsubMessage> formatFunction; @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction; static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize) @@ -110,7 +110,7 @@ public class PreparePubsubWriteDoFn extends DoFn } PreparePubsubWriteDoFn( - SerializableFunction formatFunction, + SerializableFunction, 
PubsubMessage> formatFunction, @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction, int maxPublishBatchSize) { @@ -126,11 +126,11 @@ public class PreparePubsubWriteDoFn extends DoFn BoundedWindow window, PaneInfo paneInfo, OutputReceiver o) { -PubsubMessage message = formatFunction.apply(element); +ValueInSingleWindow valueInSingleWindow = +ValueInSingleWindow.of(element, ts, window, paneInfo); +PubsubMessage message = formatFunction.apply(valueInSingleWindow); if (topicFunction != null) { - message = - message.withTopic( - topicFunction.apply(ValueInSingleWindow.of(element, ts, window, paneInfo)).asPath()); + message =
[beam] branch master updated (92e33e8e6a8 -> 1e49c512ac1)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 92e33e8e6a8 Bump google.golang.org/api from 0.126.0 to 0.127.0 in /sdks (#27112) add 1e49c512ac1 Merge pull request #26975: Add upsert and delete support to BigQueryIO No new revisions were added by this update. Summary of changes: ...DefaultCoderCloudObjectTranslatorRegistrar.java | 2 + .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 17 +- .../AvroGenericRecordToStorageApiProto.java| 24 +- .../io/gcp/bigquery/BeamRowToStorageApiProto.java | 18 +- .../bigquery/BigQueryCoderProviderRegistrar.java | 4 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 111 ++- .../beam/sdk/io/gcp/bigquery/RowMutation.java | 69 .../io/gcp/bigquery/RowMutationInformation.java| 46 +++ .../beam/sdk/io/gcp/bigquery/StorageApiCDC.java} | 26 +- .../io/gcp/bigquery/StorageApiConvertMessages.java | 26 +- .../bigquery/StorageApiDynamicDestinations.java| 7 +- .../StorageApiDynamicDestinationsBeamRow.java | 39 ++- ...StorageApiDynamicDestinationsGenericRecord.java | 38 ++- .../StorageApiDynamicDestinationsTableRow.java | 40 ++- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 26 +- .../StorageApiWriteRecordsInconsistent.java| 8 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 62 +++- .../bigquery/StorageApiWritesShardedRecords.java | 7 +- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 106 +-- .../beam/sdk/io/gcp/testing/BigqueryClient.java| 4 + .../sdk/io/gcp/testing/FakeDatasetService.java | 100 +- .../beam/sdk/io/gcp/testing/TableContainer.java| 120 ++- .../AvroGenericRecordToStorageApiProtoTest.java| 43 ++- .../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 34 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 353 + .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 161 ++ .../bigquery/TableRowToStorageApiProtoTest.java| 86 +++-- 27 files changed, 1407 insertions(+), 170 deletions(-) create mode 
100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java copy sdks/java/{extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryPerfTableProvider.java => io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java} (70%) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
[beam] branch master updated: Merge pull request #27047: Enable pubsub dynamic destinations by default
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c3b330c66c6 Merge pull request #27047: Enable pubsub dynamic destinations by default c3b330c66c6 is described below commit c3b330c66c6e098a3a3a9505d3b4eaddc9ccb9c0 Author: Reuven Lax AuthorDate: Wed Jun 7 14:45:18 2023 -0700 Merge pull request #27047: Enable pubsub dynamic destinations by default --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java| 9 + .../beam/runners/dataflow/options/DataflowPipelineOptions.java | 7 --- .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 1 - 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 2951b48bab4..60ebe8d8846 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1916,14 +1916,7 @@ public class DataflowRunner extends PipelineRunner { ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); } } else { -DataflowPipelineOptions options = -input.getPipeline().getOptions().as(DataflowPipelineOptions.class); -if (options.getEnableDynamicPubsubDestinations()) { - stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true); -} else { - throw new RuntimeException( - "Dynamic Pubsub destinations not yet supported. 
Topic must be set."); -} +stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true); } if (overriddenTransform.getTimestampAttribute() != null) { stepContext.addInput( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index bf1027cdd3a..f87af28ca61 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -153,13 +153,6 @@ public interface DataflowPipelineOptions void setDataflowWorkerJar(String dataflowWorkerJafr); - // Disable this support for now until the Dataflow backend fully supports this option. - @Description("Whether to allow dynamic pubsub destinations. Temporary option: will be removed.") - @Default.Boolean(false) - Boolean getEnableDynamicPubsubDestinations(); - - void setEnableDynamicPubsubDestinations(Boolean enable); - /** Set of available Flexible Resource Scheduling goals. */ enum FlexResourceSchedulingGoal { /** No goal specified. 
*/ diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index d16afeb91e0..84f1069f592 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -2505,7 +2505,6 @@ public class DataflowRunnerTest implements Serializable { PipelineOptions options = buildPipelineOptions(); DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); dataflowOptions.setStreaming(true); -dataflowOptions.setEnableDynamicPubsubDestinations(true); Pipeline p = Pipeline.create(options); List testValues =
[beam] branch master updated: Merge pull request #26919: warn if BigQuery failed rows collection is not processed
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fbeae980e93 Merge pull request #26919: warn if BigQuery failed rows collection is not processed fbeae980e93 is described below commit fbeae980e93ea64fc8cc3ad074cbc8aebd157691 Author: Reuven Lax AuthorDate: Sun May 28 20:06:27 2023 -0700 Merge pull request #26919: warn if BigQuery failed rows collection is not processed --- .../beam/runners/dataflow/DataflowRunner.java | 56 ++ .../beam/runners/dataflow/DataflowRunnerTest.java | 89 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 5 ++ .../sdk/io/gcp/bigquery/StreamingWriteTables.java | 8 +- 5 files changed, 157 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a2f6298ab10..2951b48bab4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -105,6 +105,8 @@ import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.gcp.bigquery.StorageApiLoads; +import org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteTables; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; @@ -169,6 +171,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files; @@ -1069,6 +1072,7 @@ public class DataflowRunner extends PipelineRunner { } logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); +logWarningIfBigqueryDLQUnused(pipeline); if (shouldActAsStreaming(pipeline)) { options.setStreaming(true); @@ -1569,6 +1573,58 @@ public class DataflowRunner extends PipelineRunner { / + private void logWarningIfBigqueryDLQUnused(Pipeline pipeline) { +Map, String> unconsumedDLQ = Maps.newHashMap(); +pipeline.traverseTopologically( +new PipelineVisitor.Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(Node node) { +PTransform transform = node.getTransform(); +if (transform != null) { + TupleTag failedTag = null; + String rootBigQueryTransform = ""; + if (transform.getClass().equals(StorageApiLoads.class)) { +StorageApiLoads storageLoads = (StorageApiLoads) transform; +failedTag = storageLoads.getFailedRowsTag(); +// For storage API the transform that outputs failed rows is nested one layer below +// BigQueryIO. +rootBigQueryTransform = node.getEnclosingNode().getFullName(); + } else if (transform.getClass().equals(StreamingWriteTables.class)) { +StreamingWriteTables streamingInserts = (StreamingWriteTables) transform; +failedTag = streamingInserts.getFailedRowsTupleTag(); +// For streaming inserts the transform that outputs failed rows is nested two layers +// below BigQueryIO. 
+rootBigQueryTransform = node.getEnclosingNode().getEnclosingNode().getFullName(); + } + if (failedTag != null) { +PCollection dlq = node.getOutputs().get(failedTag); +if (dlq != null) { + unconsumedDLQ.put(dlq, rootBigQueryTransform); +} + } +} + +for (PCollection input : node.getInputs().values()) { + unconsumedDLQ.remove(input); +} +return CompositeBehavior.EN
[beam] branch master updated (05c5b1b5bb7 -> d91ea8e4491)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 05c5b1b5bb7 minor fix introduction (#25587) add d91ea8e4491 Merge pull request #26878: make sure that we capture all writes to the failed records in a metric No new revisions were added by this update. Summary of changes: .../io/gcp/bigquery/StorageApiWriteUnshardedRecords.java| 2 ++ .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java | 13 - 2 files changed, 10 insertions(+), 5 deletions(-)
[beam] branch master updated: Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2ee12027627 Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas 2ee12027627 is described below commit 2ee12027627dd842c52053bec833e0c56d9ebf51 Author: Reuven Lax AuthorDate: Mon May 22 15:34:53 2023 -0700 Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas --- .../io/gcp/bigquery/CreateTableDestinations.java | 128 .../sdk/io/gcp/bigquery/CreateTableHelpers.java| 50 ++- .../beam/sdk/io/gcp/bigquery/CreateTables.java | 2 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 77 +- .../StorageApiWriteRecordsInconsistent.java| 12 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 165 ++--- .../bigquery/StorageApiWritesShardedRecords.java | 47 +- .../sdk/io/gcp/testing/FakeDatasetService.java | 42 -- .../bigquery/StorageApiSinkCreateIfNeededIT.java | 140 + 9 files changed, 404 insertions(+), 259 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java deleted file mode 100644 index 0c2babdeef8..000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import com.google.api.services.bigquery.model.TableSchema; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * Creates any tables needed before performing writes to the tables. This is a side-effect {@link - * DoFn}, and returns the original collection unchanged. 
- */ -public class CreateTableDestinations -extends PTransform< -PCollection>, PCollection>> { - private final CreateDisposition createDisposition; - private final BigQueryServices bqServices; - private final DynamicDestinations dynamicDestinations; - private final @Nullable String kmsKey; - - public CreateTableDestinations( - CreateDisposition createDisposition, - DynamicDestinations dynamicDestinations) { -this(createDisposition, new BigQueryServicesImpl(), dynamicDestinations, null); - } - - public CreateTableDestinations( - CreateDisposition createDisposition, - BigQueryServices bqServices, - DynamicDestinations dynamicDestinations, - @Nullable String kmsKey) { -this.createDisposition = createDisposition; -this.bqServices = bqServices; -this.dynamicDestinations = dynamicDestinations; -this.kmsKey = kmsKey; - } - - CreateTableDestinations withKmsKey(String kmsKey) { -return new CreateTableDestinations<>( -createDisposition, bqServices, dynamicDestinations, kmsKey); - } - - CreateTableDestinations withTestServices(BigQueryServices bqServices) { -return new CreateTableDestinations<>( -createDisposition, bqServices, dynam
[beam] branch master updated: Merge pull request #26794: #26789 Fix auto schema update when schema order has changed.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ff166391e2b Merge pull request #26794: #26789 Fix auto schema update when schema order has changed. ff166391e2b is described below commit ff166391e2bcada4d846b5d74b1168dd248ab4ff Author: Reuven Lax AuthorDate: Sun May 21 14:18:46 2023 -0700 Merge pull request #26794: #26789 Fix auto schema update when schema order has changed. --- .../bigquery/StorageApiWriteUnshardedRecords.java | 19 +- .../bigquery/StorageApiWritesShardedRecords.java | 22 +- .../io/gcp/bigquery/TableSchemaUpdateUtils.java| 122 ++ .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 8 +- .../gcp/bigquery/TableSchemaUpdateUtilsTest.java | 253 + 5 files changed, 409 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f9f9b1b0b92..46b542a84a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -685,16 +686,22 @@ public class StorageApiWriteUnshardedRecords void postFlush() { // If we got a response indicating an updated schema, recreate the client. 
-if (this.appendClientInfo != null) { +if (this.appendClientInfo != null && autoUpdateSchema) { @Nullable StreamAppendClient streamAppendClient = appendClientInfo.getStreamAppendClient(); @Nullable - TableSchema updatedTableSchema = + TableSchema updatedTableSchemaReturned = (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null; - if (updatedTableSchema != null && autoUpdateSchema) { -invalidateWriteStream(); -appendClientInfo = -Preconditions.checkStateNotNull(getAppendClientInfo(false, updatedTableSchema)); + if (updatedTableSchemaReturned != null) { +Optional updatedTableSchema = +TableSchemaUpdateUtils.getUpdatedSchema( +this.initialTableSchema, updatedTableSchemaReturned); +if (updatedTableSchema.isPresent()) { + invalidateWriteStream(); + appendClientInfo = + Preconditions.checkStateNotNull( + getAppendClientInfo(false, updatedTableSchema.get())); +} } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index cd23b7be2c5..e0353bf9a90 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -36,6 +36,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -751,16 +752,23 @@ public class StorageApiWritesShardedRecords newSchema = +TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned); +if (newSchema.isPresent()) { + appendClientInfo.set( + AppendClientInfo.of( + newSchema.get(), appendClientInfo.get().getCloseAppendClient())); + 
APPEND_CLIENTS.invalidate(element.getKey()); + APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); + updatedSchema.write(newSchema.get()); +} } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java new file mode 100644 index 000..cba394abc89 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk
[beam] 01/01: Merge pull request #26752: Only use updated schema if auto update is enabled
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 809d4b3efcc84babeb8b9d206bf1032ccfa070f2 Merge: f2400c828c3 11de4424aa2 Author: Reuven Lax AuthorDate: Thu May 18 22:42:07 2023 -0700 Merge pull request #26752: Only use updated schema if auto update is enabled .../beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (f2400c828c3 -> 809d4b3efcc)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from f2400c828c3 Don't fail if creating a venv didn't succeed. Also provide a way to disable creating separate venvs. (#26753) add 11de4424aa2 Only use updated schema if auto update is enabled new 809d4b3efcc Merge pull request #26752: Only use updated schema if auto update is enabled The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (14c24f92d95 -> 85d90fbd757)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 14c24f92d95 Update cloud profile download to use curl instead of wget (#26720) add 85d90fbd757 Merge pull request #26707: Properly tag test so it is excluded on unsupported runners No new revisions were added by this update. Summary of changes: runners/portability/java/build.gradle | 1 + .../core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java| 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-)
[beam] branch master updated (69367347a41 -> 9b1448bcce4)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 69367347a41 Capture build scans on ge.apache.org to benefit from deep build insights (#26351) add 9b1448bcce4 Merge pull request #25940: Fix triggered side inputs No new revisions were added by this update. Summary of changes: .../runners/direct/DirectWriteViewVisitor.java | 28 +++-- runners/flink/flink_runner.gradle | 1 + runners/flink/job-server/flink_job_server.gradle | 1 + .../runners/dataflow/StreamingViewOverrides.java | 22 +++- runners/jet/build.gradle | 3 + runners/samza/build.gradle | 1 + runners/samza/job-server/build.gradle | 3 +- runners/spark/job-server/spark_job_server.gradle | 1 + runners/spark/spark_runner.gradle | 3 + runners/twister2/build.gradle | 1 + .../org/apache/beam/sdk/io/GenerateSequence.java | 2 +- ...reMessage.java => UsesTriggeredSideInputs.java} | 6 +- .../java/org/apache/beam/sdk/transforms/View.java | 1 - .../apache/beam/sdk/values/PCollectionViews.java | 8 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 8 +- .../org/apache/beam/sdk/transforms/ViewTest.java | 114 + 16 files changed, 164 insertions(+), 39 deletions(-) copy sdks/java/core/src/main/java/org/apache/beam/sdk/testing/{UsesFailureMessage.java => UsesTriggeredSideInputs.java} (80%)
[beam] branch master updated: Merge pull request #26503: fix dataloss bug in batch Storage API sink.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 57ee20902dd Merge pull request #26503: fix dataloss bug in batch Storage API sink. 57ee20902dd is described below commit 57ee20902dd1be275dc4cf7a752dff3e59db1b31 Author: Reuven Lax AuthorDate: Tue May 2 15:31:24 2023 -0700 Merge pull request #26503: fix dataloss bug in batch Storage API sink. --- .../bigquery/StorageApiWriteUnshardedRecords.java | 67 ++ 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f0015cddc38..3c082752449 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -31,11 +31,13 @@ import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.protobuf.ByteString; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.Status; import java.io.IOException; import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -224,11 +226,14 @@ public class StorageApiWriteUnshardedRecords ProtoRows protoRows; List timestamps; + int failureCount; + public AppendRowsContext( long offset, ProtoRows protoRows, List timestamps) { this.offset = offset; this.protoRows = protoRows; this.timestamps = timestamps; +this.failureCount = 
0; } } @@ -301,17 +306,20 @@ public class StorageApiWriteUnshardedRecords } String getOrCreateStreamName() { -try { - if (!useDefaultStream) { -this.streamName = -Preconditions.checkStateNotNull(maybeDatasetService) -.createWriteStream(tableUrn, Type.PENDING) -.getName(); - } else { -this.streamName = getDefaultStreamName(); +if (Strings.isNullOrEmpty(this.streamName)) { + try { +if (!useDefaultStream) { + this.streamName = + Preconditions.checkStateNotNull(maybeDatasetService) + .createWriteStream(tableUrn, Type.PENDING) + .getName(); + this.currentOffset = 0; +} else { + this.streamName = getDefaultStreamName(); +} + } catch (Exception e) { +throw new RuntimeException(e); } -} catch (Exception e) { - throw new RuntimeException(e); } return this.streamName; } @@ -376,7 +384,6 @@ public class StorageApiWriteUnshardedRecords // This pin is "owned" by the current DoFn. Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin(); } -this.currentOffset = 0; nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); this.appendClientInfo = newAppendClientInfo; } @@ -507,6 +514,7 @@ public class StorageApiWriteUnshardedRecords long offset = -1; if (!this.useDefaultStream) { + getOrCreateStreamName(); // Force creation of the stream before we get offsets. offset = this.currentOffset; this.currentOffset += inserts.getSerializedRowsCount(); } @@ -598,7 +606,42 @@ public class StorageApiWriteUnshardedRecords streamName, clientNumber, retrieveErrorDetails(contexts)); + failedContext.failureCount += 1; + + // Maximum number of times we retry before we fail the work item. + if (failedContext.failureCount > 5) { +throw new RuntimeException("More than 5 attempts to call AppendRows failed."); + } + + // The following errors are known to be persistent, so always fail the work item in + // this case. 
+ Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); + Status.Code statusCode = Status.fromThrowable(error).getCode(); + if (statusCode.equals(Status.Code.OUT_OF_RANGE) + || statusCode.equals(Status.Code.ALREADY_EXISTS)) { +
[beam] branch master updated: Merge pull request #26063: #21431 Pubsub dynamic topic destinations
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f549fd33abd Merge pull request #26063: #21431 Pubsub dynamic topic destinations f549fd33abd is described below commit f549fd33abdc672143ccbe3f0f66104995d30fe6 Author: Reuven Lax AuthorDate: Fri Apr 21 21:01:11 2023 -0700 Merge pull request #26063: #21431 Pubsub dynamic topic destinations --- .../core/construction/PTransformTranslation.java | 2 + .../beam/runners/dataflow/DataflowRunner.java | 26 +- .../dataflow/options/DataflowPipelineOptions.java | 9 +- .../beam/runners/dataflow/util/PropertyNames.java | 4 + .../beam/runners/dataflow/DataflowRunnerTest.java | 74 + .../runners/dataflow/worker/PubsubDynamicSink.java | 163 ++ .../dataflow/worker/PubsubDynamicSinkTest.java | 163 ++ .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 1 + .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java | 142 .../io/gcp/pubsub/PubSubPayloadTranslation.java| 39 ++- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 19 +- .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 4 +- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java| 357 - .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 13 +- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 27 +- .../io/gcp/pubsub/PubsubMessageWithTopicCoder.java | 72 + .../beam/sdk/io/gcp/pubsub/PubsubTestClient.java | 27 +- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 194 --- .../io/gcp/pubsub/PreparePubsubWriteDoFnTest.java | 135 .../pubsub/PubSubWritePayloadTranslationTest.java | 41 +++ .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java| 3 +- .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java| 4 +- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 183 +-- .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java| 9 +- .../sdk/io/gcp/pubsub/PubsubTestClientTest.java| 9 +- .../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java | 82 - 26 files 
changed, 1472 insertions(+), 330 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index e701ae60bb5..485350715c9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -107,6 +107,8 @@ public class PTransformTranslation { public static final String PUBSUB_READ = "beam:transform:pubsub_read:v1"; public static final String PUBSUB_WRITE = "beam:transform:pubsub_write:v1"; + public static final String PUBSUB_WRITE_DYNAMIC = "beam:transform:pubsub_write:v2"; + // CombineComponents public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN = "beam:transform:combine_per_key_precombine:v1"; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index dfa3d37a400..2c24df1852b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1797,8 +1797,7 @@ public class DataflowRunner extends PipelineRunner { * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we can * instead defer to Windmill's implementation. 
*/ - private static class StreamingPubsubIOWrite - extends PTransform, PDone> { + static class StreamingPubsubIOWrite extends PTransform, PDone> { private final PubsubUnboundedSink transform; @@ -1850,13 +1849,24 @@ public class DataflowRunner extends PipelineRunner { StepTranslationContext stepContext, PCollection input) { stepContext.addInput(PropertyNames.FORMAT, "pubsub"); - if (overriddenTransform.getTopicProvider().isAccessible()) { -stepContext.addInput( -PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath()); + if (overriddenTransform.getTopicProvider() != null) { +if (overriddenTransform.getTopicProvider().isAccessible()) { + stepContext.addInput( + PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath()); +} else { + stepContext.addInput( + PropertyName
[beam] branch master updated: Merge pull request #26284: Fix GroupIntoBatches hold
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f5f7a471321 Merge pull request #26284: Fix GroupIntoBatches hold f5f7a471321 is described below commit f5f7a471321c903174b452a1982c5183a79ac6bc Author: Reuven Lax AuthorDate: Fri Apr 14 19:24:25 2023 -0700 Merge pull request #26284: Fix GroupIntoBatches hold --- .../java/org/apache/beam/sdk/transforms/GroupIntoBatches.java | 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 311a3dac6ca..3616cc2e59f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -580,6 +580,7 @@ public class GroupIntoBatches timerTs, minBufferedTs); bufferingTimer.clear(); +holdTimer.clear(); } } @@ -593,13 +594,18 @@ public class GroupIntoBatches @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState storedBatchSizeBytes, @StateId(TIMER_TIMESTAMP) ValueState timerTs, @StateId(MIN_BUFFERED_TS) CombiningState minBufferedTs, -@TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer) { +@TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer, +@TimerId(TIMER_HOLD_ID) Timer holdTimer) { LOG.debug( "*** END OF BUFFERING *** for timer timestamp {} with buffering duration {}", timestamp, maxBufferingDuration); flushBatch( receiver, key, batch, storedBatchSize, storedBatchSizeBytes, timerTs, minBufferedTs); + // Generally this is a noop, since holdTimer is not set if bufferingTimer is set. However we + // delete the holdTimer + // here in order to allow users to modify this policy on pipeline update. + holdTimer.clear(); } @OnWindowExpiration
[beam] branch master updated (5a9ab685d9b -> 241e40f2e9e)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 5a9ab685d9b Extract BundleManager to an Interface in SamzaRunner (#26268) add b6d8ac3d3a3 Optimize counters by reducing allocations - use forEach instead of requiring Iterator/UnmodifiableIterator/Iterables.concat overhead - optimize shortId cache hashing by just using MetricName as stepName is fixed - reduce MetricKey objects - avoid MetricsUpdate construction in getMonitoringData by directly processing maps new 241e40f2e9e Merge pull request #25930: Optimize counters by reducing allocations The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runners/core/metrics/MetricsContainerImpl.java | 129 ++--- .../core/metrics/MetricsContainerStepMap.java | 57 + .../beam/runners/core/metrics/MetricsMap.java | 15 +++ 3 files changed, 102 insertions(+), 99 deletions(-)
[beam] 01/01: Merge pull request #25930: Optimize counters by reducing allocations
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 241e40f2e9e53470f675e33eb6511e67d506e1e8 Merge: 5a9ab685d9b b6d8ac3d3a3 Author: Reuven Lax AuthorDate: Thu Apr 13 17:48:17 2023 -0700 Merge pull request #25930: Optimize counters by reducing allocations .../runners/core/metrics/MetricsContainerImpl.java | 129 ++--- .../core/metrics/MetricsContainerStepMap.java | 57 + .../beam/runners/core/metrics/MetricsMap.java | 15 +++ 3 files changed, 102 insertions(+), 99 deletions(-)
[beam] branch master updated: Merge pull request #25723: #25722 Add option to propagate successful storage-api writes
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ca0787642a6 Merge pull request #25723: #25722 Add option to propagate successful storage-api writes ca0787642a6 is described below commit ca0787642a6b3804a742326147281c99ae8d08d2 Author: Reuven Lax AuthorDate: Tue Mar 21 15:57:38 2023 -0700 Merge pull request #25723: #25722 Add option to propagate successful storage-api writes --- .../beam/sdk/transforms/GroupIntoBatches.java | 64 --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 26 - .../sdk/io/gcp/bigquery/SplittingIterable.java | 46 ++-- .../io/gcp/bigquery/StorageApiConvertMessages.java | 5 +- .../StorageApiDynamicDestinationsBeamRow.java | 4 +- ...StorageApiDynamicDestinationsGenericRecord.java | 4 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 78 +++-- .../io/gcp/bigquery/StorageApiWritePayload.java| 26 - .../StorageApiWriteRecordsInconsistent.java| 18 ++- .../bigquery/StorageApiWriteUnshardedRecords.java | 123 + .../bigquery/StorageApiWritesShardedRecords.java | 73 +--- .../sdk/io/gcp/bigquery/StreamingWriteTables.java | 2 + .../beam/sdk/io/gcp/bigquery/WriteResult.java | 40 ++- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 6 + 15 files changed, 434 insertions(+), 83 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 78ce549c3b0..311a3dac6ca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -329,7 +329,7 @@ public class GroupIntoBatches @Override public PCollection>> expand(PCollection> input) { - +Duration allowedLateness = 
input.getWindowingStrategy().getAllowedLateness(); checkArgument( input.getCoder() instanceof KvCoder, "coder specified in the input PCollection is not a KvCoder"); @@ -344,6 +344,7 @@ public class GroupIntoBatches params.getBatchSizeBytes(), weigher, params.getMaxBufferingDuration(), +allowedLateness, valueCoder))); } @@ -357,12 +358,20 @@ public class GroupIntoBatches @Nullable private final SerializableFunction weigher; private final Duration maxBufferingDuration; +private final Duration allowedLateness; + // The following timer is no longer set. We maintain the spec for update compatibility. private static final String END_OF_WINDOW_ID = "endOFWindow"; @TimerId(END_OF_WINDOW_ID) private final TimerSpec windowTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); +// This timer manages the watermark hold if there is no buffering timer. +private static final String TIMER_HOLD_ID = "watermarkHold"; + +@TimerId(TIMER_HOLD_ID) +private final TimerSpec holdTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + // This timer expires when it's time to batch and output the buffered data. private static final String END_OF_BUFFERING_ID = "endOfBuffering"; @@ -410,11 +419,13 @@ public class GroupIntoBatches long batchSizeBytes, @Nullable SerializableFunction weigher, Duration maxBufferingDuration, +Duration allowedLateness, Coder inputValueCoder) { this.batchSize = batchSize; this.batchSizeBytes = batchSizeBytes; this.weigher = weigher; this.maxBufferingDuration = maxBufferingDuration; + this.allowedLateness = allowedLateness; this.batchSpec = StateSpecs.bag(inputValueCoder); Combine.BinaryCombineLongFn sumCombineFn = @@ -452,9 +463,18 @@ public class GroupIntoBatches this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5); } +@Override +public Duration getAllowedTimestampSkew() { + // This is required since flush is sometimes called from processElement. 
This is safe because + // a watermark hold + // will always be set using timer.withOutputTimestamp. + return Duration.millis(Long.MAX_VALUE); +} + @ProcessElement public void processElement( @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer, +@TimerId(TIMER_HOLD_ID) Timer holdTimer, @StateId(BATCH_ID) BagState batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState storedBatchSize, @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState storedBatchSize
[beam] branch master updated (04c2de61e5d -> 4189320e49e)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 04c2de61e5d Fix OutputSampler's coder. (#25805) add dafc0e0 [BEAM-14307] Re-Fix Slow Side input pattern bug in sample add 4189320e49e Merge pull request #25829: [BEAM-14307] Re-Fix Slow Side input pattern bug in sample No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/beam/examples/snippets/Snippets.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[beam] branch master updated: Merge pull request #25804: Reuse client when constructing StreamWriter
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 227a846dc2d Merge pull request #25804: Reuse client when constructing StreamWriter 227a846dc2d is described below commit 227a846dc2dd0b78129739d0501d3dae31b17484 Author: Reuven Lax AuthorDate: Sat Mar 11 09:55:13 2023 -0800 Merge pull request #25804: Reuse client when constructing StreamWriter --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 3687a163d76..4e2e7aec0ec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1338,7 +1338,7 @@ class BigQueryServicesImpl implements BigQueryServices { .build(); StreamWriter streamWriter = - StreamWriter.newBuilder(streamName) + StreamWriter.newBuilder(streamName, newWriteClient) .setExecutorProvider( FixedExecutorProvider.create( options.as(ExecutorOptions.class).getScheduledExecutorService())) @@ -1514,9 +1514,18 @@ class BigQueryServicesImpl implements BigQueryServices { private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions options) { try { + TransportChannelProvider transportChannelProvider = + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .setChannelsPerCpu(2) + .build(); + return 
BigQueryWriteClient.create( BigQueryWriteSettings.newBuilder() .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) + .setTransportChannelProvider(transportChannelProvider) .setBackgroundExecutorProvider( FixedExecutorProvider.create( options.as(ExecutorOptions.class).getScheduledExecutorService()))
[beam] branch master updated: Merge pull request #25790: update to new BOM
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fb040ff9c10 Merge pull request #25790: update to new BOM fb040ff9c10 is described below commit fb040ff9c1059f97a29096597b42e864d8c87a82 Author: Reuven Lax AuthorDate: Sat Mar 11 08:53:21 2023 -0800 Merge pull request #25790: update to new BOM --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +++ .../container/license_scripts/dep_urls_java.yaml | 2 +- .../io/gcp/pubsublite/internal/PubsubLiteSink.java | 3 +-- .../pubsublite/internal/SubscribeTransform.java| 6 + .../pubsublite/internal/PubsubLiteSinkTest.java| 31 -- 5 files changed, 25 insertions(+), 27 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 8c4071b70c7..7e8ef20953e 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -466,15 +466,15 @@ class BeamModulePlugin implements Plugin { def dbcp2_version = "2.8.0" def errorprone_version = "2.10.0" // Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom -def gax_version = "2.23.0" +def gax_version = "2.23.2" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.6" // Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom -def google_cloud_spanner_version = "6.36.0" +def google_cloud_spanner_version = "6.37.0" def google_code_gson_version = "2.9.1" def google_oauth_clients_version = "1.34.1" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom -def grpc_version = "1.52.1" +def grpc_version = "1.53.0" def 
guava_version = "31.1-jre" def hadoop_version = "2.10.2" def hamcrest_version = "2.1" @@ -609,9 +609,9 @@ class BeamModulePlugin implements Plugin { google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version // The release notes shows the versions set by the BOM: -// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.8.0 +// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.10.0 // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml -google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.8.0", +google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.10.0", google_cloud_spanner: "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_code_gson: "com.google.code.gson:gson:$google_code_gson_version", diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 2a46fd84477..acf8a6bef23 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -42,7 +42,7 @@ jaxen: '1.1.6': type: "3-Clause BSD" libraries-bom: - '26.8.0': + '26.10.0': license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"; type: "Apache License 2.0" paranamer: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java index f22a3204021..dbf312ae5a4 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java @@ -22,7 +22,6 @@ import static java.util.concurrent.TimeUnit.MINUTES; import com.goo
[beam] branch master updated (4bbd7b89944 -> e4ddb861756)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 4bbd7b89944 Bump golang.org/x/net from 0.7.0 to 0.8.0 in /sdks (#25729) add be4cbba4b1d add logging add 237c088cf9a add logging add e4ddb861756 Merge pull request #25744: Add more logging to Storage API writes No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 4 ++-- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 8 +++- .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java| 10 +- 3 files changed, 18 insertions(+), 4 deletions(-)
[beam] 01/01: Merge pull request #25639: Fix paginated windmill sorted list state
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 62e9a1239bb6f9ef2f1469d0c1af192697f8b10c Merge: 170c4184ce1 1087abeb734 Author: Reuven Lax AuthorDate: Mon Feb 27 14:39:32 2023 -0800 Merge pull request #25639: Fix paginated windmill sorted list state .../dataflow/worker/WindmillStateReader.java | 15 -- .../dataflow/worker/WindmillStateReaderTest.java | 58 ++ 2 files changed, 58 insertions(+), 15 deletions(-)
[beam] branch master updated (170c4184ce1 -> 62e9a1239bb)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 170c4184ce1 Merge pull request #25633 from andreykot/patch-1 add 1087abeb734 Fix paginated windmill sorted list state. new 62e9a1239bb Merge pull request #25639: Fix paginated windmill sorted list state The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../dataflow/worker/WindmillStateReader.java | 15 -- .../dataflow/worker/WindmillStateReaderTest.java | 58 ++ 2 files changed, 58 insertions(+), 15 deletions(-)
[beam] branch master updated (fabd1f6d7a6 -> a3d82e64f31)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from fabd1f6d7a6 Add ReadFrom/WriteTo Csv/Json as top-level transforms to the Python SDK. (#25614) add a3d82e64f31 Merge pull request #25642: #25626 Revert to old constructor. No new revisions were added by this update. Summary of changes: .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-)
[beam] branch master updated: Merge pull request #25627: #25626 Use the correct constructor when creating StreamWriter
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 124e1d6a5f1 Merge pull request #25627: #25626 Use the correct constructor when creating StreamWriter 124e1d6a5f1 is described below commit 124e1d6a5f14dd7b194a98d43dbc771cbad05065 Author: Reuven Lax AuthorDate: Fri Feb 24 14:52:47 2023 -0800 Merge pull request #25627: #25626 Use the correct constructor when creating StreamWriter --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index bc798c47573..040346bb910 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1338,7 +1338,9 @@ class BigQueryServicesImpl implements BigQueryServices { .build(); StreamWriter streamWriter = - StreamWriter.newBuilder(streamName) + StreamWriter.newBuilder( + streamName, + org.apache.beam.sdk.util.Preconditions.checkStateNotNull(newWriteClient)) .setWriterSchema(protoSchema) .setChannelProvider(transportChannelProvider) .setEnableConnectionPool(useConnectionPool)
[beam] branch master updated (957301519bb -> bc2895c99a2)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 957301519bb Fix Debezium expansion service fail to start (#25243) add bc2895c99a2 Merge pull request #25094: Externalizing the StreamWriter parameters for StorageWrites No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java| 13 + .../beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 10 ++ 2 files changed, 23 insertions(+)
[beam] branch release-2.45.0 updated (7d07ffe68ba -> ade83b355ee)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch release-2.45.0 in repository https://gitbox.apache.org/repos/asf/beam.git from 7d07ffe68ba update cloudpickle version (#25143) add ab5cebf5525 Handle schema updates in Storage API writes. add 7f6c93ea039 Fix inconsistent cache bug new ade83b355ee Merge pull request #25205: Cherry pick/fix bigquery inconsistent cache The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 120 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +- .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 8 + .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++ .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 29 --- .../sdk/io/gcp/bigquery/SplittingIterable.java | 61 +- .../bigquery/StorageApiDynamicDestinations.java| 2 + .../StorageApiDynamicDestinationsBeamRow.java | 8 +- .../StorageApiDynamicDestinationsTableRow.java | 24 ++- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 20 +- .../io/gcp/bigquery/StorageApiWritePayload.java| 25 ++- .../StorageApiWriteRecordsInconsistent.java| 12 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 146 +++--- .../bigquery/StorageApiWritesShardedRecords.java | 173 +++-- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +-- .../sdk/io/gcp/testing/FakeDatasetService.java | 63 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 216 + .../bigquery/TableRowToStorageApiProtoTest.java| 95 - 18 files changed, 968 insertions(+), 173 deletions(-)
[beam] 01/01: Merge pull request #25205: Cherry pick/fix bigquery inconsistent cache
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch release-2.45.0 in repository https://gitbox.apache.org/repos/asf/beam.git commit ade83b355eed496c7e23606ab63e86eddd1d3988 Merge: 7d07ffe68ba 7f6c93ea039 Author: Reuven Lax AuthorDate: Fri Jan 27 08:41:58 2023 -0800 Merge pull request #25205: Cherry pick/fix bigquery inconsistent cache .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 120 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +- .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 8 + .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++ .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 29 --- .../sdk/io/gcp/bigquery/SplittingIterable.java | 61 +- .../bigquery/StorageApiDynamicDestinations.java| 2 + .../StorageApiDynamicDestinationsBeamRow.java | 8 +- .../StorageApiDynamicDestinationsTableRow.java | 24 ++- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 20 +- .../io/gcp/bigquery/StorageApiWritePayload.java| 25 ++- .../StorageApiWriteRecordsInconsistent.java| 12 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 146 +++--- .../bigquery/StorageApiWritesShardedRecords.java | 173 +++-- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +-- .../sdk/io/gcp/testing/FakeDatasetService.java | 63 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 216 + .../bigquery/TableRowToStorageApiProtoTest.java| 95 - 18 files changed, 968 insertions(+), 173 deletions(-)
[beam] 01/01: Merge pull request #25194: update GCP cloud libraries BOM to 26.5.0
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 3bf17574c6f18942d1015d5a837af23ae83c4d1a Merge: 61045b670bf 40243ffc284 Author: Reuven Lax AuthorDate: Fri Jan 27 06:00:57 2023 -0800 Merge pull request #25194: update GCP cloud libraries BOM to 26.5.0 .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 12 ++-- sdks/java/container/license_scripts/dep_urls_java.yaml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-)
[beam] branch master updated (61045b670bf -> 3bf17574c6f)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 61045b670bf Update Dataflow container versions (#25192) add 40243ffc284 update GCP cloud libraries BOM to 26.5.0 new 3bf17574c6f Merge pull request #25194: update GCP cloud libraries BOM to 26.5.0 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 12 ++-- sdks/java/container/license_scripts/dep_urls_java.yaml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-)
[beam] branch master updated (91efbd3a933 -> 1ecb4ef6d6f)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 91efbd3a933 Fix intruction -> instruction typo add e8484a790e3 Fix inconsistent cache bug add 1ecb4ef6d6f Merge pull request #25159: Fix inconsistent cache bug No new revisions were added by this update. Summary of changes: .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 9 ++- .../bigquery/StorageApiWritesShardedRecords.java | 82 -- 2 files changed, 53 insertions(+), 38 deletions(-)
[beam] 01/01: Merge pull request #24145: Handle updates to table schema when using Storage API writes.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit f5020e7ac2bdd3363619aafbf257f2ebf8e3fe2b Merge: 428ec97e30c 7ad44c84585 Author: Reuven Lax AuthorDate: Thu Jan 19 23:53:33 2023 -0800 Merge pull request #24145: Handle updates to table schema when using Storage API writes. .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 115 +-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +- .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 8 + .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++ .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 29 --- .../sdk/io/gcp/bigquery/SplittingIterable.java | 61 +- .../bigquery/StorageApiDynamicDestinations.java| 2 + .../StorageApiDynamicDestinationsBeamRow.java | 8 +- .../StorageApiDynamicDestinationsTableRow.java | 24 ++- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 20 +- .../io/gcp/bigquery/StorageApiWritePayload.java| 25 ++- .../StorageApiWriteRecordsInconsistent.java| 12 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 146 +++--- .../bigquery/StorageApiWritesShardedRecords.java | 173 +++-- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +-- .../sdk/io/gcp/testing/FakeDatasetService.java | 63 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 216 + .../bigquery/TableRowToStorageApiProtoTest.java| 95 - 18 files changed, 958 insertions(+), 178 deletions(-)
[beam] branch master updated (428ec97e30c -> f5020e7ac2b)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 428ec97e30c improve error message for mismatched pipelines (#24834) add 7ad44c84585 Handle schema updates in Storage API writes. new f5020e7ac2b Merge pull request #24145: Handle updates to table schema when using Storage API writes. The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 115 +-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +- .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 8 + .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++ .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 29 --- .../sdk/io/gcp/bigquery/SplittingIterable.java | 61 +- .../bigquery/StorageApiDynamicDestinations.java| 2 + .../StorageApiDynamicDestinationsBeamRow.java | 8 +- .../StorageApiDynamicDestinationsTableRow.java | 24 ++- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 20 +- .../io/gcp/bigquery/StorageApiWritePayload.java| 25 ++- .../StorageApiWriteRecordsInconsistent.java| 12 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 146 +++--- .../bigquery/StorageApiWritesShardedRecords.java | 173 +++-- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 110 +-- .../sdk/io/gcp/testing/FakeDatasetService.java | 63 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 216 + .../bigquery/TableRowToStorageApiProtoTest.java| 95 - 18 files changed, 958 insertions(+), 178 deletions(-)
[beam] branch master updated (1c4e4241c2a -> 876ce5bcafc)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 1c4e4241c2a [Playground][Frontend] Run timer fix; CodeRunner and OutputType extraction. (#24871) add 876ce5bcafc Merge pull request #25073: Improves StorageWrite API error logging No new revisions were added by this update. Summary of changes: .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-)
[beam] branch master updated (f40057b7fb6 -> 482c3107674)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from f40057b7fb6 Bump json5 from 1.0.1 to 1.0.2 in /sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel (#24895) add e22afb090fc Properly synchronize pins add 6847255514d typo new 482c3107674 Merge pull request #25047: Properly synchronize pins The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-)
[beam] 01/01: Merge pull request #25047: Properly synchronize pins
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 482c3107674749b8362130ac9df331c5f9062f99 Merge: f40057b7fb6 6847255514d Author: Reuven Lax AuthorDate: Tue Jan 17 17:54:51 2023 -0800 Merge pull request #25047: Properly synchronize pins .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-)
[beam] 01/01: Merge pull request #25044: Add public qualifier to FillGaps methods
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 5a2a3a4dfb26b7e8a7f2b132e1913fb4504216b2 Merge: 3dcb08510fd 120558cb9f7 Author: Reuven Lax AuthorDate: Tue Jan 17 12:08:51 2023 -0800 Merge pull request #25044: Add public qualifier to FillGaps methods .../java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java | 8 1 file changed, 4 insertions(+), 4 deletions(-)
[beam] branch master updated (3dcb08510fd -> 5a2a3a4dfb2)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 3dcb08510fd Merge pull request #24951: Update website metadata to 2.44.0 and blog post add 120558cb9f7 add public qualifier new 5a2a3a4dfb2 Merge pull request #25044: Add public qualifier to FillGaps methods The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java | 8 1 file changed, 4 insertions(+), 4 deletions(-)
[beam] branch master updated (92223e3ea2f -> c0e689331c2)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 92223e3ea2f [WebSite] Add new Go quickstart (#24885) add 5a57d3f75ce fix pinning bug in storage-api writes new c0e689331c2 Merge pull request #24968: Fix pinning bug in storage-api writes The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 19 ++- .../gcp/bigquery/StorageApiWritesShardedRecords.java | 13 +++-- 2 files changed, 25 insertions(+), 7 deletions(-)
[beam] 01/01: Merge pull request #24968: Fix pinning bug in storage-api writes
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit c0e689331c2a6573ecf267b9bef133a85ea8a36c Merge: 92223e3ea2f 5a57d3f75ce Author: Reuven Lax AuthorDate: Tue Jan 10 16:00:21 2023 -0800 Merge pull request #24968: Fix pinning bug in storage-api writes .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 19 ++- .../gcp/bigquery/StorageApiWritesShardedRecords.java | 13 +++-- 2 files changed, 25 insertions(+), 7 deletions(-)
[beam] 01/01: Merge pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit c162f4c13de69d4c62eb017169b104b57d4e212a Merge: f9d5de34ae1 de0cf60eb28 Author: Reuven Lax AuthorDate: Fri Jan 6 22:25:04 2023 -0800 Merge pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default .../options/StreamingDataflowWorkerOptions.java| 9 +++ .../worker/windmill/DirectStreamObserver.java | 77 ++ .../worker/windmill/GrpcWindmillServer.java| 3 +- .../worker/windmill/StreamObserverFactory.java | 12 ++-- 4 files changed, 68 insertions(+), 33 deletions(-)
[beam] branch master updated (f9d5de34ae1 -> c162f4c13de)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from f9d5de34ae1 update Java version from 11 to 17 (#24839) add de0cf60eb28 Modify windmill DirectStreamObserver to call isReady only every 10 messages by default. This provides more output buffering which ensures that output is not throttled on synchronization when message sizes exceed 32KB grpc isready limit. new c162f4c13de Merge pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../options/StreamingDataflowWorkerOptions.java| 9 +++ .../worker/windmill/DirectStreamObserver.java | 77 ++ .../worker/windmill/GrpcWindmillServer.java| 3 +- .../worker/windmill/StreamObserverFactory.java | 12 ++-- 4 files changed, 68 insertions(+), 33 deletions(-)
[beam] branch master updated (6ad75405c97 -> e54871d76f0)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 6ad75405c97 TFT Criteo benchmarks (#24382) add db1f9d23b5f Add thread prefix for StreamingDataflowWorker work executor new e54871d76f0 Merge pull request #24646: Add thread prefix for StreamingDataflowWorker work executor The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +--- 1 file changed, 5 insertions(+), 7 deletions(-)
[beam] 01/01: Merge pull request #24646: Add thread prefix for StreamingDataflowWorker work executor
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit e54871d76f09292f8a61a2639b10568fd901f5b5 Merge: 6ad75405c97 db1f9d23b5f Author: Reuven Lax AuthorDate: Fri Jan 6 11:39:04 2023 -0800 Merge pull request #24646: Add thread prefix for StreamingDataflowWorker work executor .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +--- 1 file changed, 5 insertions(+), 7 deletions(-)
[beam] 01/01: Merge pull request #24546: Fix null pointer exception caused by clearing member variable
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit ed29ceb90fa384d8daf0f9604643c031539b5fc2 Merge: 8ec0568d12d 40aca90f8d0 Author: Reuven Lax AuthorDate: Tue Dec 6 11:09:16 2022 -0800 Merge pull request #24546: Fix null pointer exception caused by clearing member variable .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 12 ++--- .../bigquery/StorageApiWritesShardedRecords.java | 62 +- 2 files changed, 41 insertions(+), 33 deletions(-)
[beam] branch master updated (8ec0568d12d -> ed29ceb90fa)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 8ec0568d12d Disallow using the JRH with Python streaming pipelines (#24513) add 40aca90f8d0 fix null pointer exception caused by clearing member variable new ed29ceb90fa Merge pull request #24546: Fix null pointer exception caused by clearing member variable The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 12 ++--- .../bigquery/StorageApiWritesShardedRecords.java | 62 +- 2 files changed, 41 insertions(+), 33 deletions(-)
[beam] branch master updated (32673a9d553 -> d979d88741a)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 32673a9d553 Bump cloud.google.com/go/profiler from 0.3.0 to 0.3.1 in /sdks (#24498) add 1743dc59073 fix exception new d979d88741a Merge pull request #24512: Fix exception caused by null StreamAppendClient The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../bigquery/StorageApiWriteUnshardedRecords.java | 25 +- 1 file changed, 10 insertions(+), 15 deletions(-)
[beam] 01/01: Merge pull request #24512: Fix exception caused by null StreamAppendClient
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit d979d88741abf4a4a6b79b2240460bd2969f465a Merge: 32673a9d553 1743dc59073 Author: Reuven Lax AuthorDate: Fri Dec 2 21:16:09 2022 -0800 Merge pull request #24512: Fix exception caused by null StreamAppendClient .../bigquery/StorageApiWriteUnshardedRecords.java | 25 +- 1 file changed, 10 insertions(+), 15 deletions(-)
[beam] branch master updated: Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5bb13fa35b9 Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395 5bb13fa35b9 is described below commit 5bb13fa35b9bc36764895c57f23d3890f0f1b567 Author: Reuven Lax AuthorDate: Mon Nov 28 20:00:52 2022 -0800 Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395 --- .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 68 + .../io/gcp/bigquery/BeamRowToStorageApiProto.java | 144 - .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 6 - .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 2 +- .../sdk/io/gcp/bigquery/SplittingIterable.java | 31 +- .../bigquery/StorageApiDynamicDestinations.java| 21 +- .../StorageApiDynamicDestinationsBeamRow.java | 50 ++- .../StorageApiDynamicDestinationsTableRow.java | 201 +--- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 12 +- .../io/gcp/bigquery/StorageApiWritePayload.java| 5 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 155 +- .../bigquery/StorageApiWritesShardedRecords.java | 214 +++-- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 337 - .../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 28 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 273 - .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 26 +- .../bigquery/TableRowToStorageApiProtoTest.java| 82 - 18 files changed, 849 insertions(+), 818 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java new file mode 100644 index 000..1680ef48e4d --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @ -0,0 +1,68 @ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.protobuf.Descriptors; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** + * Container class used by {@link StorageApiWritesShardedRecords} and {@link + * StorageApiWriteUnshardedRecords} to encapsulate a destination {@link TableSchema} along with a + * {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
+ */ +class AppendClientInfo { + @Nullable BigQueryServices.StreamAppendClient streamAppendClient; + @Nullable TableSchema tableSchema; + Consumer closeAppendClient; + Descriptors.Descriptor descriptor; + + public AppendClientInfo( + TableSchema tableSchema, Consumer closeAppendClient) + throws Exception { +this.tableSchema = tableSchema; +this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true); +this.closeAppendClient = closeAppendClient; + } + + public AppendClientInfo clearAppendClient() { +if (streamAppendClient != null) { + closeAppendClient.accept(streamAppendClient); + this.streamAppendClient = null; +} +return this; + } + + public AppendClientInfo createAppendClient( + BigQueryServices.DatasetService datasetService, + Supplier getStreamName, + boolean useConnectionPool) + throws Exception { +if (streamAppendClient == null) { + this.streamAppendClient = + datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool); +} +return this; + } + + public void close() { +clearAppendClient(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/googl
[beam] branch master updated: Merge pull request #24320: update bom to the latest one
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ec6d163cf36 Merge pull request #24320: update bom to the latest one ec6d163cf36 is described below commit ec6d163cf36a5d9951764b951e8fccd7b298aa2d Author: Reuven Lax AuthorDate: Mon Nov 28 18:42:49 2022 -0800 Merge pull request #24320: update bom to the latest one --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 12 ++-- sdks/java/container/license_scripts/dep_urls_java.yaml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index abb32698ddb..107fbefd164 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -466,15 +466,15 @@ class BeamModulePlugin implements Plugin { def dbcp2_version = "2.8.0" def errorprone_version = "2.10.0" // Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom -def gax_version = "2.19.2" +def gax_version = "2.19.5" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.6" // Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom -def google_cloud_spanner_version = "6.31.2" +def google_cloud_spanner_version = "6.33.0" def google_code_gson_version = "2.9.1" def google_oauth_clients_version = "1.34.1" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom -def grpc_version = "1.49.2" +def grpc_version = "1.50.2" def guava_version = "31.1-jre" def hadoop_version = "2.10.2" def hamcrest_version = "2.1" @@ -490,7 +490,7 @@ class BeamModulePlugin 
implements Plugin { def postgres_version = "42.2.16" def powermock_version = "2.0.9" // Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom -def protobuf_version = "3.21.7" +def protobuf_version = "3.21.9" def quickcheck_version = "1.0" def sbe_tool_version = "1.25.1" def singlestore_jdbc_version = "1.1.4" @@ -607,9 +607,9 @@ class BeamModulePlugin implements Plugin { google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version // The GCP Libraries BOM dashboard shows the versions set by the BOM: -// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.1.3/artifact_details.html +// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.1.5/artifact_details.html // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml -google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.1.3", +google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.1.5", google_cloud_spanner: "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_code_gson: "com.google.code.gson:gson:$google_code_gson_version", diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 54ee423..0fcf580320a 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -42,7 +42,7 @@ jaxen: '1.1.6': type: "3-Clause BSD" libraries-bom: - '26.1.3': + '26.1.5': license: 
"https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"; type: "Apache License 2.0" paranamer:
[beam] 01/01: Merge pull request #24215: Fix OrderedListState for Dataflow Streaming pipelines on SE.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit a04682a71a957ec35f876b345caa5dd37cd52ab5 Merge: 36b79851359 10a9dc22318 Author: Reuven Lax AuthorDate: Sat Nov 19 21:59:24 2022 -0800 Merge pull request #24215: Fix OrderedListState for Dataflow Streaming pipelines on SE. .../beam/runners/dataflow/worker/WindmillStateInternals.java | 11 --- .../runners/dataflow/worker/WindmillStateInternalsTest.java | 10 +- 2 files changed, 13 insertions(+), 8 deletions(-)
[beam] branch master updated (36b79851359 -> a04682a71a9)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 36b79851359 Merge pull request #24270: Update Java Multi-lang quickstart after the Beam 2.43.0 release add 10a9dc22318 Fix OrderedListState for Dataflow Streaming pipelines on SE. The id field is unsigned so setting LONG.MIN resulted in a larger id than supported. new a04682a71a9 Merge pull request #24215: Fix OrderedListState for Dataflow Streaming pipelines on SE. The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/runners/dataflow/worker/WindmillStateInternals.java | 11 --- .../runners/dataflow/worker/WindmillStateInternalsTest.java | 10 +- 2 files changed, 13 insertions(+), 8 deletions(-)
[beam] branch master updated (9c83de646ab -> 48c70cc3074)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 9c83de646ab Add more tests for S3 filesystem (#24138) add 48c70cc3074 Merge pull request #2: Track time on Cloud Dataflow streaming data reads and export via heartbeats No new revisions were added by this update. Summary of changes: .../dataflow/worker/StreamingDataflowWorker.java | 237 ++-- .../dataflow/worker/WindmillStateReader.java | 25 +- .../dataflow/worker/FakeWindmillServer.java| 122 ++- .../worker/StreamingDataflowWorkerTest.java| 1147 ++-- 4 files changed, 1100 insertions(+), 431 deletions(-)
[beam] branch master updated: Merge pull request #23556: Forward failed storage-api row inserts to the failedStorageApiInserts PCollection addresses #23628
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8df6f67c65b Merge pull request #23556: Forward failed storage-api row inserts to the failedStorageApiInserts PCollection addresses #23628 8df6f67c65b is described below commit 8df6f67c65b4888c45c31e088fb463972c4ec76b Author: Reuven Lax AuthorDate: Sat Oct 22 10:37:18 2022 -0700 Merge pull request #23556: Forward failed storage-api row inserts to the failedStorageApiInserts PCollection addresses #23628 --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 6 + .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 100 +++--- .../StorageApiWriteRecordsInconsistent.java| 50 +-- .../bigquery/StorageApiWriteUnshardedRecords.java | 277 + .../bigquery/StorageApiWritesShardedRecords.java | 342 ++--- .../beam/sdk/io/gcp/testing/BigqueryClient.java| 4 +- .../sdk/io/gcp/testing/FakeDatasetService.java | 32 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 21 +- .../io/gcp/bigquery/BigQueryNestedRecordsIT.java | 5 +- .../gcp/bigquery/StorageApiSinkFailedRowsIT.java | 266 .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 8 +- 12 files changed, 865 insertions(+), 248 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 1f1fe4589ff..7f6ac755d6b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -603,7 +603,7 @@ class BeamModulePlugin implements Plugin { google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // 
google_cloud_platform_libraries_bom sets version // The GCP Libraries BOM dashboard shows the versions set by the BOM: -// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/25.2.0/artifact_details.html +// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.1.3/artifact_details.html // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.1.3", google_cloud_spanner: "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 953d1237d9c..53cb2713641 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -150,4 +150,10 @@ public interface BigQueryOptions Integer getStorageApiAppendThresholdRecordCount(); void setStorageApiAppendThresholdRecordCount(Integer value); + + @Description("Maximum request size allowed by the storage write API. 
") + @Default.Long(10 * 1000 * 1000) + Long getStorageWriteApiMaxRequestSize(); + + void setStorageWriteApiMaxRequestSize(Long value); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index e48b9a19690..20ab251c9c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -32,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.values.KV; impo
[beam] branch master updated: Merge pull request #23795: Revert 23234: issue #23794
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new d38f577624e Merge pull request #23795: Revert 23234: issue #23794 d38f577624e is described below commit d38f577624eaf8b5f4e31fec43ca8cfba132a132 Author: Reuven Lax AuthorDate: Sat Oct 22 00:09:20 2022 -0700 Merge pull request #23795: Revert 23234: issue #23794 --- .../apache/beam/sdk/options/ExecutorOptions.java | 59 ++ .../sdk/extensions/gcp/options/GcsOptions.java | 29 +-- 2 files changed, 71 insertions(+), 17 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java new file mode 100644 index 000..2037d217422 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; + +/** + * Options for configuring the {@link ScheduledExecutorService} used throughout the Java runtime. + */ +public interface ExecutorOptions extends PipelineOptions { + + /** + * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to + * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If + * unset, the default is to create an {@link UnboundedScheduledExecutorService}. + */ + @JsonIgnore + @Description( + "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " + + "the default is to create an UnboundedScheduledExecutorService.") + @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) + @Hidden + ScheduledExecutorService getScheduledExecutorService(); + + void setScheduledExecutorService(ScheduledExecutorService value); + + /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ + class ScheduledExecutorServiceFactory implements DefaultValueFactory { +@Override +public ScheduledExecutorService create(PipelineOptions options) { + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. 
+ */ + return new UnboundedScheduledExecutorService(); +} + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 0b14b244da5..fea7be7f5c7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.InstanceBuilder; -import or
[beam] branch master updated: Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 01da3fcb3e3 Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793) 01da3fcb3e3 is described below commit 01da3fcb3e312ea0b5a62bd67b1b221074105a70 Author: Reuven Lax AuthorDate: Fri Oct 21 19:40:56 2022 -0700 Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793) This reverts commit 8e2431c0e55237af4bd00a9786e4c150e20d4e14. --- .../apache/beam/sdk/options/ExecutorOptions.java | 59 -- .../sdk/extensions/gcp/options/GcsOptions.java | 29 ++- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 27 -- 3 files changed, 17 insertions(+), 98 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java deleted file mode 100644 index 2037d217422..000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import java.util.concurrent.ScheduledExecutorService; -import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; - -/** - * Options for configuring the {@link ScheduledExecutorService} used throughout the Java runtime. - */ -public interface ExecutorOptions extends PipelineOptions { - - /** - * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to - * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If - * unset, the default is to create an {@link UnboundedScheduledExecutorService}. - */ - @JsonIgnore - @Description( - "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " - + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an UnboundedScheduledExecutorService.") - @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) - @Hidden - ScheduledExecutorService getScheduledExecutorService(); - - void setScheduledExecutorService(ScheduledExecutorService value); - - /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ - class ScheduledExecutorServiceFactory implements DefaultValueFactory { -@Override -public ScheduledExecutorService create(PipelineOptions options) { - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. 
- */ - return new UnboundedScheduledExecutorService(); -} - } -} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index fea7be7f5c7..0b14b244da5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.
[beam] branch master updated: Merge pull request #23510: Vortex multiplexing streams
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 171d3e3d9d7 Merge pull request #23510: Vortex multiplexing streams 171d3e3d9d7 is described below commit 171d3e3d9d700aa90c498719a0a925916c1cb56e Author: Reuven Lax AuthorDate: Tue Oct 18 21:12:15 2022 -0700 Merge pull request #23510: Vortex multiplexing streams --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 6 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 5 +++ .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 4 +-- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 5 +-- .../bigquery/StorageApiWriteUnshardedRecords.java | 36 -- .../bigquery/StorageApiWritesShardedRecords.java | 4 +-- .../sdk/io/gcp/testing/FakeDatasetService.java | 3 +- 7 files changed, 43 insertions(+), 20 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index a81dd6b9055..26a14e86012 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -576,12 +576,12 @@ class BeamModulePlugin implements Plugin { google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version -google_api_services_bigquery: "com.google.apis:google-api-services-bigquery:v2-rev20220827-$google_clients_version", +google_api_services_bigquery: "com.google.apis:google-api-services-bigquery:v2-rev20220924-$google_clients_version", google_api_services_clouddebugger : 
"com.google.apis:google-api-services-clouddebugger:v2-rev20220318-$google_clients_version", google_api_services_cloudresourcemanager: "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20220828-$google_clients_version", -google_api_services_dataflow: "com.google.apis:google-api-services-dataflow:v1b3-rev20220812-$google_clients_version", +google_api_services_dataflow: "com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20220818-$google_clients_version", -google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220829-$google_clients_version", +google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20220705-$google_clients_version", google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index cfebb754216..953d1237d9c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -109,6 +109,11 @@ public interface BigQueryOptions void setNumStorageWriteApiStreamAppendClients(Integer value); + @Default.Boolean(false) + Boolean getUseStorageApiConnectionPool(); + + void setUseStorageApiConnectionPool(Boolean value); + @Description( "If set, then BigQueryIO.Write will 
default to triggering the Storage Write API writes this often.") Integer getStorageWriteApiTriggeringFrequencySec(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index a0484ccf972..adf02ed31a6 100644 --- a/sdks/java/io/googl
[beam] branch master updated: Merge pull request #23547: update bom to the latest one.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b94cff209cc Merge pull request #23547: update bom to the latest one. b94cff209cc is described below commit b94cff209cc8d1ae61cc916ff6b0b68561dc34c8 Author: Reuven Lax AuthorDate: Sat Oct 8 08:19:56 2022 -0700 Merge pull request #23547: update bom to the latest one. --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +- sdks/java/container/license_scripts/dep_urls_java.yaml | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index f27888fb441..8541cf75ae6 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -464,15 +464,15 @@ class BeamModulePlugin implements Plugin { def classgraph_version = "4.8.104" def errorprone_version = "2.10.0" // Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom -def gax_version = "2.18.7" +def gax_version = "2.19.2" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.8" // Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom -def google_cloud_spanner_version = "6.29.0" +def google_cloud_spanner_version = "6.31.2" def google_code_gson_version = "2.9.1" def google_oauth_clients_version = "1.34.1" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom -def grpc_version = "1.48.0" +def grpc_version = "1.49.2" def guava_version = "31.1-jre" def hadoop_version = "2.10.2" def hamcrest_version = "2.1" @@ -488,7 +488,7 @@ class 
BeamModulePlugin implements Plugin { def postgres_version = "42.2.16" def powermock_version = "2.0.9" // Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom -def protobuf_version = "3.21.4" +def protobuf_version = "3.21.7" def quickcheck_version = "1.0" def sbe_tool_version = "1.25.1" def slf4j_version = "1.7.30" @@ -600,7 +600,7 @@ class BeamModulePlugin implements Plugin { // The GCP Libraries BOM dashboard shows the versions set by the BOM: // https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/25.2.0/artifact_details.html // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml -google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.1.1", +google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.1.3", google_cloud_spanner: "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_code_gson: "com.google.code.gson:gson:$google_code_gson_version", diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 0765594e154..54ee423 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -42,7 +42,7 @@ jaxen: '1.1.6': type: "3-Clause BSD" libraries-bom: - '26.1.1': + '26.1.3': license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"; type: "Apache License 2.0" paranamer:
[beam] branch master updated: Merge pull request #23505: opt in for schema update. addresses #23504
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 7980cb9a35c Merge pull request #23505: opt in for schema update. addresses #23504 7980cb9a35c is described below commit 7980cb9a35cdb37ffd57a43ec9a1bfa5e204f5d7 Author: Reuven Lax AuthorDate: Wed Oct 5 14:03:12 2022 -0700 Merge pull request #23505: opt in for schema update. addresses #23504 --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 25 +- .../StorageApiDynamicDestinationsTableRow.java | 7 -- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 1 + 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3a6280c3038..024dcb053b5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1804,6 +1804,7 @@ public class BigQueryIO { .setUseBeamSchema(false) .setAutoSharding(false) .setPropagateSuccessful(true) +.setAutoSchemaUpdate(false) .setDeterministicRecordIdFn(null) .build(); } @@ -1947,6 +1948,8 @@ public class BigQueryIO { abstract Boolean getPropagateSuccessful(); +abstract Boolean getAutoSchemaUpdate(); + @Experimental abstract @Nullable SerializableFunction getDeterministicRecordIdFn(); @@ -2038,6 +2041,8 @@ public class BigQueryIO { abstract Builder setPropagateSuccessful(Boolean propagateSuccessful); + abstract Builder setAutoSchemaUpdate(Boolean autoSchemaUpdate); + @Experimental abstract Builder setDeterministicRecordIdFn( SerializableFunction toUniqueIdFunction); @@ -2555,6 +2560,17 @@ public class BigQueryIO { } /** + * If true, 
enables automatically detecting BigQuery table schema updates. If a message with + unknown fields is processed, the BigQuery table is queried to see if the schema has been + updated. This is intended for scenarios in which unknown fields are rare, otherwise calls to + BigQuery will throttle the pipeline. only supported when using one of the STORAGE_API insert + methods. + */ +public Write withAutoSchemaUpdate(boolean autoSchemaUpdate) { + return toBuilder().setAutoSchemaUpdate(autoSchemaUpdate).build(); +} + +/* * Provides a function which can serve as a source of deterministic unique ids for each record * to be written, replacing the unique ids generated with the default scheme. When used with * {@link Method#STREAMING_INSERTS} This also elides the re-shuffle from the BigQueryIO Write by @@ -2750,6 +2766,12 @@ public class BigQueryIO { method); } + if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) { +checkArgument( +!getAutoSchemaUpdate(), +"withAutoSchemaUpdate only supported when using storage-api writes."); + } + if (method != Write.Method.FILE_LOADS) { // we only support writing avro for FILE_LOADS checkArgument( @@ -3045,7 +3067,8 @@ public class BigQueryIO { tableRowWriterFactory.getToRowFn(), getCreateDisposition(), getIgnoreUnknownValues(), - bqOptions.getSchemaUpdateRetries()); + bqOptions.getSchemaUpdateRetries(), + getAutoSchemaUpdate()); } StorageApiLoads storageApiLoads = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index b33b220de5a..b025d01f02b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -40,6 +40,7 @@ public class StorageApiDynamicDestinationsTableRow formatFunction, CreateDisposition createDisposition, boolean ignoreUnknownValues, - int schemaUpdateRetries) { + int schemaUpdateRetries, + boolean autoSchemaUpdates) { super(inner); this.formatFunction = formatFunction; this.createDisposition = createDisposition; this.ignoreUnknownValues = ignoreUnknownValues; this.schemaUpdateRetries = s
[beam] branch master updated: Merge pull request #22347: [22188]Set allowed timestamp skew
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new afb7dc93221 Merge pull request #22347: [22188]Set allowed timestamp skew afb7dc93221 is described below commit afb7dc93221c1988af03fb01b2db267240a9dd4e Author: Reuven Lax AuthorDate: Thu Aug 4 10:32:20 2022 -0700 Merge pull request #22347: [22188]Set allowed timestamp skew --- .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java| 10 ++ 1 file changed, 10 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 1ccd527497b..afecc966955 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -553,6 +553,11 @@ public class StorageApiWritesShardedRecords> o, BoundedWindow window) { // Stream is idle - clear it. + // Note: this is best effort. We are explicitly emitting a timestamp that is before + // the default output timestamp, which means that in some cases (usually when draining + // a pipeline) this finalize element will be dropped as late. This is usually ok as + // BigQuery will eventually garbage collect the stream. We attempt to finalize idle streams + // merely to remove the pressure of large numbers of orphaned streams from BigQuery. finalizeStream(streamName, streamOffset, o, window.maxTimestamp()); streamsIdle.inc(); } @@ -567,5 +572,10 @@ public class StorageApiWritesShardedRecords
[beam] 01/01: Merge pull request #22461: Fixes #22438. Ensure that WindmillStateReader completes all batched read futures on exceptions
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit c581b4dc6bd33360af890c9342e5239b8fcc7561 Merge: 505b5b723b0 9e7cc7cfd76 Author: Reuven Lax AuthorDate: Wed Jul 27 16:09:35 2022 -0700 Merge pull request #22461: Fixes #22438. Ensure that WindmillStateReader completes all batched read futures on exceptions .../dataflow/worker/WindmillStateReader.java | 109 -- .../dataflow/worker/WindmillStateReaderTest.java | 161 - 2 files changed, 222 insertions(+), 48 deletions(-)
[beam] branch master updated (505b5b723b0 -> c581b4dc6bd)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 505b5b723b0 Restrict google-api-core and regenerate container dependencies. #22481 add 9e7cc7cfd76 Fixes #22438. Ensure that WindmillStateReader completes all batched read futures on exceptions. Also improve decoder exceptions to only affect the corresponding state not the rest of batched reads. new c581b4dc6bd Merge pull request #22461: Fixes #22438. Ensure that WindmillStateReader completes all batched read futures on exceptions The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../dataflow/worker/WindmillStateReader.java | 109 -- .../dataflow/worker/WindmillStateReaderTest.java | 161 - 2 files changed, 222 insertions(+), 48 deletions(-)
[beam] branch master updated: Merge pull request #15786: Add gap-filling transform for timeseries
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b78a080e932 Merge pull request #15786: Add gap-filling transform for timeseries b78a080e932 is described below commit b78a080e93282c83b56a788459090530069033f2 Author: Reuven Lax AuthorDate: Wed Jul 13 13:11:34 2022 -0700 Merge pull request #15786: Add gap-filling transform for timeseries --- .github/autolabeler.yml| 1 + build.gradle.kts | 1 + .../org/apache/beam/sdk/coders/SortedMapCoder.java | 197 .../beam/sdk/schemas/transforms/WithKeys.java | 79 +++ sdks/java/extensions/timeseries/build.gradle | 32 ++ .../beam/sdk/extensions/timeseries/FillGaps.java | 535 + .../sdk/extensions/timeseries/package-info.java| 20 + .../sdk/extensions/timeseries/FillGapsTest.java| 355 ++ settings.gradle.kts| 1 + 9 files changed, 1221 insertions(+) diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml index 715abeddecf..888c5d4f3a4 100644 --- a/.github/autolabeler.yml +++ b/.github/autolabeler.yml @@ -42,6 +42,7 @@ extensions: ["sdks/java/extensions/**/*", "runners/extensions-java/**/*"] "sketching": ["sdks/java/extensions/sketching/**/*"] "sorter": ["sdks/java/extensions/sorter/**/*"] "sql": ["sdks/java/extensions/sql/**/*"] +"timeseries": ["sdks/java/extensions/timeseries/*"] "zetasketch": ["sdks/java/extensions/zetasketch/**/*"] # IO diff --git a/build.gradle.kts b/build.gradle.kts index aa5f8949fa5..7ea18895f77 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -195,6 +195,7 @@ tasks.register("javaPreCommitPortabilityApi") { tasks.register("javaPostCommit") { dependsOn(":sdks:java:extensions:google-cloud-platform-core:postCommit") + dependsOn(":sdks:java:extensions:timeseries:postCommit") dependsOn(":sdks:java:extensions:zetasketch:postCommit") dependsOn(":sdks:java:extensions:ml:postCommit") } diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java new file mode 100644 index 000..8448c915654 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SortedMapCoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; + +/** + * A {@link Coder} for {@link SortedMap Maps} that encodes them according to provided coders for + * keys and values. 
+ * + * @param the type of the keys of the KVs being transcoded + * @param the type of the values of the KVs being transcoded + */ +public class SortedMapCoder, V> +extends StructuredCoder> { + /** Produces a MapCoder with the given keyCoder and valueCoder. */ + public static , V> SortedMapCoder of( + Coder keyCoder, Coder valueCoder) { +return new SortedMapCoder<>(keyCoder, valueCoder); + } + + public Coder getKeyCoder() { +return keyCoder; + } + + public Coder getValueCoder() { +return valueCoder; + } + + / + + private Coder keyCoder; + private Coder valueCoder; + + priv
[beam] branch master updated (c7e77f92248 -> 029a2e3d5aa)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from c7e77f92248 Adding VladMatyunin as collaborator (#22239) add d69b217f0a3 add new pubsub urn new 029a2e3d5aa Merge pull request #21966: add new pubsub urn The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto | 4 1 file changed, 4 insertions(+)
[beam] 01/01: Merge pull request #21966: add new pubsub urn
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 029a2e3d5aa8a1d71f855fab3bdecf46533d5d57 Merge: c7e77f92248 d69b217f0a3 Author: Reuven Lax AuthorDate: Tue Jul 12 10:25:26 2022 -0700 Merge pull request #21966: add new pubsub urn .../proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto | 4 1 file changed, 4 insertions(+)
[beam] branch master updated (ad25b8774b5 -> a23eca67718)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from ad25b8774b5 Merge pull request #22158 from Declarative theming, Remove duplicate PlaygroundState for embedded page, Do not re-create CodeController on language change (#22147) add 493b8866cbe set timestamp when outputting finalize element add a23eca67718 Merge pull request #22119: [22188] Set timestamp when outputting finalize element No new revisions were added by this update. Summary of changes: .../io/gcp/bigquery/StorageApiWritesShardedRecords.java | 17 +++-- 1 file changed, 11 insertions(+), 6 deletions(-)
[beam] branch master updated: Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow.
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ba01293ba31 Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow. ba01293ba31 is described below commit ba01293ba31fdac351f93521443dd91eef56a2ff Author: Reuven Lax AuthorDate: Tue Jun 14 21:32:12 2022 -0700 Merge pull request #21793: [21794 ] Fix output timestamp in Dataflow. --- .../dataflow/worker/WindmillTimerInternals.java| 55 -- .../worker/StreamingDataflowWorkerTest.java| 1 + .../worker/windmill/src/main/proto/windmill.proto | 2 + 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 658e74ee6c1..c1f89c679d5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -210,22 +210,31 @@ class WindmillTimerInternals implements TimerInternals { // Setting the timer. If it is a user timer, set a hold. // Only set a hold if it's needed and if the hold is before the end of the global window. 
-if (needsWatermarkHold(timerData) -&& timerData -.getOutputTimestamp() - .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1 { - // Setting a timer, clear any prior hold and set to the new value - outputBuilder - .addWatermarkHoldsBuilder() - .setTag(timerHoldTag(prefix, timerData)) - .setStateFamily(stateFamily) - .setReset(true) - .addTimestamps( - WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp())); +if (needsWatermarkHold(timerData)) { + if (timerData + .getOutputTimestamp() + .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1 { +// Setting a timer, clear any prior hold and set to the new value +outputBuilder +.addWatermarkHoldsBuilder() +.setTag(timerHoldTag(prefix, timerData)) +.setStateFamily(stateFamily) +.setReset(true) +.addTimestamps( + WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp())); + } else { +// Clear the hold in case a previous iteration of this timer set one. +outputBuilder +.addWatermarkHoldsBuilder() +.setTag(timerHoldTag(prefix, timerData)) +.setStateFamily(stateFamily) +.setReset(true); + } } } else { // Deleting a timer. If it is a user timer, clear the hold timer.clearTimestamp(); +timer.clearMetadataTimestamp(); // Clear the hold even if it's the end of the global window in order to maintain update // compatibility. if (needsWatermarkHold(timerData)) { @@ -276,6 +285,9 @@ class WindmillTimerInternals implements TimerInternals { builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); +// Store the output timestamp in the metadata timestamp. +builder.setMetadataTimestamp( + WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp())); return builder; } @@ -344,8 +356,10 @@ class WindmillTimerInternals implements TimerInternals { ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8() : ""; -// Parse the output timestamp. 
Not using '+' as a terminator because the output timestamp is the -// last segment in the tag and the timestamp encoding itself may contain '+'. +// For backwards compatibility, parse the output timestamp from the tag. Not using '+' as a +// terminator because the +// output timestamp is the last segment in the tag and the timestamp encoding itself may contain +// '+'. int outputTimestampStart = timerFamilyEnd + 1; int outputTimestampEnd = tag.size(); @@ -360,6 +374,8 @@ class WindmillTimerInternals implements TimerInternals { } catch (IOException e) { throw new RuntimeException(e); } +} else if (timer.hasMetadataTimestamp()) { + outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
[beam] branch master updated (6c6b5f74f23 -> b0d964c4309)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 6c6b5f74f23 Merge pull request #21736 from damccorm/users/damccorm/metrics add b0d964c4309 Merge pull request: [Beam-14528]: Add ISO time format support for Timestamp, Date, DateTime, Time field. No new revisions were added by this update. Summary of changes: .../io/gcp/bigquery/TableRowToStorageApiProto.java | 33 - .../bigquery/TableRowToStorageApiProtoTest.java| 86 +- 2 files changed, 113 insertions(+), 6 deletions(-)
[beam] branch master updated: Merge pull request #17779: [BEAM-14529] Add integer to float64 conversion support
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 871a98b97fb Merge pull request #17779: [BEAM-14529] Add integer to float64 conversion support 871a98b97fb is described below commit 871a98b97fb52169a245a8800b92586a0026a565 Author: Yiru Tang AuthorDate: Fri Jun 3 15:58:08 2022 -0700 Merge pull request #17779: [BEAM-14529] Add integer to float64 conversion support --- .../beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java | 2 +- .../beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java | 8 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 7c1aefd4b85..442bc57eb0d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -403,7 +403,7 @@ public class TableRowToStorageApiProto { case "FLOAT": if (value instanceof String) { return Double.valueOf((String) value); -} else if (value instanceof Double || value instanceof Float) { +} else if (value instanceof Number) { return ((Number) value).doubleValue(); } break; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index 459fed3a168..c2d6cf23984 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -457,7 +457,7 @@ public class TableRowToStorageApiProtoTest { new TableCell().setV("42"), new TableCell().setV("43"), new TableCell().setV("2.8168"), - new TableCell().setV("2.817"), + new TableCell().setV("2"), new TableCell().setV("true"), new TableCell().setV("true"), new TableCell().setV("1970-01-01T00:00:00.43Z"), @@ -476,7 +476,7 @@ public class TableRowToStorageApiProtoTest { .set("int64Value", "42") .set("intValue", "43") .set("float64Value", "2.8168") - .set("floatValue", "2.817") + .set("floatValue", "2") .set("boolValue", "true") .set("booleanValue", "true") .set("timestampValue", "1970-01-01T00:00:00.43Z") @@ -495,7 +495,7 @@ public class TableRowToStorageApiProtoTest { .put("int64value", (long) 42) .put("intvalue", (long) 43) .put("float64value", (double) 2.8168) - .put("floatvalue", (double) 2.817) + .put("floatvalue", (double) 2) .put("boolvalue", true) .put("booleanvalue", true) .put("timestampvalue", 43L) @@ -518,7 +518,7 @@ public class TableRowToStorageApiProtoTest { .put("int64value", (long) 42) .put("intvalue", (long) 43) .put("float64value", (double) 2.8168) - .put("floatvalue", (double) 2.817) + .put("floatvalue", (double) 2) .put("boolvalue", true) .put("booleanvalue", true) .put("timestampvalue", 43L)
[beam] 01/01: Merge pull request #17423: Handle invalid rows in the Storage Api sink
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 25039a8aea8ad549e8ba9fdbfcd67c9f8fc2f9bf Merge: 92b8dc75286 c4af119545c Author: Reuven Lax AuthorDate: Thu May 26 17:38:53 2022 -0700 Merge pull request #17423: Handle invalid rows in the Storage Api sink .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +- .../bigquery/BigQueryStorageApiInsertError.java| 56 ++ .../BigQueryStorageApiInsertErrorCoder.java| 53 + .../io/gcp/bigquery/StorageApiConvertMessages.java | 64 --- .../bigquery/StorageApiDynamicDestinations.java| 3 + .../StorageApiDynamicDestinationsBeamRow.java | 6 + .../StorageApiDynamicDestinationsTableRow.java | 6 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 124 +++-- .../sdk/io/gcp/bigquery/StreamingWriteTables.java | 2 + .../io/gcp/bigquery/TableRowToStorageApiProto.java | 107 ++ .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 1 + .../beam/sdk/io/gcp/bigquery/WriteResult.java | 103 +++-- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 94 +++- .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 1 - 14 files changed, 513 insertions(+), 111 deletions(-)
[beam] branch master updated (92b8dc75286 -> 25039a8aea8)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 92b8dc75286 [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747) add c4af119545c DLQ for BQ Storage Api writes new 25039a8aea8 Merge pull request #17423: Handle invalid rows in the Storage Api sink The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +- ...nfo.java => BigQueryStorageApiInsertError.java} | 39 +-- .../BigQueryStorageApiInsertErrorCoder.java} | 38 +++ .../io/gcp/bigquery/StorageApiConvertMessages.java | 64 --- .../bigquery/StorageApiDynamicDestinations.java| 3 + .../StorageApiDynamicDestinationsBeamRow.java | 6 + .../StorageApiDynamicDestinationsTableRow.java | 6 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 124 +++-- .../sdk/io/gcp/bigquery/StreamingWriteTables.java | 2 + .../io/gcp/bigquery/TableRowToStorageApiProto.java | 107 ++ .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 1 + .../beam/sdk/io/gcp/bigquery/WriteResult.java | 103 +++-- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 94 +++- .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 1 - 14 files changed, 454 insertions(+), 138 deletions(-) copy sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/{TableRowInfo.java => BigQueryStorageApiInsertError.java} (56%) copy sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{bigtable/BigtableWriteResultCoder.java => bigquery/BigQueryStorageApiInsertErrorCoder.java} (51%)
[beam] branch master updated: Merge pull request #17359: [BEAM-14303] Add a way to exclude output timestamp watermark holds
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fa01615e520 Merge pull request #17359: [BEAM-14303] Add a way to exclude output timestamp watermark holds fa01615e520 is described below commit fa01615e5207b53a7f73ff6a5ff55c336717c88f Author: Reuven Lax AuthorDate: Thu May 5 14:23:48 2022 -0700 Merge pull request #17359: [BEAM-14303] Add a way to exclude output timestamp watermark holds --- .../apache/beam/runners/core/SimpleDoFnRunner.java | 170 + .../beam/runners/core/SimpleDoFnRunnerTest.java| 10 +- .../dataflow/worker/WindmillTimerInternals.java| 10 +- .../main/java/org/apache/beam/sdk/state/Timer.java | 6 + .../apache/beam/sdk/transforms/Deduplicate.java| 8 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 50 ++ .../apache/beam/fn/harness/FnApiDoFnRunner.java| 65 +--- .../bigquery/StorageApiWritesShardedRecords.java | 11 +- 8 files changed, 186 insertions(+), 144 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index a73dd521f86..a7d67bf7526 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -246,6 +246,30 @@ public class SimpleDoFnRunner implements DoFnRunner= Integer.MAX_VALUE + ? 
fn.getAllowedTimestampSkew() + : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), + BoundedWindow.TIMESTAMP_MAX_VALUE)); +} + } + private void outputWindowedValue(TupleTag tag, WindowedValue windowedElem) { checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag); outputManager.output(tag, windowedElem); @@ -389,7 +413,7 @@ public class SimpleDoFnRunner implements DoFnRunner implements DoFnRunner void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); - checkTimestamp(timestamp); + checkTimestamp(elem.getTimestamp(), timestamp); outputWindowedValue( tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane())); } @@ -416,30 +440,6 @@ public class SimpleDoFnRunner implements DoFnRunner= Integer.MAX_VALUE -? fn.getAllowedTimestampSkew() -: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), -BoundedWindow.TIMESTAMP_MAX_VALUE)); - } -} - @Override public BoundedWindow window() { return Iterables.getOnlyElement(elem.getWindows()); @@ -834,18 +834,19 @@ public class SimpleDoFnRunner implements DoFnRunner void output(TupleTag tag, T output) { + checkTimestamp(timestamp(), timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(timestamp(), timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @@ -854,30 +855,6 @@ public class SimpleDoFnRunner implements DoFnRunner= Integer.MAX_VALUE -? 
fn.getAllowedTimestampSkew() -: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), -BoundedWindow.TIMESTAMP_MAX_VALUE)); - } -} } /** @@ -1064,17 +1041,19 @@ public class SimpleDoFnRunner implements DoFnRunner void output(TupleTag tag, T output) { + checkTimestamp(this.timestamp, timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(this.timestamp, timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @@ -1083,30 +1062,6 @@ public class SimpleDoFnRunner implements DoFnRunner= Integer.MAX_VALUE -? fn.getAllowedTimestampSkew() -: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), -BoundedWindow.TIMESTAMP_MAX_VALUE)); - } -} } private class TimerInternalsTimer implements Timer { @@ -1121,7 +1076,8 @@ public class SimpleDoFnRunner implements DoFnRunner implements DoFnRunner implements DoFnRunner implements DoFnRunner element, -
[beam] branch master updated: Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 52c3f07aa56 Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API 52c3f07aa56 is described below commit 52c3f07aa56de738db3130283716d38206de42b7 Author: Reuven Lax AuthorDate: Wed May 4 13:14:07 2022 -0700 Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API --- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 6 ++ .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 8 +++ .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 15 .../StorageApiWriteRecordsInconsistent.java| 7 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 84 ++ 5 files changed, 106 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 85437f267d0..a9beb5cbd7c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -123,4 +123,10 @@ public interface BigQueryOptions Integer getSchemaUpdateRetries(); void setSchemaUpdateRetries(Integer value); + + @Description("Maximum (best effort) size of a single append to the storage API.") + @Default.Integer(2 * 1024 * 1024) + Integer getStorageApiAppendThresholdBytes(); + + void setStorageApiAppendThresholdBytes(Integer value); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 8a7eff378c6..23fce8ba6ff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -206,6 +206,14 @@ public interface BigQueryServices extends Serializable { /** Append rows to a Storage API write stream at the given offset. */ ApiFuture appendRows(long offset, ProtoRows rows) throws Exception; +/** + * If the previous call to appendRows blocked due to flow control, returns how long the call + * blocked for. + */ +default long getInflightWaitSeconds() { + return 0; +} + /** * Pin this object. If close() is called before all pins are removed, the underlying resources * will not be freed until all pins are removed. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 85a53487f88..2949150c9ee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -33,6 +33,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.Bigquery.Tables; @@ -1223,9 +1224,18 @@ class BigQueryServicesImpl implements BigQueryServices { ProtoSchema protoSchema = 
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build(); + TransportChannelProvider transportChannelProvider = + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .setChannelsPerCpu(2) + .build(); + StreamWriter streamWriter = StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) + .setChannelProvider(transportChannelProvider) .setTraceId( "Dataflow:" + (bqIOMetadata.getBeamJobId() != null @@ -1275,6 +1285,11 @@ class BigQueryServicesImpl implements BigQueryServices { throws Exception { return streamWriter.append(rows, offset); }
[beam] branch master updated: Merge pull request #17404: [BEAM-13990] support date and timestamp fields
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 58b4d762eec Merge pull request #17404: [BEAM-13990] support date and timestamp fields 58b4d762eec is described below commit 58b4d762eece66774a5df6ca54e6f91c49057c9b Author: Reuven Lax AuthorDate: Fri Apr 29 22:02:56 2022 -0700 Merge pull request #17404: [BEAM-13990] support date and timestamp fields --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/io/google-cloud-platform/build.gradle| 2 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 + .../StorageApiDynamicDestinationsTableRow.java | 13 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 4 + .../bigquery/StorageApiWritesShardedRecords.java | 4 + .../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 8 +- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 339 - .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 8 - .../gcp/bigquery/TableRowToStorageApiProtoIT.java | 407 + .../bigquery/TableRowToStorageApiProtoTest.java| 140 --- .../beam/sdk/io/gcp/spanner/StructUtilsTest.java | 3 +- 12 files changed, 777 insertions(+), 154 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 5d5b1e9377f..0f0c11dc2cb 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -631,6 +631,7 @@ class BeamModulePlugin implements Plugin { jackson_dataformat_xml : "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson_version", jackson_dataformat_yaml : "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version", jackson_datatype_joda : "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version", 
+jackson_datatype_jsr310 : "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version", jackson_module_scala_2_11 : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version", jackson_module_scala_2_12 : "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version", // Swap to use the officially published version of 0.4.x once available diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 8d1d0a1776d..cd71fc35e21 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -116,6 +116,8 @@ dependencies { implementation library.java.http_core implementation library.java.jackson_core implementation library.java.jackson_databind + implementation library.java.jackson_datatype_joda + implementation library.java.jackson_datatype_jsr310 implementation library.java.joda_time implementation library.java.junit implementation library.java.netty_handler diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 54f485a9d71..2a3e595adc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3085,6 +3085,8 @@ public class BigQueryIO { CreateTables.clearCreatedTables(); TwoLevelMessageConverterCache.clear(); StorageApiDynamicDestinationsTableRow.clearSchemaCache(); +StorageApiWriteUnshardedRecords.clearCache(); +StorageApiWritesShardedRecords.clearCache(); } / diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index c48dbe0dedb..3c9f676cfad 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -70,6 +70,7 @@ public class StorageApiDynamicDestinationsTableRow DestinationT destination, DatasetService datasetService) throws Exception { re
[beam] branch master updated: Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 00320aa7793 Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness 00320aa7793 is described below commit 00320aa7793ed4322a3fd029864dd300ac8bec00 Author: Reuven Lax AuthorDate: Fri Apr 22 10:14:34 2022 -0700 Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness --- .../StorageApiDynamicDestinationsTableRow.java | 10 +- .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 42 ++ 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index 281a7adb529..c48dbe0dedb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,10 +75,10 @@ public class StorageApiDynamicDestinationsTableRow { tableSchema = 
getSchema(destination); +TableReference tableReference = getTable(destination).getTableReference(); if (tableSchema == null) { // If the table already exists, then try and fetch the schema from the existing // table. - TableReference tableReference = getTable(destination).getTableReference(); tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService); if (tableSchema == null) { if (createDisposition == CreateDisposition.CREATE_NEVER) { @@ -95,7 +96,14 @@ public class StorageApiDynamicDestinationsTableRow + "using a create disposition of CREATE_IF_NEEDED."); } } +} else { + // Make sure we register this schema with the cache, unless there's already a more + // up-to-date schema. + tableSchema = + MoreObjects.firstNonNull( + SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema); } + descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema); descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java index 83246b26de2..e1775af2289 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java @@ -178,6 +178,21 @@ public class TableSchemaCache { return schemaHolder.map(SchemaHolder::getTableSchema).orElse(null); } + /** + * Registers schema for a table if one is not already present. If a schema is already in the + * cache, returns the existing schema, otherwise returns null. 
+ */ + @Nullable + public TableSchema putSchemaIfAbsent(TableReference tableReference, TableSchema tableSchema) { +final String key = tableKey(tableReference); +Optional existing = +runUnderMonitor( +() -> +Optional.ofNullable( +this.cachedSchemas.putIfAbsent(key, SchemaHolder.of(tableSchema, 0; +return existing.map(SchemaHolder::getTableSchema).orElse(null); + } + public void refreshSchema(TableReference tableReference, DatasetService datasetService) { int targetVersion = runUnderMonitor( @@ -187,13 +202,11 @@ public class TableSchemaCache { "Cannot call refreshSchema after the object has been stopped!"); } String key = tableKey(tableReference); - SchemaHolder schemaHolder = cachedSchemas.get
[beam] 01/01: Merge pull request #17401: [BEAM-14326] mark static thread as a daemon thread
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 374566d80e96f6163ebeb4a893f6193d42ebec1d Merge: 1808a343b36 54d3b27a898 Author: Reuven Lax AuthorDate: Tue Apr 19 14:51:43 2022 -0700 Merge pull request #17401: [BEAM-14326] mark static thread as a daemon thread .../main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 1 + 1 file changed, 1 insertion(+)
[beam] branch master updated (1808a343b36 -> 374566d80e9)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from 1808a343b36 Populate actual dataflow job id to bigquery write trace id (#17130) add 54d3b27a898 mark static thread as a daemon thread new 374566d80e9 Merge pull request #17401: [BEAM-14326] mark static thread as a daemon thread The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 1 + 1 file changed, 1 insertion(+)
[beam] branch master updated: Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 4b709d5456b Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables 4b709d5456b is described below commit 4b709d5456b105ffcc251da7a0a4a0b560491b1c Author: Minbo Bae <49642083+baemi...@users.noreply.github.com> AuthorDate: Tue Apr 19 11:50:24 2022 +0900 Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables --- .../sdk/io/gcp/bigquery/BigQueryQueryHelper.java | 195 +++-- .../io/gcp/bigquery/BigQueryQuerySourceDef.java| 17 +- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java| 55 +++--- 3 files changed, 138 insertions(+), 129 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java index b78afc514ae..a42eea20052 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java @@ -96,105 +96,112 @@ class BigQueryQueryHelper { throws InterruptedException, IOException { // Step 1: Find the effective location of the query. 
String effectiveLocation = location; -DatasetService tableService = bqServices.getDatasetService(options); -if (effectiveLocation == null) { - List referencedTables = - dryRunQueryIfNeeded( - bqServices, - options, - dryRunJobStats, - query, - flattenResults, - useLegacySql, - location) - .getQuery() - .getReferencedTables(); - if (referencedTables != null && !referencedTables.isEmpty()) { -TableReference referencedTable = referencedTables.get(0); -effectiveLocation = -tableService -.getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId()) -.getLocation(); +try (DatasetService tableService = bqServices.getDatasetService(options)) { + if (effectiveLocation == null) { +List referencedTables = +dryRunQueryIfNeeded( +bqServices, +options, +dryRunJobStats, +query, +flattenResults, +useLegacySql, +location) +.getQuery() +.getReferencedTables(); +if (referencedTables != null && !referencedTables.isEmpty()) { + TableReference referencedTable = referencedTables.get(0); + effectiveLocation = + tableService + .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId()) + .getLocation(); +} } -} -// Step 2: Create a temporary dataset in the query location only if the user has not specified a -// temp dataset. -String queryJobId = -BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY); -Optional queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId); -TableReference queryResultTable = -createTempTableReference( -options.getBigQueryProject() == null -? 
options.getProject() -: options.getBigQueryProject(), -queryJobId, -queryTempDatasetOpt); - -boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent(); -// Create dataset only if it has not been set by the user -if (beamToCreateTempDataset) { - LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId()); - - tableService.createDataset( - queryResultTable.getProjectId(), - queryResultTable.getDatasetId(), - effectiveLocation, - "Temporary tables for query results of job " + options.getJobName(), - TimeUnit.DAYS.toMillis(1)); -} else { // If the user specified a temp dataset, check that the destination table does not - // exist - Table destTable = tableService.getTable(queryResultTable); - checkArgument( - destTable == null, - "Refusing to write on existing table {} in the specified temp dataset {}", - queryResultTable.getTableId(), - queryResultTable.getDatasetId()); -} + // Step 2: Create a temporary dataset in the query location only if the user has not specified + // a temp dataset. + String queryJobId =
[beam] branch master updated (fbf16e5412d -> 1dd98718ab5)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git from fbf16e5412d [BEAM-14287] Clean up staticcheck warnings in graph/coder (#17337) add 227fca623ad handle changing schemas in Storage API sink add 1dd98718ab5 Merge pull request #17070: Handle changing schemas in Storage API sink No new revisions were added by this update. Summary of changes: .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 13 +- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 6 + .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java| 29 ++ ...ateTables.java => CreateTableDestinations.java} | 101 --- .../sdk/io/gcp/bigquery/CreateTableHelpers.java| 10 +- .../sdk/io/gcp/bigquery/SplittingIterable.java | 37 ++- .../io/gcp/bigquery/StorageApiConvertMessages.java | 13 +- .../bigquery/StorageApiDynamicDestinations.java| 23 +- .../StorageApiDynamicDestinationsBeamRow.java | 27 +- .../StorageApiDynamicDestinationsTableRow.java | 162 --- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 73 +++-- .../io/gcp/bigquery/StorageApiWritePayload.java} | 16 +- .../StorageApiWriteRecordsInconsistent.java| 26 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 138 +- .../bigquery/StorageApiWritesShardedRecords.java | 58 ++-- .../io/gcp/bigquery/TableRowToStorageApiProto.java | 88 -- .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 296 + .../bigquery/TwoLevelMessageConverterCache.java| 4 + .../sdk/io/gcp/testing/FakeDatasetService.java | 80 -- .../beam/sdk/io/gcp/testing/FakeJobService.java| 18 +- .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java| 5 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 126 - .../bigquery/TableRowToStorageApiProtoTest.java| 11 +- 23 files changed, 1027 insertions(+), 333 deletions(-) copy sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/{CreateTables.java => CreateTableDestinations.java} (52%) copy 
sdks/java/io/{jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteResult.java => google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java} (66%) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java