This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 311817f adding protobuf schema check support (#1954) 311817f is described below commit 311817f91eacd49811589d5bf75e54c8221fb642 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Tue Jun 12 22:09:10 2018 -0700 adding protobuf schema check support (#1954) * adding protobuf schema check support * refactoring pom --- pulsar-broker/pom.xml | 1 + .../schema/AvroSchemaCompatibilityCheck.java | 14 ++-- .../schema/ProtobufSchemaCompatibilityCheck.java | 29 ++++---- .../schema/SchemaCompatibilityStrategy.java | 27 ++------ .../schema/AvroSchemaCompatibilityCheckTest.java | 6 +- .../api/SimpleTypedProducerConsumerTest.java | 79 ++++++++++++++++++++++ .../src/test/proto/ProtobufSchemaTest.proto | 41 +++++------ pulsar-client/pom.xml | 33 +++++++++ .../pulsar/client/impl/schema/ProtobufSchema.java | 7 +- .../pulsar/client/schemas/ProtobufSchemaTest.java | 28 ++++++++ .../ProtobufSchemaTest.java => proto/Test.proto} | 39 ++++++----- 11 files changed, 210 insertions(+), 94 deletions(-) diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 0a118a8..0306b69 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -332,6 +332,7 @@ <execution> <goals> <goal>compile</goal> + <goal>test-compile</goal> </goals> </execution> </executions> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java index 5d5a77e..b7dd6d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java @@ -30,13 +30,13 @@ import java.util.Arrays; public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck { - private final CompatibilityStrategy compatibilityStrategy; + private final SchemaCompatibilityStrategy compatibilityStrategy; public AvroSchemaCompatibilityCheck () { - this(CompatibilityStrategy.FULL); + this(SchemaCompatibilityStrategy.FULL); } - public AvroSchemaCompatibilityCheck(CompatibilityStrategy compatibilityStrategy) { + public AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) { this.compatibilityStrategy = compatibilityStrategy; } @@ -62,13 +62,7 @@ public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck { return true; } - public enum CompatibilityStrategy { - BACKWARD, - FORWARD, - FULL - } - - private static SchemaValidator createSchemaValidator(CompatibilityStrategy compatibilityStrategy, + private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy, boolean onlyLatestValidator) { final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder(); switch (compatibilityStrategy) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java similarity index 51% copy from pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java index fe2c116..0e4dafe 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufSchemaCompatibilityCheck.java @@ -16,27 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.schemas; +package org.apache.pulsar.broker.service.schema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.functions.proto.Function; -import org.testng.Assert; -import org.testng.annotations.Test; +import org.apache.pulsar.common.schema.SchemaType; -public class ProtobufSchemaTest { +public class ProtobufSchemaCompatibilityCheck extends AvroSchemaCompatibilityCheck { - private static final String NAME = "foo"; - - @Test - public void testEncodeAndDecode() { - Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build(); - - ProtobufSchema<Function.FunctionDetails> protobufSchema = ProtobufSchema.of(Function.FunctionDetails.class); - - byte[] bytes = protobufSchema.encode(functionDetails); + public ProtobufSchemaCompatibilityCheck () { + this(SchemaCompatibilityStrategy.FULL); + } - Function.FunctionDetails message = protobufSchema.decode(bytes); + public ProtobufSchemaCompatibilityCheck (SchemaCompatibilityStrategy compatibilityStrategy) { + super(compatibilityStrategy); + } - Assert.assertEquals(message.getName(), NAME); + @Override + public SchemaType getSchemaType() { + return SchemaType.PROTOBUF; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java similarity index 51% copy from pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java index fe2c116..85c9d50 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java @@ -16,27 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.schemas; +package org.apache.pulsar.broker.service.schema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.functions.proto.Function; -import org.testng.Assert; -import org.testng.annotations.Test; - -public class ProtobufSchemaTest { - - private static final String NAME = "foo"; - - @Test - public void testEncodeAndDecode() { - Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build(); - - ProtobufSchema<Function.FunctionDetails> protobufSchema = ProtobufSchema.of(Function.FunctionDetails.class); - - byte[] bytes = protobufSchema.encode(functionDetails); - - Function.FunctionDetails message = protobufSchema.decode(bytes); - - Assert.assertEquals(message.getName(), NAME); - } +public enum SchemaCompatibilityStrategy { + BACKWARD, + FORWARD, + FULL } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java index de63f54..eceb9c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java @@ -74,7 +74,7 @@ public class AvroSchemaCompatibilityCheckTest { public void testBackwardCompatibility() { AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck( - AvroSchemaCompatibilityCheck.CompatibilityStrategy.BACKWARD + SchemaCompatibilityStrategy.BACKWARD ); // adding a field with default is backwards compatible @@ -107,7 +107,7 @@ public class AvroSchemaCompatibilityCheckTest { public void testForwardCompatibility() { AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck( - AvroSchemaCompatibilityCheck.CompatibilityStrategy.FORWARD + SchemaCompatibilityStrategy.FORWARD ); Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2), @@ -132,7 +132,7 @@ public class AvroSchemaCompatibilityCheckTest { @Test public void testFullCompatibility() { AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck( - AvroSchemaCompatibilityCheck.CompatibilityStrategy.FULL + SchemaCompatibilityStrategy.FULL ); Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2), "adding a field with default fully compatible"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 873080f..b4bd76e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.slf4j.Logger; @@ -194,6 +195,84 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + + @Test + public void testProtobufProducerAndConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> protobufSchema = + ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class); + + Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> consumer = pulsarClient + .newConsumer(protobufSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + Producer<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> producer = pulsarClient + .newProducer(protobufSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.newBuilder() + .setStringField(message).build()); + } + + Message<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> msg = null; + Set<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + org.apache.pulsar.client.api.schema.proto.Test.TestMessage receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + org.apache.pulsar.client.api.schema.proto.Test.TestMessage expectedMessage + = org.apache.pulsar.client.api.schema.proto.Test.TestMessage.newBuilder() + .setStringField("my-message-" + i).build(); + + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), protobufSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + } + + @Test(expectedExceptions = {PulsarClientException.class}) + public void testProtobufConsumerWithWrongPrestoredSchema() throws Exception { + log.info("-- Starting {} test --", methodName); + + ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> schema + = ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class); + + pulsar.getSchemaRegistryService() + .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SchemaData.builder() + .type(SchemaType.PROTOBUF) + .isDeleted(false) + .timestamp(Clock.systemUTC().millis()) + .user("me") + .data(schema.getSchemaInfo().getSchema()) + .props(Collections.emptyMap()) + .build() + ).get(); + + Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient + .newConsumer(AvroSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class)) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + log.info("-- Exiting {} test --", methodName); + } + @Test public void testAvroProducerAndConsumer() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java b/pulsar-broker/src/test/proto/ProtobufSchemaTest.proto similarity index 51% copy from pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java copy to pulsar-broker/src/test/proto/ProtobufSchemaTest.proto index fe2c116..1086e91 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java +++ b/pulsar-broker/src/test/proto/ProtobufSchemaTest.proto @@ -16,27 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.schemas; +syntax = "proto3"; +package proto; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.functions.proto.Function; -import org.testng.Assert; -import org.testng.annotations.Test; +option java_package = "org.apache.pulsar.client.api.schema.proto"; +option java_outer_classname = "Test"; -public class ProtobufSchemaTest { - - private static final String NAME = "foo"; - - @Test - public void testEncodeAndDecode() { - Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build(); - - ProtobufSchema<Function.FunctionDetails> protobufSchema = ProtobufSchema.of(Function.FunctionDetails.class); - - byte[] bytes = protobufSchema.encode(functionDetails); +enum TestEnum { + SHARED = 0; + FAILOVER = 1; +} - Function.FunctionDetails message = protobufSchema.decode(bytes); +message SubMessage { + string foo = 1; + double bar = 2; +} - Assert.assertEquals(message.getName(), NAME); - } +message TestMessage { + string stringField = 1; + double doubleField = 2; + int32 intField = 3; + TestEnum testEnum = 4; + SubMessage nestedField = 5; } + +message TestMessageWrong { + float foo = 1; +} \ No newline at end of file diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 16a7224..27e4155 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -98,6 +98,18 @@ </dependency> <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-protobuf</artifactId> + <version>${avro.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf3.version}</version> @@ -137,5 +149,26 @@ <filtering>true</filtering> </resource> </resources> + <!-- Generate protobuf for testing purposes --> + <plugins> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>${protobuf-maven-plugin.version}</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact> + <checkStaleness>true</checkStaleness> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>test-compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> </build> </project> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java index 42c9331..4f2fba3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl.schema; import com.google.protobuf.Parser; +import org.apache.avro.protobuf.ProtobufDatumReader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; @@ -73,9 +74,9 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> im info.setName(""); info.setProperties(properties); info.setType(SchemaType.PROTOBUF); - - //TODO determine best method to extract schema from a protobuf message - info.setSchema(null); + ProtobufDatumReader<T> datumReader = new ProtobufDatumReader<>(pojo); + org.apache.avro.Schema schema = datumReader.getSchema(); + info.setSchema(schema.toString().getBytes()); return new ProtobufSchema<>(info, pojo); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java index fe2c116..f099eb2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java @@ -18,15 +18,29 @@ */ package org.apache.pulsar.client.schemas; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; import org.apache.pulsar.client.impl.schema.ProtobufSchema; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.proto.Function; import org.testng.Assert; import org.testng.annotations.Test; +@Slf4j public class ProtobufSchemaTest { private static final String NAME = "foo"; + private static final String EXPECTED_SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"TestMessage\"," + + "\"namespace\":\"org.apache.pulsar.client.schemas.proto.Test$\",\"fields\":[{\"name\":\"stringField\"," + + "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"}," + + "{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\",\"type\":\"int\"," + + "\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\",\"name\":\"TestEnum\"," + + "\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\"," + + "\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\"," + + "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\"," + + "\"type\":\"double\",\"default\":0}]}],\"default\":null}]}"; + @Test public void testEncodeAndDecode() { Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build(); @@ -39,4 +53,18 @@ public class ProtobufSchemaTest { Assert.assertEquals(message.getName(), NAME); } + + @Test + public void testSchema() { + ProtobufSchema<org.apache.pulsar.client.schemas.proto.Test.TestMessage> protobufSchema + = ProtobufSchema.of(org.apache.pulsar.client.schemas.proto.Test.TestMessage.class); + + Assert.assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF); + + String schemaJson = new String(protobufSchema.getSchemaInfo().getSchema()); + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(schemaJson); + + Assert.assertEquals(schema.toString(), EXPECTED_SCHEMA_JSON); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java b/pulsar-client/src/test/proto/Test.proto similarity index 51% copy from pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java copy to pulsar-client/src/test/proto/Test.proto index fe2c116..d140051 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/proto/Test.proto @@ -16,27 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.schemas; +syntax = "proto3"; +package proto; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.functions.proto.Function; -import org.testng.Assert; -import org.testng.annotations.Test; +option java_package = "org.apache.pulsar.client.schemas.proto"; +option java_outer_classname = "Test"; -public class ProtobufSchemaTest { - - private static final String NAME = "foo"; - - @Test - public void testEncodeAndDecode() { - Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setName(NAME).build(); - - ProtobufSchema<Function.FunctionDetails> protobufSchema = ProtobufSchema.of(Function.FunctionDetails.class); - - byte[] bytes = protobufSchema.encode(functionDetails); - - Function.FunctionDetails message = protobufSchema.decode(bytes); +enum TestEnum { + SHARED = 0; + FAILOVER = 1; +} - Assert.assertEquals(message.getName(), NAME); - } +message SubMessage { + string foo = 1; + double bar = 2; } + +message TestMessage { + string stringField = 1; + double doubleField = 2; + int32 intField = 3; + TestEnum testEnum = 4; + SubMessage nestedField = 5; +} \ No newline at end of file -- To stop receiving notification emails like this one, please contact jerryp...@apache.org.