This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_rootless
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5dee3ebe51ee44c859c96f580ddad4769b9f0895
Author: Enrico Olivelli <eolive...@gmail.com>
AuthorDate: Fri May 14 07:29:53 2021 +0200

    Use Message.getReaderSchema() in Pulsar IO Sinks when possible (#10557)
    
    (cherry picked from commit 90117b2be8df9893e0f9a7b6829d48bdddc7c55f)
---
 .../apache/pulsar/client/api/SimpleSchemaTest.java |  5 ++
 .../client/impl/schema/AbstractStructSchema.java   |  8 ++
 .../pulsar/client/impl/schema/KeyValueSchema.java  | 10 ++-
 .../pulsar/functions/instance/SinkRecord.java      | 12 ++-
 .../integration/io/TestGenericObjectSink.java      | 15 +++-
 .../io/PulsarGenericObjectSinkTest.java            | 86 +++++++++++++++++++++-
 6 files changed, 131 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index d594688..af49f96 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -616,6 +616,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase 
{
                 assertEquals(data.getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getField("i"), i * 1000);
                 c0.acknowledge(wrapper);
+                Schema<?> schema = wrapper.getReaderSchema().get();
+                KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+                assertEquals(SchemaType.AVRO, 
keyValueSchema.getKeySchema().getSchemaInfo().getType());
+                assertEquals(SchemaType.AVRO, 
keyValueSchema.getValueSchema().getSchemaInfo().getType());
+                assertNotNull(schema.getSchemaInfo());
             }
             // verify c1
             for (int i = 0; i < numMessages; i++) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index c4444e7..ce68434 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -158,6 +158,14 @@ public abstract class AbstractStructSchema<T> extends 
AbstractSchema<T> {
                 return Optional.empty();
             }
         }
+
+        @Override
+        public String toString() {
+            return "VersionedSchema(type=" + schemaInfo.getType() +
+                    ",schemaVersion="+BytesSchemaVersion.of(schemaVersion) +
+                    ",name="+schemaInfo.getName()
+                    + ")";
+        }
     }
 
     private AbstractStructSchema<T> getAbstractStructSchemaAtVersion(byte[] 
schemaVersion, SchemaInfo schemaInfo) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index f572bbf..c33de77 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -122,6 +122,8 @@ public class KeyValueSchema<K, V> extends 
AbstractSchema<KeyValue<K, V>> {
         // defer configuring the key/value schema info until 
`configureSchemaInfo` is called.
         if (!requireFetchingSchemaInfo()) {
             configureKeyValueSchemaInfo();
+        } else {
+            buildKeyValueSchemaInfo();
         }
     }
 
@@ -224,10 +226,14 @@ public class KeyValueSchema<K, V> extends 
AbstractSchema<KeyValue<K, V>> {
         return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), 
keyValueEncodingType);
     }
 
-    private void configureKeyValueSchemaInfo() {
+    private void buildKeyValueSchemaInfo() {
         this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
-            keySchema, valueSchema, keyValueEncodingType
+                keySchema, valueSchema, keyValueEncodingType
         );
+    }
+
+    private void configureKeyValueSchemaInfo() {
+        buildKeyValueSchemaInfo();
         this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
             @Override
             public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] 
