[ 
https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=171553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171553
 ]

ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/18 12:45
            Start Date: 03/Dec/18 12:45
    Worklog Time Spent: 10m 
      Work Description: kanterov commented on a change in pull request #7181: 
[BEAM-4454] Add more AVRO utilities to convert between Beam and Avro.
URL: https://github.com/apache/beam/pull/7181#discussion_r238254112
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
 ##########
 @@ -111,28 +234,143 @@ public static Schema toSchema(@Nonnull 
org.apache.avro.Schema schema) {
       default:
         throw new AssertionError("Unexpected AVRO Schema.Type: " + 
avroSchema.getType());
     }
+    fieldType = fieldType.withNullable(type.nullable);
+    return fieldType;
   }
 
-  /**
-   * Strict conversion from AVRO to Beam, strict because it doesn't do 
widening or narrowing during
-   * conversion.
-   */
-  public static Row toRowStrict(@Nonnull GenericRecord record, @Nonnull Schema 
schema) {
-    Row.Builder builder = Row.withSchema(schema);
-    org.apache.avro.Schema avroSchema = record.getSchema();
+  private static org.apache.avro.Schema getFieldSchema(Schema.FieldType 
fieldType) {
+    org.apache.avro.Schema baseType;
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+        baseType = org.apache.avro.Schema.create(Type.INT);
+        break;
 
-    for (Schema.Field field : schema.getFields()) {
-      Object value = record.get(field.getName());
-      org.apache.avro.Schema fieldAvroSchema = 
avroSchema.getField(field.getName()).schema();
+      case INT64:
+        baseType = org.apache.avro.Schema.create(Type.LONG);
+        break;
 
-      if (value == null) {
-        builder.addValue(null);
-      } else {
-        builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, 
field.getType()));
-      }
+      case DECIMAL:
+        // TODO: Use a string representation.
+        throw new IllegalArgumentException("Avro does not support decimal 
types.");
+
+      case FLOAT:
+        baseType = org.apache.avro.Schema.create(Type.FLOAT);
+        break;
+
+      case DOUBLE:
+        baseType = org.apache.avro.Schema.create(Type.DOUBLE);
+        break;
+
+      case STRING:
+        baseType = org.apache.avro.Schema.create(Type.STRING);
+        break;
+
+      case DATETIME:
+        // TODO: Use a string representation.
+        throw new IllegalArgumentException("Avro does not support datetime 
types.");
+
+      case BOOLEAN:
+        baseType = org.apache.avro.Schema.create(Type.BOOLEAN);
+        break;
+
+      case BYTES:
+        baseType = org.apache.avro.Schema.create(Type.BYTES);
+        break;
+
+      case ARRAY:
+        baseType =
+            org.apache.avro.Schema.createArray(
+                getFieldSchema(fieldType.getCollectionElementType()));
+        break;
+
+      case MAP:
+        if (fieldType.getMapKeyType().getTypeName().isStringType()) {
+          // Avro only supports string keys in maps.
+          baseType = 
org.apache.avro.Schema.createMap(getFieldSchema(fieldType.getMapValueType()));
+        } else {
+          throw new IllegalArgumentException("Avro only supports maps with 
string keys");
+        }
+        break;
+
+      case ROW:
+        baseType = toAvroSchema(fieldType.getRowSchema());
+        break;
+
+      default:
+        throw new IllegalArgumentException("Unexpected type " + fieldType);
     }
+    return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : 
baseType;
+  }
 
-    return builder.build();
+  private static Object genericFromField(
+      Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, Object 
value) {
+    org.apache.avro.Schema expectedSchema = getFieldSchema(fieldType);
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+      case BOOLEAN:
+        return wrapIfNullable(avroSchema, value, fieldType, expectedSchema);
+
+      case BYTES:
+        return wrapIfNullable(
+            avroSchema, ByteBuffer.wrap((byte[]) value), fieldType, 
expectedSchema);
+
+      case ARRAY:
+        List array = (List) value;
+        List<Object> translatedArray = 
Lists.newArrayListWithExpectedSize(array.size());
+        for (int i = 0; i < array.size(); i++) {
+          translatedArray.add(
+              genericFromField(
+                  fieldType.getCollectionElementType(), 
avroSchema.getElementType(), array.get(i)));
+        }
+        return wrapIfNullable(avroSchema, translatedArray, fieldType, 
expectedSchema);
+
+      case MAP:
+        ImmutableMap.Builder builder = ImmutableMap.builder();
+        Map<Object, Object> valueMap = (Map<Object, Object>) value;
+        for (Map.Entry entry : valueMap.entrySet()) {
+          builder.put(
+              entry.getKey(),
+              genericFromField(
+                  fieldType.getMapValueType(), avroSchema.getValueType(), 
entry.getValue()));
+        }
+        return wrapIfNullable(avroSchema, builder.build(), fieldType, 
expectedSchema);
+
+      case ROW:
+        return wrapIfNullable(
+            avroSchema, toGenericRecord((Row) value, avroSchema), fieldType, 
expectedSchema);
+
+      default:
+        throw new IllegalArgumentException("Unsupported type " + fieldType);
+    }
+  }
+
+  private static Object wrapIfNullable(
+      org.apache.avro.Schema avroSchema,
+      Object o,
+      FieldType fieldType,
+      org.apache.avro.Schema expectedType) {
+    TypeWithNullability typeWithNullability = new 
TypeWithNullability(avroSchema);
+    if (!fieldType.getNullable().equals(typeWithNullability.nullable)) {
+      throw new IllegalArgumentException(
+          "FieldType "
+              + fieldType
+              + " and AVRO schema "
+              + avroSchema
+              + " don't have matching nullability");
+    }
+    if (!typeWithNullability.type.equals(expectedType)) {
 
 Review comment:
   Agreed; this should become clearer during the implementation of `AvroIO.Write`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 171553)
    Time Spent: 2h 50m  (was: 2h 40m)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to