[https://issues.apache.org/jira/browse/BEAM-13990?focusedWorklogId=749827&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-749827]

ASF GitHub Bot logged work on BEAM-13990:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Mar/22 03:37
            Start Date: 30/Mar/22 03:37
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #16926:
URL: https://github.com/apache/beam/pull/16926#discussion_r838077903



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -182,116 +180,164 @@ private static void fieldDescriptorFromTableField(
     descriptorBuilder.addField(fieldDescriptorBuilder.build());
   }
 
+  private static DynamicMessage messageFromMap(
+      StorageApiDynamicDestinationsTableRow.BqSchema bqSchema,
+      Descriptor descriptor,
+      AbstractMap<String, Object> map) {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+
+      @Nullable
+      FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase());
+      if (fieldDescriptor == null) {
+        throw new RuntimeException(
+            "TableRow contained unexpected field with name " + entry.getKey());
+      }
+      StorageApiDynamicDestinationsTableRow.BqSchema subBqSchema =
+          bqSchema.getSubFieldByName(entry.getKey());
+      @Nullable
+      Object value = messageValueFromFieldValue(subBqSchema, fieldDescriptor, entry.getValue());
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
   @Nullable
   private static Object messageValueFromFieldValue(
-      FieldDescriptor fieldDescriptor, Object bqValue) {
+      StorageApiDynamicDestinationsTableRow.BqSchema bqSchema,
+      FieldDescriptor fieldDescriptor,
+      Object bqValue) {
+    // handle null
     if (bqValue == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
-      } else if (fieldDescriptor.isRepeated()) {
+      } else if (fieldDescriptor.isRepeated()) { // repeated field cannot be null
         return Collections.emptyList();
-      }
-      {
+      } else {
         throw new IllegalArgumentException(
             "Received null value for non-nullable field " + fieldDescriptor.getName());
       }
     }
-    return toProtoValue(fieldDescriptor, bqValue, fieldDescriptor.isRepeated());
-  }
 
-  private static final Map<FieldDescriptor.Type, Function<String, Object>>
-      JSON_PROTO_STRING_PARSERS =
-          ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
-              .put(FieldDescriptor.Type.INT32, Integer::valueOf)
-              .put(FieldDescriptor.Type.INT64, Long::valueOf)
-              .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
-              .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
-              .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
-              .put(FieldDescriptor.Type.STRING, str -> str)
-              .put(
-                  FieldDescriptor.Type.BYTES,
-                  b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
-              .build();
-
-  @Nullable
-  @SuppressWarnings({"nullness"})
-  @VisibleForTesting
-  static Object toProtoValue(
-      FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated) {
-    if (isRepeated) {
-      return ((List<Object>) jsonBQValue)
-          .stream().map(v -> toProtoValue(fieldDescriptor, v, false)).collect(toList());
+    // handle repeated value
+    if (fieldDescriptor.isRepeated()) {
+      return ((List<Object>) bqValue)
+          .stream()
+              .filter(Objects::nonNull) // repeated field cannot contain null
+              .map(v -> scalarToProtoValue(bqSchema, fieldDescriptor, v))
+              .collect(toList());
     }
 
-    if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
-      if (jsonBQValue instanceof TableRow) {
-        TableRow tableRow = (TableRow) jsonBQValue;
-        return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
-      } else if (jsonBQValue instanceof AbstractMap) {
-        // This will handle nested rows.
-        AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
-        return messageFromMap(fieldDescriptor.getMessageType(), map);
-      } else {
-        throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
-      }
-    }
-    @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
-    if (scalarValue == null) {
-      return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
-    } else {
-      return scalarValue;
-    }
+    // handle scalar non nullable value
+    return scalarToProtoValue(bqSchema, fieldDescriptor, bqValue);
   }
 
   @VisibleForTesting
-  @Nullable
-  static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
-    if (jsonBQValue instanceof String) {
-      Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
-      if (mapper == null) {
-        throw new UnsupportedOperationException(
-            "Converting BigQuery type '"
-                + jsonBQValue.getClass()
-                + "' to '"
-                + fieldDescriptor
-                + "' is not supported");
-      }
-      return mapper.apply((String) jsonBQValue);
-    }
-
-    switch (fieldDescriptor.getType()) {
-      case BOOL:
-        if (jsonBQValue instanceof Boolean) {
-          return jsonBQValue;
+  static Object scalarToProtoValue(

Review comment:
      I'm not sure I consider a message type a scalar.
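A minimal, protobuf-free sketch of the split this comment points toward: STRUCT/RECORD values take a message path that recurses into the nested row, and only leaf values reach scalar conversion. MessageVsScalar, its Field stand-in for the BigQuery schema, and all method names are hypothetical illustrations rather than Beam APIs, and only INT64/INTEGER and STRING leaves are covered.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: message (STRUCT/RECORD) values are routed away from scalar conversion. */
final class MessageVsScalar {
  private MessageVsScalar() {}

  /** Stand-in for a BigQuery field schema; hypothetical, not a Beam or BigQuery class. */
  static final class Field {
    final String type;
    final Map<String, Field> subFields; // only used for STRUCT/RECORD

    Field(String type, Map<String, Field> subFields) {
      this.type = type;
      this.subFields = subFields;
    }
  }

  /** Dispatches on the schema type: nested rows recurse, everything else is a leaf. */
  static Object convertValue(Field field, Object value) {
    if (field.type.equals("STRUCT") || field.type.equals("RECORD")) {
      @SuppressWarnings("unchecked")
      Map<String, Object> nested = (Map<String, Object>) value;
      return convertMessage(field.subFields, nested);
    }
    return convertScalar(field.type, value);
  }

  /** Converts a row field by field, rejecting names that are not in the schema. */
  static Map<String, Object> convertMessage(Map<String, Field> schema, Map<String, Object> row) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : row.entrySet()) {
      String name = entry.getKey().toLowerCase();
      Field sub = schema.get(name);
      if (sub == null) {
        throw new IllegalArgumentException("Unexpected field with name " + entry.getKey());
      }
      out.put(name, convertValue(sub, entry.getValue()));
    }
    return out;
  }

  /** Leaf conversion only; just two types are covered to keep the sketch short. */
  static Object convertScalar(String type, Object value) {
    switch (type) {
      case "INT64":
      case "INTEGER":
        return value instanceof String ? Long.valueOf((String) value) : ((Number) value).longValue();
      case "STRING":
        return value.toString();
      default:
        throw new UnsupportedOperationException("Type not covered by this sketch: " + type);
    }
  }

  public static void main(String[] args) {
    Map<String, Field> inner = new LinkedHashMap<>();
    inner.put("n", new Field("INT64", null));
    Map<String, Field> schema = new LinkedHashMap<>();
    schema.put("nested", new Field("RECORD", inner));

    Map<String, Object> nestedRow = new LinkedHashMap<>();
    nestedRow.put("n", "42");
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("nested", nestedRow);

    System.out.println(convertMessage(schema, row)); // {nested={n=42}}
  }
}
```

Keeping the recursion in convertValue means convertScalar never has to know about nested rows, which is the separation the reviewer is asking about.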

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -88,29 +99,14 @@ public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)
     return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
   }
 
-  public static DynamicMessage messageFromMap(
-      Descriptor descriptor, AbstractMap<String, Object> map) {
-    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
-    for (Map.Entry<String, Object> entry : map.entrySet()) {
-      @Nullable
-      FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase());
-      if (fieldDescriptor == null) {
-        throw new RuntimeException(
-            "TableRow contained unexpected field with name " + entry.getKey());
-      }
-      @Nullable Object value = messageValueFromFieldValue(fieldDescriptor, entry.getValue());
-      if (value != null) {
-        builder.setField(fieldDescriptor, value);
-      }
-    }
-    return builder.build();
-  }
-
   /**
    * Given a BigQuery TableRow, returns a protocol-buffer message that can be used to write data
    * using the BigQuery Storage API.
    */
-  public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+  public static DynamicMessage messageFromTableRow(
+      StorageApiDynamicDestinationsTableRow.BqSchema bqSchema,

Review comment:
      This shouldn't reference StorageApiDynamicDestinationsTableRow; if anything, the class belongs in this file instead.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
##########
@@ -68,7 +71,13 @@ public long getEncodedElementByteSize(TableRow value) throws Exception {
   // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
   // TableRow.
   private static final ObjectMapper MAPPER =
-      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+      JsonMapper.builder()
+          .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+          .addModule(new JavaTimeModule())
+          .addModule(new JodaModule())
+          // serialize date/time to string instead of arrays, instant to string instead of floats
+          .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+          .build();

Review comment:
      Again, is this backwards compatible? That is, can a record serialized with an old version of this coder be deserialized by this code? If not, we can't make this change.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -182,116 +180,164 @@ private static void fieldDescriptorFromTableField(
     descriptorBuilder.addField(fieldDescriptorBuilder.build());
   }
 
+  private static DynamicMessage messageFromMap(
+      StorageApiDynamicDestinationsTableRow.BqSchema bqSchema,
+      Descriptor descriptor,
+      AbstractMap<String, Object> map) {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+
+      @Nullable
+      FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase());
+      if (fieldDescriptor == null) {
+        throw new RuntimeException(
+            "TableRow contained unexpected field with name " + entry.getKey());
+      }
+      StorageApiDynamicDestinationsTableRow.BqSchema subBqSchema =
+          bqSchema.getSubFieldByName(entry.getKey());
+      @Nullable
+      Object value = messageValueFromFieldValue(subBqSchema, fieldDescriptor, entry.getValue());
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
   @Nullable
   private static Object messageValueFromFieldValue(
-      FieldDescriptor fieldDescriptor, Object bqValue) {
+      StorageApiDynamicDestinationsTableRow.BqSchema bqSchema,
+      FieldDescriptor fieldDescriptor,
+      Object bqValue) {
+    // handle null
     if (bqValue == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
-      } else if (fieldDescriptor.isRepeated()) {
+      } else if (fieldDescriptor.isRepeated()) { // repeated field cannot be null
         return Collections.emptyList();
-      }
-      {
+      } else {
         throw new IllegalArgumentException(
             "Received null value for non-nullable field " + fieldDescriptor.getName());
       }
     }
-    return toProtoValue(fieldDescriptor, bqValue, fieldDescriptor.isRepeated());
-  }
 
-  private static final Map<FieldDescriptor.Type, Function<String, Object>>
-      JSON_PROTO_STRING_PARSERS =
-          ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
-              .put(FieldDescriptor.Type.INT32, Integer::valueOf)
-              .put(FieldDescriptor.Type.INT64, Long::valueOf)
-              .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
-              .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
-              .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
-              .put(FieldDescriptor.Type.STRING, str -> str)
-              .put(
-                  FieldDescriptor.Type.BYTES,
-                  b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
-              .build();
-
-  @Nullable
-  @SuppressWarnings({"nullness"})
-  @VisibleForTesting
-  static Object toProtoValue(
-      FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated) {
-    if (isRepeated) {
-      return ((List<Object>) jsonBQValue)
-          .stream().map(v -> toProtoValue(fieldDescriptor, v, false)).collect(toList());
+    // handle repeated value
+    if (fieldDescriptor.isRepeated()) {
+      return ((List<Object>) bqValue)
+          .stream()
+              .filter(Objects::nonNull) // repeated field cannot contain null
+              .map(v -> scalarToProtoValue(bqSchema, fieldDescriptor, v))
+              .collect(toList());
     }
 
-    if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
-      if (jsonBQValue instanceof TableRow) {
-        TableRow tableRow = (TableRow) jsonBQValue;
-        return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
-      } else if (jsonBQValue instanceof AbstractMap) {
-        // This will handle nested rows.
-        AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
-        return messageFromMap(fieldDescriptor.getMessageType(), map);
-      } else {
-        throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
-      }
-    }
-    @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
-    if (scalarValue == null) {
-      return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
-    } else {
-      return scalarValue;
-    }
+    // handle scalar non nullable value
+    return scalarToProtoValue(bqSchema, fieldDescriptor, bqValue);
   }
 
   @VisibleForTesting
-  @Nullable
-  static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
-    if (jsonBQValue instanceof String) {
-      Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
-      if (mapper == null) {
-        throw new UnsupportedOperationException(
-            "Converting BigQuery type '"
-                + jsonBQValue.getClass()
-                + "' to '"
-                + fieldDescriptor
-                + "' is not supported");
-      }
-      return mapper.apply((String) jsonBQValue);
-    }
-
-    switch (fieldDescriptor.getType()) {
-      case BOOL:
-        if (jsonBQValue instanceof Boolean) {
-          return jsonBQValue;
+  static Object scalarToProtoValue(
+      StorageApiDynamicDestinationsTableRow.BqSchema bqSchema,
+      FieldDescriptor fieldDescriptor,
+      Object value) {
+    switch (bqSchema.getBqType()) {
+      case "INT64":
+      case "INTEGER":
+        if (value instanceof String) {
+          return Long.valueOf((String) value);
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
         }
         break;
-      case BYTES:
+      case "FLOAT64":
+      case "FLOAT":
+        if (value instanceof String) {
+          return Double.valueOf((String) value);
+        } else if (value instanceof Double || value instanceof Float) {
+          return ((Number) value).doubleValue();
+        }
         break;
-      case INT64:
-        if (jsonBQValue instanceof Integer) {
-          return Long.valueOf((Integer) jsonBQValue);
-        } else if (jsonBQValue instanceof Long) {
-          return jsonBQValue;
+      case "BOOLEAN":
+      case "BOOL":
+        if (value instanceof String) {
+          return Boolean.valueOf((String) value);
+        } else if (value instanceof Boolean) {
+          return value;
         }
         break;
-      case INT32:
-        if (jsonBQValue instanceof Integer) {
-          return jsonBQValue;
+      case "BYTES":
+        if (value instanceof String) {
+          return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
+        } else if (value instanceof byte[]) {
+          return ByteString.copyFrom((byte[]) value);
+        } else if (value instanceof ByteString) {
+          return value;
         }
         break;
-      case STRING:
+      case "TIMESTAMP":
+        if (value instanceof String) {
+          return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value));
+        } else if (value instanceof Instant) {
+          return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+        } else if (value instanceof org.joda.time.Instant) {
+          // joda instant precision is millisecond
+          return ((org.joda.time.Instant) value).getMillis() * 1000L;
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();

Review comment:
      Why do you assume the number is micros here but seconds below?




-- 
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

            Worklog Id:     (was: 749827)
    Remaining Estimate: 104h 40m  (was: 104h 50m)
            Time Spent: 15h 20m  (was: 15h 10m)

> BigQueryIO cannot write to DATE and TIMESTAMP columns when using Storage Write API
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-13990
>                 URL: https://issues.apache.org/jira/browse/BEAM-13990
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>    Affects Versions: 2.36.0
>            Reporter: Du Liu
>            Assignee: Du Liu
>            Priority: P2
>   Original Estimate: 120h
>          Time Spent: 15h 20m
>  Remaining Estimate: 104h 40m
>
> When using the Storage Write API with BigQueryIO, DATE and TIMESTAMP values are
> currently converted to String type in the protobuf message. This is incorrect:
> according to the Storage Write API [documentation|#data_type_conversions], DATE
> should be converted to int32 and TIMESTAMP should be converted to int64.
> Here's the error message:
> INFO: Stream finished with error 
> com.google.api.gax.rpc.InvalidArgumentException: 
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched 
> with BigQuery field at D6cbe536b_4dab_4292_8fda_ff2932dded49.datevalue, the 
> proto field type string, BigQuery field type DATE Entity
> I have included an integration test here: 
> [https://github.com/liu-du/beam/commit/b56823d1d213adf6ca5564ce1d244cc4ae8f0816]
>  
> The problem is that DATE and TIMESTAMP are converted to String in the protobuf
> message here:
> [https://github.com/apache/beam/blob/a78fec72d0d9198eef75144a7bdaf93ada5abf9b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L69]
>  
> The Storage Write API rejects the request because it expects int32/int64
> values.
>  
> I've opened a PR here: https://github.com/apache/beam/pull/16926
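The conversions described in this issue (DATE to int32, TIMESTAMP to int64) can be sketched with java.time. The sketch assumes DATE arrives as an ISO-8601 date such as "2022-03-30" and TIMESTAMP as an ISO-8601 instant such as "2022-03-30T03:37:00Z", and that the int32 is a count of days since 1970-01-01 while the int64 is a count of microseconds since the epoch. Other textual forms BigQuery accepts (for example a space instead of "T") would need a different parser. StorageApiTimeValues is a hypothetical name, not part of Beam or the PR.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

/** Hypothetical sketch of the DATE/TIMESTAMP encodings described in the issue. */
final class StorageApiTimeValues {
  private StorageApiTimeValues() {}

  /** Parses an ISO-8601 date into days since 1970-01-01 (int32). */
  static int dateToProto(String date) {
    // toEpochDay() returns a long; Math.toIntExact throws instead of silently overflowing.
    return Math.toIntExact(LocalDate.parse(date).toEpochDay());
  }

  /** Parses an ISO-8601 instant into microseconds since the epoch (int64). */
  static long timestampToProto(String timestamp) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(timestamp));
  }

  public static void main(String[] args) {
    System.out.println(dateToProto("1970-01-02")); // 1
    System.out.println(timestampToProto("1970-01-01T00:00:00.000001Z")); // 1
  }
}
```

Dates before 1970 map to negative day counts (for example "1969-12-31" becomes -1), which int32 represents without any special handling.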



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
