This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3a6b009bcc95a69e99c799e72796c8f17f853365
Author: congbo <[email protected]>
AuthorDate: Mon Nov 22 16:19:28 2021 +0800

    [Schema] Fix pulsar use json or avro primitive schema. (#12886)
    
    Currently, Schema.AUTO_CONSUME always returns a GenericRecord, but data
    written with a JSON or Avro primitive schema cannot be decoded into a
    GenericRecord. So, when Schema.AUTO_CONSUME is used and the data schema is
    a JSON or Avro primitive schema, we should return a GenericObjectWrapper.
    
    note:
     1. We can't make GenericObjectWrapper.get use the Pulsar primitive
        schema, because its encoding and decoding differ from those of the
        JSON and Avro primitive schemas.
    
    When Schema.AUTO_CONSUME receives a JSON or Avro primitive schema, it
    returns the original schema, and getValue returns a GenericObjectWrapper.
    
    (cherry picked from commit f763f27c434dbf9c91e9bb48e34079b9c80d2bb0)
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 48 ++++++++++++++++++++++
 .../client/impl/schema/AutoConsumeSchema.java      | 29 ++++++++++++-
 .../pulsar/client/impl/schema/SchemaUtils.java     | 21 ++++++----
 3 files changed, 88 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 38373cc..f13b266 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -301,6 +301,54 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testSendAvroAndJsonPrimitiveSchema() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "test-multi-version-schema-one";
+        final String fqtnOne = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicOne
+        ).toString();
+
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        admin.namespaces().setSchemaCompatibilityStrategy(tenant + "/" + 
namespace,
+                SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(fqtnOne)
+                .create();
+
+        final Consumer<GenericRecord> consumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(fqtnOne)
+                .subscriptionName("sub")
+                .subscribe();
+
+        int producerAvroIntegerValue = 1;
+        byte[] producerAvroBytesValue = "testProducerAvroBytes".getBytes();
+        producer.newMessage(Schema.AVRO(Integer.class)).value(1).send();
+        
producer.newMessage(Schema.AVRO(byte[].class)).value(producerAvroBytesValue).send();
+
+        int producerJsonIntegerValue = 2;
+        byte[] producerJsonBytesValue = "testProducerJsonBytes".getBytes();
+        
producer.newMessage(Schema.JSON(Integer.class)).value(producerJsonIntegerValue).send();
+        
producer.newMessage(Schema.JSON(byte[].class)).value(producerJsonBytesValue).send();
+
+        // AVRO schema with primitive class can consume
+        assertEquals(consumer.receive().getValue().getNativeObject(), 
producerAvroIntegerValue);
+        assertArrayEquals((byte[]) 
consumer.receive().getValue().getNativeObject(), producerAvroBytesValue);
+
+        // JSON schema with primitive class can consume
+        assertEquals(consumer.receive().getValue().getNativeObject(), 
producerJsonIntegerValue);
+        assertArrayEquals((byte[])  
consumer.receive().getValue().getNativeObject(), producerJsonBytesValue);
+}
+
+    @Test
     public void testJSONSchemaDeserialize() throws Exception {
         final String tenant = PUBLIC_TENANT;
         final String namespace = "test-namespace-" + randomName(16);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 8c63133..d82085f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -18,20 +18,24 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.avro.Schema.Type.RECORD;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import 
org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.util.SchemaUtil;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
-
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
@@ -172,7 +176,7 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
         switch (schemaInfo.getType()) {
             case JSON:
             case AVRO:
-                return 
GenericSchemaImpl.of(schemaInfo,useProvidedSchemaAsReaderSchema);
+                return extractFromAvroSchema(schemaInfo, 
useProvidedSchemaAsReaderSchema);
             case PROTOBUF_NATIVE:
                 return GenericProtobufNativeSchema.of(schemaInfo, 
useProvidedSchemaAsReaderSchema);
             default:
@@ -180,6 +184,27 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
         }
     }
 