schemaVersion) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 38c7036..a7ff0eb 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -98,8 +98,18 @@ public class SinkRecord<T> implements Record<T> {
         if (sourceRecord.getSchema() != null) {
             // unwrap actual schema
             Schema<T> schema =  sourceRecord.getSchema();
+            // AutoConsumeSchema is a special schema that comes into play
+            // when the Sink is going to handle any Schema; in this case the
+            // Sink is usually declared as Sink<GenericObject> or Sink<GenericRecord>.
             if (schema instanceof AutoConsumeSchema) {
-                schema = (Schema<T>) ((AutoConsumeSchema) 
schema).getInternalSchema();
+                // Extract the Schema from the message: this is the most accurate
+                // schema we have (see PIP-85).
+                if (sourceRecord.getMessage().isPresent()
+                        && 
sourceRecord.getMessage().get().getReaderSchema().isPresent()) {
+                    schema = (Schema<T>) 
sourceRecord.getMessage().get().getReaderSchema().get();
+                } else {
+                    schema = (Schema<T>) ((AutoConsumeSchema) 
schema).getInternalSchema();
+                }
             }
             return schema;
         }
diff --git 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index b7645ba..63b3bac 100644
--- 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -41,10 +41,13 @@ public class TestGenericObjectSink implements 
Sink<GenericObject> {
         log.info("received record {} {}", record, record.getClass());
         log.info("schema {}", record.getSchema());
         log.info("native schema {}", 
record.getSchema().getNativeSchema().orElse(null));
+        log.info("schemaInfo {}", record.getSchema().getSchemaInfo());
+        log.info("schemaInfo.type {}", 
record.getSchema().getSchemaInfo().getType());
 
         String expectedRecordType = 
record.getProperties().getOrDefault("expectedType", "MISSING");
+        log.info("expectedRecordType {}", expectedRecordType);
         if 
(!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name()))
 {
-            throw new RuntimeException("Unexpected record type 
"+record.getSchema().getSchemaInfo().getType().name() +" is not 
"+expectedRecordType);
+            throw new RuntimeException("Unexpected record type " + 
record.getSchema().getSchemaInfo().getType().name() + " is not " + 
expectedRecordType);
         }
 
         log.info("value {}", record.getValue());
@@ -66,6 +69,16 @@ public class TestGenericObjectSink implements 
Sink<GenericObject> {
         log.info("value schema type {}", record.getValue().getSchemaType());
         log.info("value native object {}", 
record.getValue().getNativeObject());
 
+        String expectedSchemaDefinition = 
record.getProperties().getOrDefault("expectedSchemaDefinition", "");
+        log.info("schemaDefinition {}", 
record.getSchema().getSchemaInfo().getSchemaDefinition());
+        log.info("expectedSchemaDefinition {}", expectedSchemaDefinition);
+        if (!expectedSchemaDefinition.isEmpty()) {
+            String schemaDefinition = 
record.getSchema().getSchemaInfo().getSchemaDefinition();
+            if (!expectedSchemaDefinition.equals(schemaDefinition)) {
+                throw new RuntimeException("Unexpected schema definition " + 
schemaDefinition + " is not " + expectedSchemaDefinition);
+            }
+        }
+
         record.ack();
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index c810b1b..a728f5b 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -74,13 +74,21 @@ public class PulsarGenericObjectSinkTest extends 
PulsarStandaloneTestSuite {
 
     @Data
     @Builder
-    public static final class Pojo {
+    public static class Pojo {
         private String field1;
         private int field2;
     }
 
     @Data
     @Builder
+    public static class PojoV2 {
+        private String field1;
+        private int field2;
+        private Double field3;
+    }
+
+    @Data
+    @Builder
     public static final class PojoKey {
         private String field1;
     }
@@ -170,6 +178,82 @@ public class PulsarGenericObjectSinkTest extends 
PulsarStandaloneTestSuite {
         getSinkInfoNotFound(sinkName);
     }
 
+    @Test(groups = {"sink"})
+    public void testGenericObjectSinkWithSchemaChange() throws Exception {
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();
+
+
+        final int numRecords = 2;
+
+        String sinkName = "genericobject-sink";
+        String topicName = "test-genericobject-sink-schema-change";
+
+        submitSinkConnector(sinkName, topicName, 
"org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
+        // get sink info
+        getSinkInfoSuccess(sinkName);
+        getSinkStatus(sinkName);
+
+        @Cleanup Producer<byte[]> producer = client.newProducer()
+                    .topic(topicName)
+                    .create();
+        Schema<Pojo> schemav1 = Schema.AVRO(Pojo.class);
+        Pojo record1 = Pojo.builder().field1("foo").field2(23).build();
+        producer.newMessage(schemav1)
+                .value(record1)
+                .property("expectedType", 
schemav1.getSchemaInfo().getType().toString())
+                .property("expectedSchemaDefinition", 
schemav1.getSchemaInfo().getSchemaDefinition())
+                .property("recordNumber", "1")
+                .send();
+
+        Schema<PojoV2> schemav2 = Schema.AVRO(PojoV2.class);
+        PojoV2 record2 = 
PojoV2.builder().field1("foo").field2(23).field3(42.5).build();
+        producer.newMessage(schemav2)
+                .value(record2)
+                .property("expectedType", 
schemav2.getSchemaInfo().getType().toString())
+                .property("expectedSchemaDefinition", 
schemav2.getSchemaInfo().getSchemaDefinition())
+                .property("recordNumber", "2")
+                .send();
+
+        // wait until the sink has processed all records without errors
+
+        try {
+            log.info("waiting for sink {}", sinkName);
+
+            for (int i = 0; i < 120; i++) {
+                SinkStatus status = admin.sinks().getSinkStatus("public", 
"default", sinkName);
+                log.info("sink {} status {}", sinkName, status);
+                assertEquals(status.getInstances().size(), 1);
+                SinkStatus.SinkInstanceStatus instance = 
status.getInstances().get(0);
+                if (instance.getStatus().numWrittenToSink >= numRecords
+                        || instance.getStatus().numSinkExceptions > 0
+                        || instance.getStatus().numSystemExceptions > 0
+                        || instance.getStatus().numRestarts > 0) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+
+            SinkStatus status = admin.sinks().getSinkStatus("public", 
"default", sinkName);
+            log.info("sink {} status {}", sinkName, status);
+            assertEquals(status.getInstances().size(), 1);
+            
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= 
numRecords);
+            
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
+            
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
+            log.info("sink {} is okay", sinkName);
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
     private void submitSinkConnector(String sinkName,
                                      String inputTopicName,
                                      String className,

Reply via email to