This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_rootless in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5dee3ebe51ee44c859c96f580ddad4769b9f0895 Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Fri May 14 07:29:53 2021 +0200 Use Message.getReaderSchema() in Pulsar IO Sinks when possible (#10557) (cherry picked from commit 90117b2be8df9893e0f9a7b6829d48bdddc7c55f) --- .../apache/pulsar/client/api/SimpleSchemaTest.java | 5 ++ .../client/impl/schema/AbstractStructSchema.java | 8 ++ .../pulsar/client/impl/schema/KeyValueSchema.java | 10 ++- .../pulsar/functions/instance/SinkRecord.java | 12 ++- .../integration/io/TestGenericObjectSink.java | 15 +++- .../io/PulsarGenericObjectSinkTest.java | 86 +++++++++++++++++++++- 6 files changed, 131 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index d594688..af49f96 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -616,6 +616,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase { assertEquals(data.getKey().getField("i"), i * 100); assertEquals(data.getValue().getField("i"), i * 1000); c0.acknowledge(wrapper); + Schema<?> schema = wrapper.getReaderSchema().get(); + KeyValueSchema keyValueSchema = (KeyValueSchema) schema; + assertEquals(SchemaType.AVRO, keyValueSchema.getKeySchema().getSchemaInfo().getType()); + assertEquals(SchemaType.AVRO, keyValueSchema.getValueSchema().getSchemaInfo().getType()); + assertNotNull(schema.getSchemaInfo()); } // verify c1 for (int i = 0; i < numMessages; i++) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java index c4444e7..ce68434 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java @@ -158,6 +158,14 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> { return Optional.empty(); } } + + @Override + public String toString() { + return "VersionedSchema(type=" + schemaInfo.getType() + + ",schemaVersion="+BytesSchemaVersion.of(schemaVersion) + + ",name="+schemaInfo.getName() + + ")"; + } } private AbstractStructSchema<T> getAbstractStructSchemaAtVersion(byte[] schemaVersion, SchemaInfo schemaInfo) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java index f572bbf..c33de77 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java @@ -122,6 +122,8 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> { // defer configuring the key/value schema info until `configureSchemaInfo` is called. if (!requireFetchingSchemaInfo()) { configureKeyValueSchemaInfo(); + } else { + buildKeyValueSchemaInfo(); } } @@ -224,10 +226,14 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> { return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType); } - private void configureKeyValueSchemaInfo() { + private void buildKeyValueSchemaInfo() { this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo( - keySchema, valueSchema, keyValueEncodingType + keySchema, valueSchema, keyValueEncodingType ); + } + + private void configureKeyValueSchemaInfo() { + buildKeyValueSchemaInfo(); this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() { @Override public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java index 38c7036..a7ff0eb 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java @@ -98,8 +98,18 @@ public class SinkRecord<T> implements Record<T> { if (sourceRecord.getSchema() != null) { // unwrap actual schema Schema<T> schema = sourceRecord.getSchema(); + // AutoConsumeSchema is a special schema, that comes into play + // when the Sink is going to handle any Schema + // usually you see Sink<GenericObject> or Sink<GenericRecord> in this case if (schema instanceof AutoConsumeSchema) { - schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema(); + // extract the Schema from the message, this is the most accurate schema we have + // see PIP-85 + if (sourceRecord.getMessage().isPresent() + && sourceRecord.getMessage().get().getReaderSchema().isPresent()) { + schema = (Schema<T>) sourceRecord.getMessage().get().getReaderSchema().get(); + } else { + schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema(); + } } return schema; } diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java index b7645ba..63b3bac 100644 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java @@ -41,10 +41,13 @@ public class TestGenericObjectSink implements Sink<GenericObject> { log.info("received record {} {}", record, record.getClass()); log.info("schema {}", record.getSchema()); log.info("native schema {}", record.getSchema().getNativeSchema().orElse(null)); + log.info("schemaInfo {}", record.getSchema().getSchemaInfo()); + log.info("schemaInfo.type {}", record.getSchema().getSchemaInfo().getType()); String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING"); + log.info("expectedRecordType {}", expectedRecordType); if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) { - throw new RuntimeException("Unexpected record type "+record.getSchema().getSchemaInfo().getType().name() +" is not "+expectedRecordType); + throw new RuntimeException("Unexpected record type " + record.getSchema().getSchemaInfo().getType().name() + " is not " + expectedRecordType); } log.info("value {}", record.getValue()); @@ -66,6 +69,16 @@ public class TestGenericObjectSink implements Sink<GenericObject> { log.info("value schema type {}", record.getValue().getSchemaType()); log.info("value native object {}", record.getValue().getNativeObject()); + String expectedSchemaDefinition = record.getProperties().getOrDefault("expectedSchemaDefinition", ""); + log.info("schemaDefinition {}", record.getSchema().getSchemaInfo().getSchemaDefinition()); + log.info("expectedSchemaDefinition {}", expectedSchemaDefinition); + if (!expectedSchemaDefinition.isEmpty()) { + String schemaDefinition = record.getSchema().getSchemaInfo().getSchemaDefinition(); + if (!expectedSchemaDefinition.equals(schemaDefinition)) { + throw new RuntimeException("Unexpected schema definition " + schemaDefinition + " is not " + expectedSchemaDefinition); + } + } + record.ack(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java index c810b1b..a728f5b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java @@ -74,13 +74,21 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite { @Data @Builder - public static final class Pojo { + public static class Pojo { private String field1; private int field2; } @Data @Builder + public static class PojoV2 { + private String field1; + private int field2; + private Double field3; + } + + @Data + @Builder public static final class PojoKey { private String field1; } @@ -170,6 +178,82 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite { getSinkInfoNotFound(sinkName); } + @Test(groups = {"sink"}) + public void testGenericObjectSinkWithSchemaChange() throws Exception { + + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build(); + + + final int numRecords = 2; + + String sinkName = "genericobject-sink"; + String topicName = "test-genericobject-sink-schema-change"; + + submitSinkConnector(sinkName, topicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR); + // get sink info + getSinkInfoSuccess(sinkName); + getSinkStatus(sinkName); + + @Cleanup Producer<byte[]> producer = client.newProducer() + .topic(topicName) + .create(); + Schema<Pojo> schemav1 = Schema.AVRO(Pojo.class); + Pojo record1 = Pojo.builder().field1("foo").field2(23).build(); + producer.newMessage(schemav1) + .value(record1) + .property("expectedType", schemav1.getSchemaInfo().getType().toString()) + .property("expectedSchemaDefinition", schemav1.getSchemaInfo().getSchemaDefinition()) + .property("recordNumber", "1") + .send(); + + Schema<PojoV2> schemav2 = Schema.AVRO(PojoV2.class); + PojoV2 record2 = PojoV2.builder().field1("foo").field2(23).field3(42.5).build(); + producer.newMessage(schemav2) + .value(record2) + .property("expectedType", schemav2.getSchemaInfo().getType().toString()) + .property("expectedSchemaDefinition", schemav2.getSchemaInfo().getSchemaDefinition()) + .property("recordNumber", "2") + .send(); + + // wait that sink processed all records without errors + + try { + log.info("waiting for sink {}", sinkName); + + for (int i = 0; i < 120; i++) { + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + log.info("sink {} status {}", sinkName, status); + assertEquals(status.getInstances().size(), 1); + SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0); + if (instance.getStatus().numWrittenToSink >= numRecords + || instance.getStatus().numSinkExceptions > 0 + || instance.getStatus().numSystemExceptions > 0 + || instance.getStatus().numRestarts > 0) { + break; + } + Thread.sleep(1000); + } + + SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); + log.info("sink {} status {}", sinkName, status); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords); + assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0); + assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0); + log.info("sink {} is okay", sinkName); + } finally { + dumpFunctionLogs(sinkName); + } + + deleteSink(sinkName); + getSinkInfoNotFound(sinkName); + } + private void submitSinkConnector(String sinkName, String inputTopicName, String className,