[ https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=174375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174375 ]
ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/18 01:24
            Start Date: 12/Dec/18 01:24
    Worklog Time Spent: 10m
      Work Description: reuvenlax commented on a change in pull request #7233: [BEAM-4454] Add remaining functionality for AVRO schemas
URL: https://github.com/apache/beam/pull/7233#discussion_r240853895

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
 ##########
 @@ -184,6 +195,79 @@ public static GenericRecord toGenericRecord(
       return builder.build();
     }
 
 +  /**
 +   * Returns a function mapping AVRO {@link GenericRecord}s to Beam {@link Row}s for use in {@link
 +   * org.apache.beam.sdk.values.PCollection#setSchema}.
 +   */
 +  public static SerializableFunction<GenericRecord, Row> getGenericRecordToRowFunction(
 +      @Nullable Schema schema) {
 +    return g -> toBeamRowStrict(g, schema);
 +  }
 +
 +  /**
 +   * Returns a function mapping Beam {@link Row}s to AVRO {@link GenericRecord}s for use in {@link
 +   * org.apache.beam.sdk.values.PCollection#setSchema}.
 +   */
 +  public static SerializableFunction<Row, GenericRecord> getRowToGenericRecordFunction(
 +      @Nullable org.apache.avro.Schema avroSchema) {
 +    return g -> toGenericRecord(g, avroSchema);
 +  }
 +
 +  /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */
 +  public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) {
 +    try {
 +      org.apache.avro.Schema avroSchema =
 +          (org.apache.avro.Schema) (clazz.getDeclaredField("SCHEMA$").get(null));
 +      return toBeamSchema(avroSchema);
 +    } catch (NoSuchFieldException | IllegalAccessException e) {
 +      throw new IllegalArgumentException(
 +          "Class "
 +              + clazz
 +              + " is not an AVRO SpecificRecord. "
 +              + "No public SCHEMA$ field was found.");
 +    }
 +  }
 +
 +  private static final class AvroSpecificRecordFieldNamePolicy
 +      implements SerializableFunction<String, String> {
 +    Schema schema;
 +    Map<String, String> nameMapping = Maps.newHashMap();
 +
 +    AvroSpecificRecordFieldNamePolicy(Schema schema) {
 +      this.schema = schema;
 +      for (Field field : schema.getFields()) {
 +        String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName());
 +        nameMapping.put(getter, field.getName());
 +        // The Avro compiler might add a $ at the end of a getter to disambiguate.

 Review comment:
 Doesn't seem to be complete. The Avro compiler also mangles any field named "schema" (as it otherwise conflicts with the base-class getSchema method), and this doesn't appear in RESERVED_WORDS.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 174375)
    Time Spent: 8h 50m  (was: 8h 40m)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change
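To illustrate the reviewer's point about the `$` suffix, here is a minimal sketch of a getter-name-to-field-name mapping that also accepts the `$`-suffixed form for names a code generator mangles. It is not Beam's `AvroSpecificRecordFieldNamePolicy` and does not reproduce Avro's exact mangling rules: the class `MangledNameMapping`, the set `ASSUMED_MANGLED_NAMES`, and the helpers `toLowerCamel`, `propertyFromGetter`, and `buildMapping` are all hypothetical. The only point it demonstrates is that the set of names checked for a `$` suffix has to include "schema", not just Java reserved words.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not Beam's AvroSpecificRecordFieldNamePolicy.
class MangledNameMapping {
  // ASSUMPTION: illustrative stand-in for the names a generator mangles with a trailing '$'.
  // It deliberately includes "schema", whose getter would otherwise clash with getSchema().
  static final Set<String> ASSUMED_MANGLED_NAMES =
      new HashSet<>(Arrays.asList("schema", "class", "package", "default"));

  // Simplified lower_underscore -> lowerCamel conversion (e.g. "user_id" -> "userId").
  static String toLowerCamel(String name) {
    StringBuilder sb = new StringBuilder();
    boolean upperNext = false;
    for (char c : name.toCharArray()) {
      if (c == '_') {
        upperNext = true;
      } else {
        sb.append(upperNext ? Character.toUpperCase(c) : c);
        upperNext = false;
      }
    }
    return sb.toString();
  }

  // Derives a property name from a getter: "getUserId" -> "userId", "getSchema$" -> "schema$".
  static String propertyFromGetter(String methodName) {
    if (!methodName.startsWith("get") || methodName.length() == 3) {
      return methodName;
    }
    String rest = methodName.substring(3);
    return Character.toLowerCase(rest.charAt(0)) + rest.substring(1);
  }

  // Maps each property name (plain and, where mangled, '$'-suffixed) to its schema field name.
  static Map<String, String> buildMapping(List<String> fieldNames) {
    Map<String, String> mapping = new HashMap<>();
    for (String field : fieldNames) {
      String property = toLowerCamel(field);
      mapping.put(property, field);
      if (ASSUMED_MANGLED_NAMES.contains(property)) {
        mapping.put(property + "$", field);
      }
    }
    return mapping;
  }
}
```

With fields `user_id` and `schema`, a getter named `getSchema$` resolves back to the `schema` field, while `userId$` is never registered because `userId` is not in the assumed set. Keeping the mangled names in one explicit set makes it straightforward to add entries such as "schema" that conflict with base-class methods rather than with Java keywords.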