reuvenlax commented on code in PR #24145:
URL: https://github.com/apache/beam/pull/24145#discussion_r1067486236
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java:
##########
@@ -28,35 +35,111 @@
* StorageApiWritesShardedRecords} to encapsulate a destination {@link
TableSchema} along with a
* {@link BigQueryServices.StreamAppendClient} and other objects needed to
write records.
*/
-class AppendClientInfo {
- @Nullable BigQueryServices.StreamAppendClient streamAppendClient;
- @Nullable TableSchema tableSchema;
- Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
- Descriptors.Descriptor descriptor;
+@AutoValue
+abstract class AppendClientInfo {
+ abstract @Nullable BigQueryServices.StreamAppendClient
getStreamAppendClient();
- public AppendClientInfo(
+ abstract TableSchema getTableSchema();
+
+ abstract Consumer<BigQueryServices.StreamAppendClient>
getCloseAppendClient();
+
+ abstract com.google.api.services.bigquery.model.TableSchema
getJsonTableSchema();
+
+ abstract TableRowToStorageApiProto.SchemaInformation getSchemaInformation();
+
+ abstract Descriptors.Descriptor getDescriptor();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setStreamAppendClient(@Nullable
BigQueryServices.StreamAppendClient value);
+
+ abstract Builder setTableSchema(TableSchema value);
+
+ abstract Builder
setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);
+
+ abstract Builder
setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);
+
+ abstract Builder
setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);
+
+ abstract Builder setDescriptor(Descriptors.Descriptor value);
+
+ abstract AppendClientInfo build();
+ };
+
+ abstract Builder toBuilder();
+
+ static AppendClientInfo of(
TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient>
closeAppendClient)
throws Exception {
- this.tableSchema = tableSchema;
- this.descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
- this.closeAppendClient = closeAppendClient;
+ return new AutoValue_AppendClientInfo.Builder()
+ .setTableSchema(tableSchema)
+ .setCloseAppendClient(closeAppendClient)
+
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
+ .setSchemaInformation(
+
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
+
.setDescriptor(TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema,
true))
+ .build();
}
- public AppendClientInfo createAppendClient(
+ public AppendClientInfo withNoAppendClient() {
+ return toBuilder().setStreamAppendClient(null).build();
+ }
+
+ public AppendClientInfo withAppendClient(
BigQueryServices.DatasetService datasetService,
Supplier<String> getStreamName,
boolean useConnectionPool)
throws Exception {
- if (streamAppendClient == null) {
- this.streamAppendClient =
- datasetService.getStreamAppendClient(getStreamName.get(),
descriptor, useConnectionPool);
+ if (getStreamAppendClient() != null) {
+ return this;
+ } else {
+ return toBuilder()
+ .setStreamAppendClient(
+ datasetService.getStreamAppendClient(
+ getStreamName.get(), getDescriptor(), useConnectionPool))
+ .build();
}
- return this;
}
public void close() {
- if (streamAppendClient != null) {
- closeAppendClient.accept(streamAppendClient);
+ BigQueryServices.StreamAppendClient client = getStreamAppendClient();
+ if (client != null) {
+ getCloseAppendClient().accept(client);
+ }
+ }
+
+ boolean hasSchemaChanged(TableSchema updatedTableSchema) {
+ return updatedTableSchema.hashCode() != getTableSchema().hashCode();
+ }
+
+ public ByteString encodeUnknownFields(TableRow unknown, boolean
ignoreUnknownValues)
Review Comment:
No - these are fields that are unknown to the prior step. They may actually
end up being known to the current step due to the updated schema.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]