reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3125235042


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +523,41 @@ public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
                   })
               .put(
                   TableFieldSchema.Type.STRING,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.JSON,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.GEOGRAPHY,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .build();
 
+  static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+  public static byte[] tableSchemaHash(TableSchema tableSchema) {
+    return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+  }
+
+  public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema> 
fields) {
+    List<HashCode> hashCodes = Lists.newArrayList();
+    for (TableFieldSchema tableFieldSchema : fields) {
+      String name =
+          prefix.isEmpty()
+              ? tableFieldSchema.getName()
+              : String.join(".", prefix, tableFieldSchema.getName());
+      hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), 
StandardCharsets.UTF_8));
+      hashCodes.add(
+          SCHEMA_HASH_FUNCTION.hashString(
+              tableFieldSchema.getType().toString().toLowerCase(), 
StandardCharsets.UTF_8));

Review Comment:
   Ah yes - I forgot that I was using the proto representation here :)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4233,13 +4250,18 @@ private <DestinationT> WriteResult continueExpandTyped(
               !getIgnoreUnknownValues(),
               "ignoreUnknownValues not supported when using writeProtos."
                   + " Try setting withDirectWriteProtos(false)");
+          checkArgument(!useSchemaUpdate);
+

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
  * with the Storage write API.
  */
 public class TableRowToStorageApiProto {
+
+  public static class ErrorCollector {
+    private final List<SchemaConversionException> exceptions = 
Lists.newArrayList();
+    private final Predicate<SchemaConversionException> shouldCollect;
+
+    public static final ErrorCollector DONT_COLLECT = new 
ErrorCollector(Predicates.alwaysFalse());
+
+    public ErrorCollector(Predicate<SchemaConversionException> shouldCollect) {
+      this.shouldCollect = shouldCollect;
+    }
+
+    // Returns true if the exception was collected.
+    void collect(SchemaConversionException exception) throws 
SchemaConversionException {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to