This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3a6b009bcc95a69e99c799e72796c8f17f853365 Author: congbo <[email protected]> AuthorDate: Mon Nov 22 16:19:28 2021 +0800 [Schema] Fix pulsar use json or avro primitive schema. (#12886) Currently, Schema.AUTO_CONSUME returns a GenericRecord, but data written with a JSON or Avro primitive schema cannot be decoded into a GenericRecord. So, when Schema.AUTO_CONSUME is used and the data schema is a JSON or Avro primitive schema, we should return a GenericObjectWrapper. Note: 1. We can't change GenericObjectWrapper.get to the Pulsar primitive schema, because their decoding and encoding are different. When Schema.AUTO_CONSUME receives a JSON or Avro primitive schema, return the original schema and getValue return (cherry picked from commit f763f27c434dbf9c91e9bb48e34079b9c80d2bb0) --- .../java/org/apache/pulsar/schema/SchemaTest.java | 48 ++++++++++++++++++++++ .../client/impl/schema/AutoConsumeSchema.java | 29 ++++++++++++- .../pulsar/client/impl/schema/SchemaUtils.java | 21 ++++++---- 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 38373cc..f13b266 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -301,6 +301,54 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } @Test + public void testSendAvroAndJsonPrimitiveSchema() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-multi-version-schema-one"; + final String fqtnOne = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicOne + ).toString(); + + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME) + ); + + admin.namespaces().setSchemaCompatibilityStrategy(tenant + "/" + namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) + .topic(fqtnOne) + .create(); + + final Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(fqtnOne) + .subscriptionName("sub") + .subscribe(); + + int producerAvroIntegerValue = 1; + byte[] producerAvroBytesValue = "testProducerAvroBytes".getBytes(); + producer.newMessage(Schema.AVRO(Integer.class)).value(1).send(); + producer.newMessage(Schema.AVRO(byte[].class)).value(producerAvroBytesValue).send(); + + int producerJsonIntegerValue = 2; + byte[] producerJsonBytesValue = "testProducerJsonBytes".getBytes(); + producer.newMessage(Schema.JSON(Integer.class)).value(producerJsonIntegerValue).send(); + producer.newMessage(Schema.JSON(byte[].class)).value(producerJsonBytesValue).send(); + + // AVRO schema with primitive class can consume + assertEquals(consumer.receive().getValue().getNativeObject(), producerAvroIntegerValue); + assertArrayEquals((byte[]) consumer.receive().getValue().getNativeObject(), producerAvroBytesValue); + + // JSON schema with primitive class can consume + assertEquals(consumer.receive().getValue().getNativeObject(), producerJsonIntegerValue); + assertArrayEquals((byte[]) consumer.receive().getValue().getNativeObject(), producerJsonBytesValue); +} + + @Test public void testJSONSchemaDeserialize() throws Exception { final String tenant = PUBLIC_TENANT; final String namespace = "test-namespace-" + randomName(16); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 8c63133..d82085f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -18,20 +18,24 @@ */ package 
org.apache.pulsar.client.impl.schema; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.avro.Schema.Type.RECORD; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.apache.avro.reflect.ReflectData; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema; import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; +import org.apache.pulsar.client.impl.schema.util.SchemaUtil; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; - import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentMap; @@ -172,7 +176,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { switch (schemaInfo.getType()) { case JSON: case AVRO: - return GenericSchemaImpl.of(schemaInfo,useProvidedSchemaAsReaderSchema); + return extractFromAvroSchema(schemaInfo, useProvidedSchemaAsReaderSchema); case PROTOBUF_NATIVE: return GenericProtobufNativeSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema); default: @@ -180,6 +184,27 @@ public class AutoConsumeSchema implements Schema<GenericRecord> { } } + private static Schema<?> extractFromAvroSchema(SchemaInfo schemaInfo, final boolean useProvidedSchemaAsReaderSchema) { + org.apache.avro.Schema avroSchema = SchemaUtil.parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)); + // if avroSchema type is RECORD we can use GenericSchema, otherwise use its own schema and decode return + // 
`GenericObjectWrapper` + if (avroSchema.getType() == RECORD) { + return GenericSchemaImpl.of(schemaInfo, useProvidedSchemaAsReaderSchema); + } else { + // because of we use json primitive schema or avro primitive schema generated data + // different from the data generated using the primitive schema of pulsar itself. + // so we should use the original schema of this data + if (schemaInfo.getType() == SchemaType.JSON) { + // It should be generated and used POJO, otherwise json cannot be parsed correctly + return Schema.JSON(SchemaDefinition.builder() + .withPojo(ReflectData.get().getClass(avroSchema)).build()); + } else { + return Schema.AVRO(SchemaDefinition.builder() + .withJsonDef(new String(schemaInfo.getSchema(), UTF_8)).build()); + } + } + } + public static Schema<?> getSchema(SchemaInfo schemaInfo) { switch (schemaInfo.getType()) { case INT8: diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java index 23ecb4e..7c7281c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java @@ -284,15 +284,15 @@ public final class SchemaUtils { case AVRO: case JSON: case PROTOBUF: - return toJsonObject(schemaInfo.getSchemaDefinition()); + return toJsonElement(schemaInfo.getSchemaDefinition()); case KEY_VALUE: KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = DefaultImplementation.decodeKeyValueSchemaInfo(schemaInfo); JsonObject obj = new JsonObject(); String keyJson = jsonifySchemaInfo(schemaInfoKeyValue.getKey()); String valueJson = jsonifySchemaInfo(schemaInfoKeyValue.getValue()); - obj.add("key", toJsonObject(keyJson)); - obj.add("value", toJsonObject(valueJson)); + obj.add("key", toJsonElement(keyJson)); + obj.add("value", toJsonElement(valueJson)); return obj; default: return new JsonPrimitive(schemaDef); @@ -300,9 
+300,13 @@ public final class SchemaUtils { } } - public static JsonObject toJsonObject(String json) { - JsonParser parser = new JsonParser(); - return parser.parse(json).getAsJsonObject(); + public static JsonElement toJsonElement(String str) { + try { + return JsonParser.parseString(str).getAsJsonObject(); + } catch (IllegalStateException e) { + // because str may not a json, so we should use JsonPrimitive + return new JsonPrimitive(str); + } } private static class SchemaInfoToStringAdapter implements JsonSerializer<SchemaInfo> { @@ -311,7 +315,8 @@ public final class SchemaUtils { public JsonElement serialize(SchemaInfo schemaInfo, Type type, JsonSerializationContext jsonSerializationContext) { - return toJsonObject(jsonifySchemaInfo(schemaInfo)); + // schema will not a json, so use toJsonElement + return toJsonElement(jsonifySchemaInfo(schemaInfo)); } } @@ -356,7 +361,7 @@ public final class SchemaUtils { * @return the key/value schema info data bytes */ public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) throws IOException { - JsonObject jsonObject = toJsonObject(new String(keyValueSchemaInfoDataJsonBytes, UTF_8)); + JsonObject jsonObject = (JsonObject) toJsonElement(new String(keyValueSchemaInfoDataJsonBytes, UTF_8)); byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key")); byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value")); int dataLength = 4 + keyBytes.length + 4 + valueBytes.length;