+    private static Schema<?> extractFromAvroSchema(SchemaInfo schemaInfo, 
final boolean useProvidedSchemaAsReaderSchema) {
+        org.apache.avro.Schema avroSchema = SchemaUtil.parseAvroSchema(new 
String(schemaInfo.getSchema(), UTF_8));
+        // if avroSchema type is RECORD we can use GenericSchema, otherwise 
use its own schema and decode return
+        // `GenericObjectWrapper`
+        if (avroSchema.getType() == RECORD) {
+            return GenericSchemaImpl.of(schemaInfo, 
useProvidedSchemaAsReaderSchema);
+        } else {
+            // because of we use json primitive schema or avro primitive 
schema generated data
+            // different from the data generated using the primitive schema of 
pulsar itself.
+            // so we should use the original schema of this data
+            if (schemaInfo.getType() == SchemaType.JSON) {
+                // It should be generated and used POJO, otherwise json cannot 
be parsed correctly
+                return Schema.JSON(SchemaDefinition.builder()
+                        
.withPojo(ReflectData.get().getClass(avroSchema)).build());
+            } else {
+                return Schema.AVRO(SchemaDefinition.builder()
+                        .withJsonDef(new String(schemaInfo.getSchema(), 
UTF_8)).build());
+            }
+        }
+    }
+
     public static Schema<?> getSchema(SchemaInfo schemaInfo) {
         switch (schemaInfo.getType()) {
             case INT8:
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 23ecb4e..7c7281c 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -284,15 +284,15 @@ public final class SchemaUtils {
                 case AVRO:
                 case JSON:
                 case PROTOBUF:
-                    return toJsonObject(schemaInfo.getSchemaDefinition());
+                    return toJsonElement(schemaInfo.getSchemaDefinition());
                 case KEY_VALUE:
                     KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
                         
DefaultImplementation.decodeKeyValueSchemaInfo(schemaInfo);
                     JsonObject obj = new JsonObject();
                     String keyJson = 
jsonifySchemaInfo(schemaInfoKeyValue.getKey());
                     String valueJson = 
jsonifySchemaInfo(schemaInfoKeyValue.getValue());
-                    obj.add("key", toJsonObject(keyJson));
-                    obj.add("value", toJsonObject(valueJson));
+                    obj.add("key", toJsonElement(keyJson));
+                    obj.add("value", toJsonElement(valueJson));
                     return obj;
                 default:
                     return new JsonPrimitive(schemaDef);
@@ -300,9 +300,13 @@ public final class SchemaUtils {
         }
     }
 
-    public static JsonObject toJsonObject(String json) {
-        JsonParser parser = new JsonParser();
-        return parser.parse(json).getAsJsonObject();
+    public static JsonElement toJsonElement(String str) {
+        try {
+            return JsonParser.parseString(str).getAsJsonObject();
+        } catch (IllegalStateException e) {
+            // because str may not a json, so we should use JsonPrimitive
+            return new JsonPrimitive(str);
+        }
     }
 
     private static class SchemaInfoToStringAdapter implements 
JsonSerializer<SchemaInfo> {
@@ -311,7 +315,8 @@ public final class SchemaUtils {
         public JsonElement serialize(SchemaInfo schemaInfo,
                                      Type type,
                                      JsonSerializationContext 
jsonSerializationContext) {
-            return toJsonObject(jsonifySchemaInfo(schemaInfo));
+            // schema will not a json, so use toJsonElement
+            return toJsonElement(jsonifySchemaInfo(schemaInfo));
         }
     }
 
@@ -356,7 +361,7 @@ public final class SchemaUtils {
      * @return the key/value schema info data bytes
      */
     public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] 
keyValueSchemaInfoDataJsonBytes) throws IOException {
-        JsonObject jsonObject = toJsonObject(new 
String(keyValueSchemaInfoDataJsonBytes, UTF_8));
+        JsonObject jsonObject = (JsonObject) toJsonElement(new 
String(keyValueSchemaInfoDataJsonBytes, UTF_8));
         byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key"));
         byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));
         int dataLength = 4 + keyBytes.length + 4 + valueBytes.length;

Reply via email to