reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3121490027
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +519,38 @@ public interface ThrowingBiFunction<FirstInputT,
SecondInputT, OutputT> {
})
.put(
TableFieldSchema.Type.STRING,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.put(
TableFieldSchema.Type.JSON,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.put(
TableFieldSchema.Type.GEOGRAPHY,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.build();
+ static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+ public static byte[] tableSchemaHash(TableSchema tableSchema) {
+ return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+ }
+
+ public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema>
fields) {
Review Comment:
Done and done.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
* with the Storage write API.
*/
public class TableRowToStorageApiProto {
+
+ public static class ErrorCollector {
+ private final List<SchemaConversionException> exceptions =
Lists.newArrayList();
+ private final Predicate<SchemaConversionException> shouldCollect;
+
+ public static final ErrorCollector DONT_COLLECT = new
ErrorCollector(Predicates.alwaysFalse());
+
+ public ErrorCollector(Predicate<SchemaConversionException> shouldCollect) {
+ this.shouldCollect = shouldCollect;
+ }
+
+ // Returns true if the exception was collected.
+ void collect(SchemaConversionException exception) throws
SchemaConversionException {
Review Comment:
Done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method ==
Method.STORAGE_API_AT_LEAST_ONCE) {
+ boolean useSchemaUpdate =
+ getSchemaUpdateOptions() != null &&
!getSchemaUpdateOptions().isEmpty();
+ if (useSchemaUpdate) {
+ checkArgument(
+ !getAutoSchemaUpdate(),
+ "Schema update options are not supported when using auto schema
update");
+ checkArgument(!getIgnoreUnknownValues());
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method ==
Method.STORAGE_API_AT_LEAST_ONCE) {
+ boolean useSchemaUpdate =
+ getSchemaUpdateOptions() != null &&
!getSchemaUpdateOptions().isEmpty();
+ if (useSchemaUpdate) {
+ checkArgument(
+ !getAutoSchemaUpdate(),
+ "Schema update options are not supported when using auto schema
update");
+ checkArgument(!getIgnoreUnknownValues());
+ }
BigQueryOptions bqOptions =
input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT>
storageApiDynamicDestinations;
if (getUseBeamSchema()) {
+ checkArgument(
+ !useSchemaUpdate, "SchemaUpdateOptions are not supported when
using Beam schemas");
Review Comment:
Shouldn't be needed. This is inside the file-loads block of the if statement.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4233,13 +4250,18 @@ private <DestinationT> WriteResult continueExpandTyped(
!getIgnoreUnknownValues(),
"ignoreUnknownValues not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");
+ checkArgument(!useSchemaUpdate);
+
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]