milantracy commented on a change in pull request #10487:
URL: https://github.com/apache/beam/pull/10487#discussion_r448052137
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -339,6 +350,35 @@ public static Schema toBeamSchema(org.apache.avro.Schema
schema) {
return toAvroSchema(beamSchema, null, null);
}
+ /** Convert a {@link GenericRecord} to a corresponding array of bytes. */
+ public static byte[] toBytes(GenericRecord record) {
+ org.apache.avro.Schema schema = record.getSchema();
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+ datumWriter.write(record, encoder);
+ encoder.flush();
+ return outputStream.toByteArray();
+ } catch (IOException exception) {
+ throw new RuntimeException("Fail to parse generic record", exception);
+ }
+ }
+
+ /** Convert an array of bytes to a {@link GenericRecord} with the target
schema. */
+ public static GenericRecord toGenericRecord(byte[] bytes,
org.apache.avro.Schema schema) {
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
+ try (InputStream inputStream = new SeekableByteArrayInput(bytes)) {
+ Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ GenericRecord record = datumReader.read(null, decoder);
+ if (record != null) {
+ return record;
+ }
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to extract the record from the
payload.", exception);
+ }
+ throw new RuntimeException("No record is extracted from the payload");
+ }
+
Review comment:
I may keep the util functions here, as I don't see a way to convert
between GenericRecord and Row via AvroCoder.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]