This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b87357dbf3 [improve][client] Support protobuf v4 schema compatibility (#25261)
6b87357dbf3 is described below

commit 6b87357dbf384ef45d0f4620066e31f9a3b60ae2
Author: Penghui Li <[email protected]>
AuthorDate: Thu Feb 26 04:09:31 2026 -0800

    [improve][client] Support protobuf v4 schema compatibility (#25261)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .github/workflows/pulsar-ci.yaml                   |  2 +
 build/run_unit_group.sh                            |  8 +++
 .../java/org/apache/pulsar/client/api/Schema.java  |  8 +--
 .../PulsarClientImplementationBinding.java         |  4 +-
 .../PulsarClientImplementationBindingImpl.java     |  4 +-
 .../client/impl/schema/ProtobufNativeSchema.java   | 15 +++---
 .../pulsar/client/impl/schema/ProtobufSchema.java  | 17 +++----
 .../impl/schema/reader/ProtobufNativeReader.java   |  4 +-
 .../client/impl/schema/reader/ProtobufReader.java  |  3 +-
 .../impl/schema/writer/ProtobufNativeWriter.java   |  4 +-
 .../client/impl/schema/writer/ProtobufWriter.java  |  3 +-
 .../client/api/ProtobufSchemaApiSignatureTest.java | 58 ++++++++++++++++++++++
 .../impl/schema/ProtobufNativeSchemaTest.java      | 15 ++++++
 .../client/impl/schema/ProtobufSchemaTest.java     | 13 +++++
 .../functions/instance/JavaInstanceRunnable.java   |  2 +-
 .../pulsar/functions/source/TopicSchema.java       |  2 +-
 .../instance/JavaInstanceRunnableTest.java         | 10 ++++
 .../pulsar/functions/source/TopicSchemaTest.java   | 16 ++++++
 18 files changed, 157 insertions(+), 31 deletions(-)

diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 17947a45b5b..9d7e025d71d 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -337,6 +337,8 @@ jobs:
             group: CLIENT
           - name: Pulsar Metadata
             group: METADATA
+          - name: Protobuf v4
+            group: PROTOBUFV4
 
     steps:
       - name: checkout
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 9653a787325..5286d34d6c4 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -115,6 +115,14 @@ function test_group_metadata() {
   mvn_test -pl pulsar-metadata -DtestReuseFork=false
 }
 
+function test_group_protobufv4() {
+  mvn_test --clean --install \
+    -Dprotobuf3.version=4.31.1 \
+    -Dprotoc3.version=4.31.1 \
+    -pl pulsar-client,pulsar-functions/instance \
+    -Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest
+}
+
 # prints summaries of failed tests to console
 # by using the targer/surefire-reports files
 # works only when testForkCount > 1 since that is when surefire will create reports for individual test classes
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 3089684a1f9..64862edca1e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -300,7 +300,7 @@ public interface Schema<T> extends Cloneable {
      * @param clazz the Protobuf generated class to be used to extract the 
schema
      * @return a Schema instance
      */
-    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
PROTOBUF(Class<T> clazz) {
+    static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(Class<T> 
clazz) {
         return DefaultImplementation.getDefaultImplementation()
                 
.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
     }
@@ -311,7 +311,7 @@ public interface Schema<T> extends Cloneable {
      * @param schemaDefinition schemaDefinition the definition of the schema
      * @return a Schema instance
      */
-    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
PROTOBUF(SchemaDefinition<T> schemaDefinition) {
+    static <T extends com.google.protobuf.Message> Schema<T> 
PROTOBUF(SchemaDefinition<T> schemaDefinition) {
         return 
DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition);
     }
 
@@ -321,7 +321,7 @@ public interface Schema<T> extends Cloneable {
      * @param clazz the Protobuf generated class to be used to extract the 
schema
      * @return a Schema instance
      */
-    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
PROTOBUF_NATIVE(Class<T> clazz) {
+    static <T extends com.google.protobuf.Message> Schema<T> 
PROTOBUF_NATIVE(Class<T> clazz) {
         return DefaultImplementation.getDefaultImplementation()
                 
.newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build());
     }
@@ -332,7 +332,7 @@ public interface Schema<T> extends Cloneable {
      * @param schemaDefinition schemaDefinition the definition of the schema
      * @return a Schema instance
      */
-    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
PROTOBUF_NATIVE(
+    static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF_NATIVE(
             SchemaDefinition<T> schemaDefinition) {
         return 
DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition);
     }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index b5f2a3a468e..f13cc003f5b 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -116,9 +116,9 @@ public interface PulsarClientImplementationBinding {
 
     <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition);
 
-    <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
newProtobufSchema(SchemaDefinition schemaDefinition);
+    <T extends com.google.protobuf.Message> Schema<T> 
newProtobufSchema(SchemaDefinition schemaDefinition);
 
-    <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
newProtobufNativeSchema(
+    <T extends com.google.protobuf.Message> Schema<T> newProtobufNativeSchema(
             SchemaDefinition schemaDefinition);
 
     <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 0351477985f..0b61540821a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -210,12 +210,12 @@ public final class PulsarClientImplementationBindingImpl 
implements PulsarClient
         return AvroSchema.of(schemaDefinition);
     }
 
-    public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
newProtobufSchema(
+    public <T extends com.google.protobuf.Message> Schema<T> newProtobufSchema(
             SchemaDefinition schemaDefinition) {
         return ProtobufSchema.of(schemaDefinition);
     }
 
-    public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> 
newProtobufNativeSchema(
+    public <T extends com.google.protobuf.Message> Schema<T> 
newProtobufNativeSchema(
             SchemaDefinition schemaDefinition) {
         return ProtobufNativeSchema.of(schemaDefinition);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 46a2f7d806a..9243303e842 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl.schema;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.Message;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -41,7 +41,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 /**
  * A schema implementation to deal with protobuf generated messages.
  */
-public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends 
AbstractStructSchema<T> {
+public class ProtobufNativeSchema<T extends Message> extends 
AbstractStructSchema<T> {
 
     public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
@@ -104,7 +104,7 @@ public class ProtobufNativeSchema<T extends 
GeneratedMessageV3> extends Abstract
         return Optional.of(getProtobufNativeSchema());
     }
 
-    public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> 
of(Class<T> pojo) {
+    public static <T extends Message> ProtobufNativeSchema<T> of(Class<T> 
pojo) {
         return of(pojo, new HashMap<>());
     }
 
@@ -117,8 +117,8 @@ public class ProtobufNativeSchema<T extends 
GeneratedMessageV3> extends Abstract
     public static <T> ProtobufNativeSchema of(SchemaDefinition<T> 
schemaDefinition) {
         Class<T> pojo = schemaDefinition.getPojo();
 
-        if (!GeneratedMessageV3.class.isAssignableFrom(pojo)) {
-            throw new 
IllegalArgumentException(GeneratedMessageV3.class.getName()
+        if (!Message.class.isAssignableFrom(pojo)) {
+            throw new IllegalArgumentException(Message.class.getName()
                     + " is not assignable from " + pojo.getName());
         }
         Descriptors.Descriptor descriptor = 
createProtobufNativeSchema(schemaDefinition.getPojo());
@@ -131,14 +131,13 @@ public class ProtobufNativeSchema<T extends 
GeneratedMessageV3> extends Abstract
                 .build();
         try {
             return new ProtobufNativeSchema(schemaInfo,
-                    (GeneratedMessageV3) 
pojo.getMethod("getDefaultInstance").invoke(null));
+                    (Message) 
pojo.getMethod("getDefaultInstance").invoke(null));
         } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
             throw new IllegalArgumentException(e);
         }
     }
 
-    public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(
-            Class pojo, Map<String, String> properties) {
+    public static <T extends Message> ProtobufNativeSchema<T> of(Class<T> 
pojo, Map<String, String> properties) {
         return ofGenericClass(pojo, properties);
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 2e33bcda299..71e0bed6912 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.schema;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.Message;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -41,7 +41,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 /**
  * A schema implementation to deal with protobuf generated messages.
  */
-public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> 
extends AvroBaseStructSchema<T> {
+public class ProtobufSchema<T extends Message> extends AvroBaseStructSchema<T> 
{
 
     public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
@@ -53,7 +53,7 @@ public class ProtobufSchema<T extends 
com.google.protobuf.GeneratedMessageV3> ex
         private final String type;
         private final String label;
         // For future nested fields
-        private final Map <String, Object> definition;
+        private final Map<String, Object> definition;
     }
 
     private static <T> org.apache.avro.Schema 
createProtobufAvroSchema(Class<T> pojo) {
@@ -89,7 +89,7 @@ public class ProtobufSchema<T extends 
com.google.protobuf.GeneratedMessageV3> ex
         }
     }
 
-    public static <T extends com.google.protobuf.GeneratedMessageV3> 
ProtobufSchema<T> of(Class<T> pojo) {
+    public static <T extends Message> ProtobufSchema<T> of(Class<T> pojo) {
         return of(pojo, new HashMap<>());
     }
 
@@ -102,8 +102,8 @@ public class ProtobufSchema<T extends 
com.google.protobuf.GeneratedMessageV3> ex
     public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {
         Class<T> pojo = schemaDefinition.getPojo();
 
-        if 
(!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
-            throw new 
IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+        if (!Message.class.isAssignableFrom(pojo)) {
+            throw new IllegalArgumentException(Message.class.getName()
                     + " is not assignable from " + pojo.getName());
         }
 
@@ -116,14 +116,13 @@ public class ProtobufSchema<T extends 
com.google.protobuf.GeneratedMessageV3> ex
 
         try {
             return new ProtobufSchema(schemaInfo,
-                (GeneratedMessageV3) 
pojo.getMethod("getDefaultInstance").invoke(null));
+                (Message) pojo.getMethod("getDefaultInstance").invoke(null));
         } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
             throw new IllegalArgumentException(e);
         }
     }
 
-    public static <T extends com.google.protobuf.GeneratedMessageV3> 
ProtobufSchema<T> of(
-            Class pojo, Map<String, String> properties){
+    public static <T extends Message> ProtobufSchema<T> of(Class<T> pojo, 
Map<String, String> properties) {
         return ofGenericClass(pojo, properties);
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
index 1c10608d448..0ecc0be3d8c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl.schema.reader;
 
-public class ProtobufNativeReader<T extends 
com.google.protobuf.GeneratedMessageV3> extends ProtobufReader<T> {
+import com.google.protobuf.Message;
+
+public class ProtobufNativeReader<T extends Message> extends ProtobufReader<T> 
{
 
     public ProtobufNativeReader(T protoMessageInstance) {
         super(protoMessageInstance);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
index a2504a660a0..a56fb30b140 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl.schema.reader;
 
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
 import com.google.protobuf.Parser;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,7 +28,7 @@ import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> 
implements SchemaReader<T> {
+public class ProtobufReader<T extends Message> implements SchemaReader<T> {
     private Parser<T> tParser;
 
     public ProtobufReader(T protoMessageInstance) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
index 32569f7b7c0..0abd231475f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl.schema.writer;
 
-public class ProtobufNativeWriter<T extends 
com.google.protobuf.GeneratedMessageV3> extends ProtobufWriter<T> {
+import com.google.protobuf.Message;
+
+public class ProtobufNativeWriter<T extends Message> extends ProtobufWriter<T> 
{
 
     public ProtobufNativeWriter() {
         super();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
index 52ccec8dfaa..7bab227e181 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pulsar.client.impl.schema.writer;
 
+import com.google.protobuf.Message;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
 
-public class ProtobufWriter<T extends com.google.protobuf.GeneratedMessageV3> 
implements SchemaWriter<T> {
+public class ProtobufWriter<T extends Message> implements SchemaWriter<T> {
 
     @Override
     public byte[] write(T message) {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java
new file mode 100644
index 00000000000..f9ce4ecbf97
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.protobuf.Message;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.internal.PulsarClientImplementationBinding;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ProtobufSchemaApiSignatureTest {
+
+    @Test
+    public void testSchemaProtobufTypeBounds() throws NoSuchMethodException {
+        assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", 
Class.class), Message.class);
+        assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF", 
SchemaDefinition.class), Message.class);
+        
assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF_NATIVE", 
Class.class), Message.class);
+        assertSingleTypeParamUpperBound(
+                Schema.class.getMethod("PROTOBUF_NATIVE", 
SchemaDefinition.class), Message.class);
+    }
+
+    @Test
+    public void testBindingProtobufTypeBounds() throws NoSuchMethodException {
+        assertSingleTypeParamUpperBound(
+                
PulsarClientImplementationBinding.class.getMethod("newProtobufSchema", 
SchemaDefinition.class),
+                Message.class);
+        assertSingleTypeParamUpperBound(
+                
PulsarClientImplementationBinding.class.getMethod("newProtobufNativeSchema", 
SchemaDefinition.class),
+                Message.class);
+    }
+
+    private static void assertSingleTypeParamUpperBound(Method method, Type 
expectedBound) {
+        TypeVariable<Method>[] typeParameters = method.getTypeParameters();
+        Assert.assertEquals(typeParameters.length, 1, method + " should define 
one type parameter");
+        Type[] bounds = typeParameters[0].getBounds();
+        Assert.assertEquals(bounds.length, 1, method + " should define one 
type bound");
+        Assert.assertEquals(bounds[0], expectedBound, method + " has 
unexpected type bound");
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
index a0ea3c1cc6e..ed918cd0b8c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
@@ -23,9 +23,11 @@ import static org.testng.Assert.assertNotNull;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.protobuf.Any;
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
 import com.google.protobuf.util.JsonFormat;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -93,6 +95,19 @@ public class ProtobufNativeSchemaTest {
         assertEquals(message.getStringField(), stringFieldValue);
     }
 
+    @Test
+    public void testSchemaApiSupportsMessageBound() {
+        Any any = 
Any.pack(StringValue.newBuilder().setValue("native-message").build());
+        org.apache.pulsar.client.api.Schema<Any> protobufSchema =
+                org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE(Any.class);
+
+        byte[] bytes = protobufSchema.encode(any);
+        Any message = protobufSchema.decode(bytes);
+
+        assertEquals(protobufSchema.getSchemaInfo().getType(), 
SchemaType.PROTOBUF_NATIVE);
+        assertEquals(message, any);
+    }
+
     @Test
     public void testSchema() throws Exception {
         
ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> 
protobufSchema =
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index 5acdd5b1b1c..9e283b6799d 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.protobuf.Any;
+import com.google.protobuf.StringValue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import java.util.ArrayList;
@@ -121,6 +123,17 @@ public class ProtobufSchemaTest {
         Assert.assertEquals(message.getName(), NAME);
     }
 
+    @Test
+    public void testSchemaApiSupportsMessageBound() {
+        Any any = Any.pack(StringValue.newBuilder().setValue(NAME).build());
+        org.apache.pulsar.client.api.Schema<Any> protobufSchema =
+                org.apache.pulsar.client.api.Schema.PROTOBUF(Any.class);
+
+        byte[] bytes = protobufSchema.encode(any);
+        Any message = protobufSchema.decode(bytes);
+        Assert.assertEquals(message, any);
+    }
+
     @Test
     public void testSchema() {
         ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> 
protobufSchema =
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 50cb1fb54e8..56dc802affb 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -1182,7 +1182,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
 
     private static boolean isProtobufClass(Class<?> pojoClazz) {
         try {
-            Class<?> protobufBaseClass = 
Class.forName("com.google.protobuf.GeneratedMessageV3");
+            Class<?> protobufBaseClass = 
Class.forName("com.google.protobuf.Message");
             return protobufBaseClass.isAssignableFrom(pojoClazz);
         } catch (ClassNotFoundException | NoClassDefFoundError e) {
             // If sink does not have protobuf in classpath then it cannot be 
protobuf
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index d8ae6b19f4a..81f4de49682 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -196,7 +196,7 @@ public class TopicSchema {
 
     private static boolean isProtobufClass(Class<?> pojoClazz) {
         try {
-            Class<?> protobufBaseClass = 
Class.forName("com.google.protobuf.GeneratedMessageV3");
+            Class<?> protobufBaseClass = 
Class.forName("com.google.protobuf.Message");
             return protobufBaseClass.isAssignableFrom(pojoClazz);
         } catch (ClassNotFoundException | NoClassDefFoundError e) {
             // If function does not have protobuf in classpath then it cannot 
be protobuf
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 7cae03c8f5f..eea1c9cc966 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.protobuf.Any;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
@@ -44,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -245,6 +247,14 @@ public class JavaInstanceRunnableTest {
         Assert.assertEquals(javaInstanceRunnable.getMetrics(), 
InstanceCommunication.MetricsData.newBuilder().build());
     }
 
+    @Test
+    public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() 
throws Exception {
+        Method method = 
JavaInstanceRunnable.class.getDeclaredMethod("getDefaultSchemaType", 
Class.class);
+        method.setAccessible(true);
+        SchemaType schemaType = (SchemaType) method.invoke(null, Any.class);
+        Assert.assertEquals(schemaType, SchemaType.PROTOBUF);
+    }
+
     @Test
     public void testSinkConfigParsingPreservesOriginalType() throws Exception {
         final Map<String, Object> parsedConfig = 
JavaInstanceRunnable.augmentAndFilterConnectorConfig(
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
index fa6d1a533dc..6b065fc237a 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -18,10 +18,16 @@
  */
 package org.apache.pulsar.functions.source;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import com.google.protobuf.Any;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
@@ -53,6 +59,16 @@ public class TopicSchemaTest {
         assertEquals(schema.getClass(), ProtobufNativeSchema.class);
     }
 
+    @Test
+    public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        TopicSchema topicSchema = new TopicSchema(client, 
Thread.currentThread().getContextClassLoader());
+        Schema<?> schema = 
topicSchema.getSchema("public/default/test-protobuf-default", Any.class, 
Optional.empty());
+        assertEquals(schema.getClass(), ProtobufSchema.class);
+    }
+
     private static class DummyClass {
     }
 }

Reply via email to