[ https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=174383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174383 ]

ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/18 02:04
            Start Date: 12/Dec/18 02:04
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #7233: [BEAM-4454] Add remaining functionality for AVRO schemas
URL: https://github.com/apache/beam/pull/7233#discussion_r240860252
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
 ##########
 @@ -184,6 +195,79 @@ public static GenericRecord toGenericRecord(
     return builder.build();
   }
 
+  /**
+   * Returns a function mapping AVRO {@link GenericRecord}s to Beam {@link Row}s for use in {@link
+   * org.apache.beam.sdk.values.PCollection#setSchema}.
+   */
+  public static SerializableFunction<GenericRecord, Row> getGenericRecordToRowFunction(
+      @Nullable Schema schema) {
+    return g -> toBeamRowStrict(g, schema);
+  }
+
+  /**
+   * Returns a function mapping Beam {@link Row}s to AVRO {@link GenericRecord}s for use in {@link
+   * org.apache.beam.sdk.values.PCollection#setSchema}.
+   */
+  public static SerializableFunction<Row, GenericRecord> getRowToGenericRecordFunction(
+      @Nullable org.apache.avro.Schema avroSchema) {
+    return g -> toGenericRecord(g, avroSchema);
+  }
+
+  /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */
+  public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) {
+    try {
+      org.apache.avro.Schema avroSchema =
+          (org.apache.avro.Schema) (clazz.getDeclaredField("SCHEMA$").get(null));
+      return toBeamSchema(avroSchema);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new IllegalArgumentException(
+          "Class "
+              + clazz
+              + " is not an AVRO SpecificRecord. "
+              + "No public SCHEMA$ field was found.");
+    }
+  }
+
+  private static final class AvroSpecificRecordFieldNamePolicy
+      implements SerializableFunction<String, String> {
+    Schema schema;
+    Map<String, String> nameMapping = Maps.newHashMap();
+
+    AvroSpecificRecordFieldNamePolicy(Schema schema) {
+      this.schema = schema;
+      for (Field field : schema.getFields()) {
+        String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName());
+        nameMapping.put(getter, field.getName());
+        // The Avro compiler might add a $ at the end of a getter to disambiguate.
+        nameMapping.put(getter + "$", field.getName());
+      }
+    }
+
+    @Override
+    public String apply(String input) {
+      return nameMapping.getOrDefault(input, input);
 
 Review comment:
  No, this could be any type of map here. Throwing an exception would be incorrect, because some getters (e.g. getSchema()) have no entry in the map.
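The reviewer's point is that a missing key is an expected case, not an error, so the policy falls back to the input name. A minimal plain-Java sketch of that lookup follows. It assumes the policy is handed getter-derived names such as `userId`; `FieldNamePolicySketch` and `lowerUnderscoreToLowerCamel` are hypothetical names, and the hand-rolled case conversion is a simplified stand-in for Guava's `CaseFormat` so the example needs only the JDK.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
 * Hypothetical stand-in for AvroSpecificRecordFieldNamePolicy: maps getter-derived
 * names back to Avro field names and passes unknown names through unchanged.
 */
public class FieldNamePolicySketch implements UnaryOperator<String> {
  private final Map<String, String> nameMapping = new HashMap<>();

  public FieldNamePolicySketch(List<String> avroFieldNames) {
    for (String fieldName : avroFieldNames) {
      String getter = lowerUnderscoreToLowerCamel(fieldName);
      nameMapping.put(getter, fieldName);
      // Mirrors the diff: the Avro compiler may append '$' to a getter to disambiguate it.
      nameMapping.put(getter + "$", fieldName);
    }
  }

  // Simplified (assumed) replacement for CaseFormat.LOWER_UNDERSCORE.to(LOWER_CAMEL, ...).
  static String lowerUnderscoreToLowerCamel(String s) {
    StringBuilder out = new StringBuilder();
    boolean upperNext = false;
    for (char c : s.toCharArray()) {
      if (c == '_') {
        upperNext = true; // drop the underscore, capitalize the next character
      } else if (upperNext) {
        out.append(Character.toUpperCase(c));
        upperNext = false;
      } else {
        out.append(Character.toLowerCase(c));
      }
    }
    return out.toString();
  }

  @Override
  public String apply(String input) {
    // Fall back to the input instead of throwing: getters such as getSchema()
    // have no corresponding Avro field, so their absence from the map is legitimate.
    return nameMapping.getOrDefault(input, input);
  }

  public static void main(String[] args) {
    FieldNamePolicySketch policy = new FieldNamePolicySketch(Arrays.asList("user_id", "name"));
    System.out.println(policy.apply("userId"));  // user_id
    System.out.println(policy.apply("userId$")); // user_id
    System.out.println(policy.apply("schema"));  // schema (not in the map, passed through)
  }
}
```

Throwing on a miss would make every non-field getter fail, which is exactly the failure mode the comment rules out; `getOrDefault` keeps the policy total over all input names.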

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174383)
    Time Spent: 9h 40m  (was: 9.5h)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
