sijie closed pull request #2497: [schema] implement generic schema/record for 
Schema.JSON
URL: https://github.com/apache/incubator-pulsar/pull/2497
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
index 0a4fce43cb..46a49a1c39 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -38,7 +38,9 @@
      * @param field the field to retrieve the value
      * @return the value object
      */
-    Object getField(Field field);
+    default Object getField(Field field) {
+        return getField(field.getName());
+    }
 
     /**
      * Retrieve the value of the provided <tt>fieldName</tt>.
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
similarity index 94%
rename from 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
rename to 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
index fb65c7aced..c9dbeb747b 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.generic;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -48,11 +48,6 @@
         return fields;
     }
 
-    @Override
-    public Object getField(Field field) {
-        return getField(field.getName());
-    }
-
     @Override
     public Object getField(String fieldName) {
         Object value = record.get(fieldName);
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
similarity index 74%
rename from 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
rename to 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 4ccfe55df0..5fe44596ba 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -16,58 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.generic;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * A generic avro schema.
  */
-public class GenericAvroSchema implements Schema<GenericRecord> {
+class GenericAvroSchema extends GenericSchema {
 
-    private final org.apache.avro.Schema schema;
-    private final List<Field> fields;
-    private final SchemaInfo schemaInfo;
     private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> 
datumWriter;
     private BinaryEncoder encoder;
     private final ByteArrayOutputStream byteArrayOutputStream;
     private final GenericDatumReader<org.apache.avro.generic.GenericRecord> 
datumReader;
 
     public GenericAvroSchema(SchemaInfo schemaInfo) {
-        this.schemaInfo = schemaInfo;
-        this.schema = new org.apache.avro.Schema.Parser().parse(
-            new String(schemaInfo.getSchema(), UTF_8)
-        );
-        this.fields = schema.getFields()
-            .stream()
-            .map(f -> new Field(f.name(), f.pos()))
-            .collect(Collectors.toList());
+        super(schemaInfo);
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = 
EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
         this.datumWriter = new GenericDatumWriter(schema);
         this.datumReader = new GenericDatumReader(schema);
     }
 
-    public org.apache.avro.Schema getAvroSchema() {
-        return schema;
-    }
-
     @Override
     public synchronized byte[] encode(GenericRecord message) {
         checkArgument(message instanceof GenericAvroRecord);
@@ -86,17 +68,14 @@ public GenericAvroSchema(SchemaInfo schemaInfo) {
     @Override
     public GenericRecord decode(byte[] bytes) {
         try {
+            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
             org.apache.avro.generic.GenericRecord avroRecord = 
datumReader.read(
                 null,
-                DecoderFactory.get().binaryDecoder(bytes, null));
+                decoder);
             return new GenericAvroRecord(schema, fields, avroRecord);
         } catch (IOException e) {
             throw new SchemaSerializationException(e);
         }
     }
 
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        return schemaInfo;
-    }
 }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
new file mode 100644
index 0000000000..7d13ebc635
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+
+/**
+ * Generic json record.
+ */
+class GenericJsonRecord implements GenericRecord {
+
+    private final List<Field> fields;
+    private final JsonNode jn;
+
+    GenericJsonRecord(List<Field> fields,
+                      JsonNode jn) {
+        this.fields = fields;
+        this.jn = jn;
+    }
+
+    JsonNode getJsonNode() {
+        return jn;
+    }
+
+    @Override
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    @Override
+    public Object getField(String fieldName) {
+        JsonNode fn = jn.get(fieldName);
+        if (fn.isContainerNode()) {
+            AtomicInteger idx = new AtomicInteger(0);
+            List<Field> fields = Lists.newArrayList(fn.fieldNames())
+                .stream()
+                .map(f -> new Field(f, idx.getAndIncrement()))
+                .collect(Collectors.toList());
+            return new GenericJsonRecord(fields, fn);
+        } else if (fn.isBoolean()) {
+            return fn.asBoolean();
+        } else if (fn.isInt()) {
+            return fn.asInt();
+        } else if (fn.isFloatingPointNumber()) {
+            return fn.asDouble();
+        } else if (fn.isDouble()) {
+            return fn.asDouble();
+        } else {
+            return fn.asText();
+        }
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
new file mode 100644
index 0000000000..d0f2b54233
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A generic json schema.
+ */
+class GenericJsonSchema extends GenericSchema {
+
+    private final ObjectMapper objectMapper;
+
+    public GenericJsonSchema(SchemaInfo schemaInfo) {
+        super(schemaInfo);
+        this.objectMapper = new ObjectMapper();
+    }
+
+    @Override
+    public byte[] encode(GenericRecord message) {
+        checkArgument(message instanceof GenericAvroRecord);
+        GenericJsonRecord gjr = (GenericJsonRecord) message;
+        try {
+            return 
objectMapper.writeValueAsBytes(gjr.getJsonNode().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException(new SchemaSerializationException(ioe));
+        }
+    }
+
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        try {
+            JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
+            return new GenericJsonRecord(fields, jn);
+        } catch (IOException ioe) {
+            throw new RuntimeException(new SchemaSerializationException(ioe));
+        }
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
new file mode 100644
index 0000000000..76ff3fef11
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * A generic schema representation.
+ */
+public abstract class GenericSchema implements Schema<GenericRecord> {
+
+    protected final org.apache.avro.Schema schema;
+    protected final List<Field> fields;
+    protected final SchemaInfo schemaInfo;
+
+    protected GenericSchema(SchemaInfo schemaInfo) {
+        this.schemaInfo = schemaInfo;
+        this.schema = new org.apache.avro.Schema.Parser().parse(
+            new String(schemaInfo.getSchema(), UTF_8)
+        );
+        this.fields = schema.getFields()
+            .stream()
+            .map(f -> new Field(f.name(), f.pos()))
+            .collect(Collectors.toList());
+    }
+
+    public org.apache.avro.Schema getAvroSchema() {
+        return schema;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
+
+    /**
+     * Create a generic schema out of a <tt>SchemaInfo</tt>.
+     *
+     * @param schemaInfo schema info
+     * @return a generic schema instance
+     */
+    public static GenericSchema of(SchemaInfo schemaInfo) {
+        switch (schemaInfo.getType()) {
+            case AVRO:
+                return new GenericAvroSchema(schemaInfo);
+            case JSON:
+                return new GenericJsonSchema(schemaInfo);
+            default:
+                throw new UnsupportedOperationException("Generic schema is not 
supported on schema type '"
+                    + schemaInfo.getType() + "'");
+        }
+    }
+
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
new file mode 100644
index 0000000000..f07c928870
--- /dev/null
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.testng.annotations.Test;
+
+/**
+ * Unit testing generic schemas.
+ */
+@Slf4j
+public class GenericSchemaTest {
+
+    @Test
+    public void testGenericAvroSchema() {
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        GenericSchema decodeSchema = 
GenericSchema.of(encodeSchema.getSchemaInfo());
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testGenericJsonSchema() {
+        Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        GenericSchema decodeSchema = 
GenericSchema.of(encodeSchema.getSchemaInfo());
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testAutoAvroSchema() {
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        AutoSchema decodeSchema = new AutoSchema();
+        decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testAutoJsonSchema() {
+        Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        AutoSchema decodeSchema = new AutoSchema();
+        decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+                                                 Schema<GenericRecord> 
decodeSchema) {
+        int numRecords = 10;
+        for (int i = 0; i < numRecords; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field-1-" + i);
+            foo.setField2("field-2-" + i);
+            foo.setField3(i);
+            Bar bar = new Bar();
+            bar.setField1(i % 2 == 0);
+            foo.setField4(bar);
+
+            byte[] data = encodeSchema.encode(foo);
+
+            log.info("Decoding : {}", new String(data, UTF_8));
+
+            GenericRecord record = decodeSchema.decode(data);
+            Object field1 = record.getField("field1");
+            assertEquals("field-1-" + i, field1, "Field 1 is " + 
field1.getClass());
+            Object field2 = record.getField("field2");
+            assertEquals("field-2-" + i, field2, "Field 2 is " + 
field2.getClass());
+            Object field3 = record.getField("field3");
+            assertEquals(i, field3, "Field 3 is " + field3.getClass());
+            Object field4 = record.getField("field4");
+            assertTrue(field4 instanceof GenericRecord);
+            GenericRecord field4Record = (GenericRecord) field4;
+            assertEquals(i % 2 == 0, field4Record.getField("field1"));
+        }
+    }
+
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index e5fcc3c4f6..58d159347a 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pulsar.client.schema;
 
+import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -37,37 +34,6 @@
 @Slf4j
 public class AvroSchemaTest {
 
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Foo {
-        private String field1;
-        private String field2;
-        private int field3;
-        private Bar field4;
-    }
-
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Bar {
-        private boolean field1;
-    }
-
-    private static final String SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
-            ".pulsar.client" +
-            
".schema.AvroSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
 +
-            
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
 +
-            
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
 +
-            
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
-
-    private static String[] FOO_FIELDS = {
-            "field1",
-            "field2",
-            "field3",
-            "field4"
-    };
-
     @Test
     public void testSchema() {
         AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
@@ -113,55 +79,4 @@ public void testEncodeAndDecode() {
         assertEquals(object2, foo2);
     }
 
-    @Test
-    public void testEncodeAndDecodeGenericRecord() {
-        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
-        GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(avroSchema.getSchemaInfo());
-
-        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
-
-        testGenericSchema(avroSchema, genericAvroSchema);
-    }
-
-    @Test
-    public void testAutoSchema() {
-        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
-
-        GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(avroSchema.getSchemaInfo());
-
-        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
-
-        AutoSchema schema = new AutoSchema();
-        schema.setSchema(genericAvroSchema);
-
-        testGenericSchema(avroSchema, schema);
-    }
-
-    private void testGenericSchema(AvroSchema<Foo> avroSchema,
-                                   
org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) {
-        int numRecords = 10;
-        for (int i = 0; i < numRecords; i++) {
-            Foo foo = new Foo();
-            foo.setField1("field-1-" + i);
-            foo.setField2("field-2-" + i);
-            foo.setField3(i);
-            Bar bar = new Bar();
-            bar.setField1(i % 2 == 0);
-            foo.setField4(bar);
-
-            byte[] data = avroSchema.encode(foo);
-
-            GenericRecord record = genericRecordSchema.decode(data);
-            Object field1 = record.getField("field1");
-            assertEquals("field-1-" + i, field1, "Field 1 is " + 
field1.getClass());
-            Object field2 = record.getField("field2");
-            assertEquals("field-2-" + i, field2, "Field 2 is " + 
field2.getClass());
-            Object field3 = record.getField("field3");
-            assertEquals(i, field3, "Field 3 is " + field3.getClass());
-            Object field4 = record.getField("field4");
-            assertTrue(field4 instanceof GenericRecord);
-            GenericRecord field4Record = (GenericRecord) field4;
-            assertEquals(i % 2 == 0, field4Record.getField("field1"));
-        }
-    }
 }
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index 2add60f27a..7a677f4e69 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,46 +18,18 @@
  */
 package org.apache.pulsar.client.schema;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
+
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class JSONSchemaTest {
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Foo {
-        private String field1;
-        private String field2;
-        private int field3;
-        private Bar field4;
-    }
-
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Bar {
-        private boolean field1;
-    }
-
-    private static final String SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
-            ".pulsar.client" +
-            
".schema.JSONSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
 +
-            
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
 +
-            
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
 +
-            
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
-
-    private static String[] FOO_FIELDS = {
-            "field1",
-            "field2",
-            "field3",
-            "field4"
-    };
 
     @Test
     public void testSchema() {
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
new file mode 100644
index 0000000000..52d80eb9a6
--- /dev/null
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Utils for testing avro.
+ */
+public class SchemaTestUtils {
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+        private Bar field4;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
+    public static final String SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
+            ".pulsar.client" +
+            
".schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
 +
+            
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
 +
+            
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
 +
+            
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
+
+    public static String[] FOO_FIELDS = {
+            "field1",
+            "field2",
+            "field3",
+            "field4"
+    };
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e82cf6c2f2..4013068246 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -63,7 +63,7 @@
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.impl.schema.AutoSchema;
-import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -391,10 +391,10 @@ public ClientConfigurationData getConfiguration() {
             return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && 
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
-                            GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(schemaInfoOptional.get());
+                            GenericSchema genericSchema = 
GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getSingleTopic(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericAvroSchema);
+                            autoSchema.setSchema(genericSchema);
                             return doSingleTopicSubscribeAsync(conf, schema);
                         } else {
                             return FutureUtil.failedFuture(
@@ -542,10 +542,10 @@ public ClientConfigurationData getConfiguration() {
             return lookup.getSchema(TopicName.get(conf.getTopicName()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && 
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
-                            GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(schemaInfoOptional.get());
+                            GenericSchema genericSchema = 
GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getTopicName(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericAvroSchema);
+                            autoSchema.setSchema(genericSchema);
                             return doCreateReaderAsync(conf, schema);
                         } else {
                             return FutureUtil.failedFuture(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to