This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d0e5d96 [function] enable protobuf-native schema support for function (#11868) d0e5d96 is described below commit d0e5d96185336f56a7599e97474a6074cf6b76a7 Author: Neng Lu <n...@streamnative.io> AuthorDate: Wed Sep 1 21:48:02 2021 -0700 [function] enable protobuf-native schema support for function (#11868) Fixes #11721 ### Motivation Enable functions to process topics with the protobuf_native schema ### Modifications update `TopicSchema` --- .../pulsar/functions/source/TopicSchema.java | 4 ++ .../pulsar/functions/source/TopicSchemaTest.java | 58 ++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index dcd424e..067a793 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.schema.KeyValue; @@ -172,6 +173,9 @@ public class TopicSchema { case PROTOBUF: return ProtobufSchema.ofGenericClass(clazz, new HashMap<>()); + case PROTOBUF_NATIVE: + return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<>()); + case AUTO_PUBLISH: return (Schema<T>) Schema.AUTO_PRODUCE_BYTES(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java new file mode 100644 index 0000000..c746093 --- /dev/null +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.source; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.functions.proto.Request; +import org.junit.Test; + +import java.util.Optional; + +import static org.testng.Assert.assertEquals; + +@Slf4j +public class TopicSchemaTest { + + @Test + public void testGetSchema() { + TopicSchema topicSchema = new TopicSchema(null); + + String TOPIC = "public/default/test"; + Schema<?> schema = topicSchema.getSchema(TOPIC + "1", DummyClass.class, Optional.of(SchemaType.JSON)); + assertEquals(schema.getClass(), JSONSchema.class); + + schema = topicSchema.getSchema(TOPIC + "2", DummyClass.class, Optional.of(SchemaType.AVRO)); + assertEquals(schema.getClass(), AvroSchema.class); + + // use an arbitrary protobuf class for testing purpose + schema = topicSchema.getSchema(TOPIC + "3", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF)); + assertEquals(schema.getClass(), ProtobufSchema.class); + + schema = topicSchema.getSchema(TOPIC + "4", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF_NATIVE)); + assertEquals(schema.getClass(), ProtobufNativeSchema.class); + } + + private static class DummyClass {} +}