reuvenlax commented on code in PR #22179:
URL: https://github.com/apache/beam/pull/22179#discussion_r1036580549


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -229,6 +243,8 @@ private static Object messageValueFromRowValue(
     if (value == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
+      } else if (fieldDescriptor.isRepeated()) {
+        return Lists.newArrayList();

Review Comment:
   Use Collections.emptyList() here instead of Lists.newArrayList().



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -248,9 +264,20 @@ private static Object toProtoValue(
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
-            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
-            .collect(Collectors.toList());
+        Boolean shouldFlatMap =
+            arrayElementType.getTypeName().isCollectionType()
+                    || arrayElementType.getTypeName().isMapType()
+                ? true
+                : false;
+
+        Stream<Object> valueStream =
+            list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v));
+
+        if (shouldFlatMap) {
+          valueStream = valueStream.flatMap(vs -> ((List) vs).stream());
+        }

Review Comment:
   Can you explain why flatMap is correct here?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -248,9 +264,20 @@ private static Object toProtoValue(
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
-            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
-            .collect(Collectors.toList());
+        Boolean shouldFlatMap =
+            arrayElementType.getTypeName().isCollectionType()
+                    || arrayElementType.getTypeName().isMapType()
+                ? true
+                : false;

Review Comment:
   Remove the redundant ternary operator. Also, there is no need for the boxed Boolean type here; a primitive boolean is enough.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -261,12 +288,47 @@ private static Object toProtoValue(
             .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
             .collect(Collectors.toList());
       case MAP:
-        throw new RuntimeException("Map types not supported by BigQuery.");
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        @Nullable FieldType keyType = beamFieldType.getMapKeyType();
+        @Nullable FieldType valueType = beamFieldType.getMapValueType();
+        if (keyType == null || valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+
+        return map.entrySet().stream()
+            .map(
+                (Map.Entry<Object, Object> entry) ->
+                    mapEntryToProtoValue(
+                        fieldDescriptor.getMessageType(), keyType, valueType, entry))
+            .collect(Collectors.toList());
       default:
         return scalarToProtoValue(beamFieldType, value);
     }
   }
 
+  static Object mapEntryToProtoValue(
+      Descriptor descriptor,
+      FieldType keyFieldType,
+      FieldType valueFieldType,
+      Map.Entry<Object, Object> entryValue) {
+
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    FieldDescriptor keyFieldDescriptor =
+        Preconditions.checkNotNull(descriptor.findFieldByName("key"));
+    @Nullable Object key = toProtoValue(keyFieldDescriptor, keyFieldType, entryValue.getKey());
+    if (key != null) {

Review Comment:
   Are null keys allowed?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -261,12 +288,47 @@ private static Object toProtoValue(
             .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
             .collect(Collectors.toList());
       case MAP:
-        throw new RuntimeException("Map types not supported by BigQuery.");
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        @Nullable FieldType keyType = beamFieldType.getMapKeyType();
+        @Nullable FieldType valueType = beamFieldType.getMapValueType();
+        if (keyType == null || valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+
+        return map.entrySet().stream()
+            .map(
+                (Map.Entry<Object, Object> entry) ->
+                    mapEntryToProtoValue(
+                        fieldDescriptor.getMessageType(), keyType, valueType, entry))
+            .collect(Collectors.toList());
       default:
         return scalarToProtoValue(beamFieldType, value);
     }
   }
 
+  static Object mapEntryToProtoValue(
+      Descriptor descriptor,
+      FieldType keyFieldType,
+      FieldType valueFieldType,
+      Map.Entry<Object, Object> entryValue) {
+

Review Comment:
   Remove the extra blank line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to