This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 757222d [Schema] Provide a generic record interface for representing a typed message (#2452) 757222d is described below commit 757222d9d232dbe03293df890609aa85ff63556c Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Mon Aug 27 22:21:27 2018 -0700 [Schema] Provide a generic record interface for representing a typed message (#2452) * [Schema] Provide a generic record interface for representing a typed message ### Motivation In some use cases, the publishers and consumers don't know the type or schema of the messages ahead of time. For example, in a Pulsar IO connector that connects a topic to a JDBC table, the connector doesn't know the type of the messages ahead of time; it can only fetch the schema info from the schema registry, and that is the only information the connector has. Without a generic representation, it is impossible to map the messages to a relational database table. So we need a way to represent a generic `Struct` record with fields. ### Changes Introduce `Field` and `GenericRecord` to represent `Struct` records deserialized with a schema. ### Not Covered This change only introduces the interfaces. It doesn't integrate with the producer and consumer workflow; that will be done in subsequent changes once we agree on the interfaces. 
--- .../org/apache/pulsar/client/api/schema/Field.java | 43 +++++++++ .../pulsar/client/api/schema/GenericRecord.java | 51 +++++++++++ .../pulsar/client/impl/schema/AvroSchema.java | 17 ++-- .../client/impl/schema/GenericAvroRecord.java | 79 ++++++++++++++++ .../client/impl/schema/GenericAvroSchema.java | 102 +++++++++++++++++++++ .../pulsar/client/schema/AvroSchemaTest.java | 47 +++++++++- 6 files changed, 329 insertions(+), 10 deletions(-) diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java new file mode 100644 index 0000000..653b5d6 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * A field in a record, consisting of a field name, index, and + * {@link org.apache.pulsar.client.api.Schema} for the field value. + */ +@Data +@EqualsAndHashCode +@ToString +public class Field { + + /** + * The field name. 
+ */ + private final String name; + /** + * The index of the field within the record. + */ + private final int index; + +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java new file mode 100644 index 0000000..0a4fce4 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import java.util.List; + +/** + * An interface represents a message with schema. + */ +public interface GenericRecord { + + /** + * Returns the list of fields associated with the record. + * + * @return the list of fields associated with the record. + */ + List<Field> getFields(); + + /** + * Retrieve the value of the provided <tt>field</tt>. + * + * @param field the field to retrieve the value + * @return the value object + */ + Object getField(Field field); + + /** + * Retrieve the value of the provided <tt>fieldName</tt>. 
+ * + * @param fieldName the field name + * @return the value object + */ + Object getField(String fieldName); + +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index 7d90d2b..6867fdc 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -45,8 +45,9 @@ public class AvroSchema<T> implements Schema<T> { private BinaryEncoder encoder; private ByteArrayOutputStream byteArrayOutputStream; - private AvroSchema(Class<T> pojo, Map<String, String> properties) { - this.schema = ReflectData.AllowNull.get().getSchema(pojo); + private AvroSchema(org.apache.avro.Schema schema, + Map<String, String> properties) { + this.schema = schema; this.schemaInfo = new SchemaInfo(); this.schemaInfo.setName(""); @@ -61,8 +62,7 @@ public class AvroSchema<T> implements Schema<T> { } @Override - public byte[] encode(T message) { - + public synchronized byte[] encode(T message) { try { datumWriter.write(message, this.encoder); this.encoder.flush(); @@ -88,11 +88,16 @@ public class AvroSchema<T> implements Schema<T> { return this.schemaInfo; } + private static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) { + return ReflectData.AllowNull.get().getSchema(pojo); + } + public static <T> AvroSchema<T> of(Class<T> pojo) { - return new AvroSchema<>(pojo, Collections.emptyMap()); + return new AvroSchema<>(createAvroSchema(pojo), Collections.emptyMap()); } public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) { - return new AvroSchema<>(pojo, properties); + return new AvroSchema<>(createAvroSchema(pojo), properties); } + } diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java new file mode 100644 index 0000000..fb65c7a --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import java.util.List; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; + +/** + * A generic avro record. 
+ */ +@Slf4j +class GenericAvroRecord implements GenericRecord { + + private final org.apache.avro.Schema schema; + private final List<Field> fields; + private final org.apache.avro.generic.GenericRecord record; + + GenericAvroRecord(org.apache.avro.Schema schema, + List<Field> fields, + org.apache.avro.generic.GenericRecord record) { + this.schema = schema; + this.fields = fields; + this.record = record; + } + + @Override + public List<Field> getFields() { + return fields; + } + + @Override + public Object getField(Field field) { + return getField(field.getName()); + } + + @Override + public Object getField(String fieldName) { + Object value = record.get(fieldName); + if (value instanceof Utf8) { + return ((Utf8) value).toString(); + } else if (value instanceof org.apache.avro.generic.GenericRecord) { + org.apache.avro.generic.GenericRecord avroRecord = + (org.apache.avro.generic.GenericRecord) value; + org.apache.avro.Schema recordSchema = avroRecord.getSchema(); + List<Field> fields = recordSchema.getFields() + .stream() + .map(f -> new Field(f.name(), f.pos())) + .collect(Collectors.toList()); + return new GenericAvroRecord(schema, fields, avroRecord); + } else { + return value; + } + } + + org.apache.avro.generic.GenericRecord getAvroRecord() { + return record; + } + +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java new file mode 100644 index 0000000..4ccfe55 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.SchemaInfo; + +/** + * A generic avro schema. 
+ */ +public class GenericAvroSchema implements Schema<GenericRecord> { + + private final org.apache.avro.Schema schema; + private final List<Field> fields; + private final SchemaInfo schemaInfo; + private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter; + private BinaryEncoder encoder; + private final ByteArrayOutputStream byteArrayOutputStream; + private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader; + + public GenericAvroSchema(SchemaInfo schemaInfo) { + this.schemaInfo = schemaInfo; + this.schema = new org.apache.avro.Schema.Parser().parse( + new String(schemaInfo.getSchema(), UTF_8) + ); + this.fields = schema.getFields() + .stream() + .map(f -> new Field(f.name(), f.pos())) + .collect(Collectors.toList()); + this.byteArrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder); + this.datumWriter = new GenericDatumWriter(schema); + this.datumReader = new GenericDatumReader(schema); + } + + public org.apache.avro.Schema getAvroSchema() { + return schema; + } + + @Override + public synchronized byte[] encode(GenericRecord message) { + checkArgument(message instanceof GenericAvroRecord); + GenericAvroRecord gar = (GenericAvroRecord) message; + try { + datumWriter.write(gar.getAvroRecord(), this.encoder); + this.encoder.flush(); + return this.byteArrayOutputStream.toByteArray(); + } catch (Exception e) { + throw new SchemaSerializationException(e); + } finally { + this.byteArrayOutputStream.reset(); + } + } + + @Override + public GenericRecord decode(byte[] bytes) { + try { + org.apache.avro.generic.GenericRecord avroRecord = datumReader.read( + null, + DecoderFactory.get().binaryDecoder(bytes, null)); + return new GenericAvroRecord(schema, fields, avroRecord); + } catch (IOException e) { + throw new SchemaSerializationException(e); + } + } + + @Override + public SchemaInfo getSchemaInfo() { + return schemaInfo; + } +} diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java index 84c060e..cfe9cb7 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java @@ -18,12 +18,17 @@ */ package org.apache.pulsar.client.schema; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.Test; @@ -65,10 +70,10 @@ public class AvroSchemaTest { @Test public void testSchema() { AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class); - Assert.assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO); + assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO); Schema.Parser parser = new Schema.Parser(); String schemaJson = new String(avroSchema.getSchemaInfo().getSchema()); - Assert.assertEquals(schemaJson, SCHEMA_JSON); + assertEquals(schemaJson, SCHEMA_JSON); Schema schema = parser.parse(schemaJson); for (String fieldName : FOO_FIELDS) { @@ -103,7 +108,41 @@ public class AvroSchemaTest { Foo object1 = avroSchema.decode(bytes1); Foo object2 = avroSchema.decode(bytes2); - Assert.assertEquals(object1, foo1); - Assert.assertEquals(object2, foo2); + assertEquals(object1, foo1); + assertEquals(object2, foo2); + } + + @Test + public void testEncodeAndDecodeGenericRecord() { + AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); + + GenericAvroSchema genericAvroSchema = 
new GenericAvroSchema(avroSchema.getSchemaInfo()); + + log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); + + int numRecords = 10; + for (int i = 0; i < numRecords; i++) { + Foo foo = new Foo(); + foo.setField1("field-1-" + i); + foo.setField2("field-2-" + i); + foo.setField3(i); + Bar bar = new Bar(); + bar.setField1(i % 2 == 0); + foo.setField4(bar); + + byte[] data = avroSchema.encode(foo); + + GenericRecord record = genericAvroSchema.decode(data); + Object field1 = record.getField("field1"); + assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); + Object field2 = record.getField("field2"); + assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass()); + Object field3 = record.getField("field3"); + assertEquals(i, field3, "Field 3 is " + field3.getClass()); + Object field4 = record.getField("field4"); + assertTrue(field4 instanceof GenericRecord); + GenericRecord field4Record = (GenericRecord) field4; + assertEquals(i % 2 == 0, field4Record.getField("field1")); + } } }