sijie closed pull request #2717: [schema] use AUTO_PRODUCE schema when possible
URL: https://github.com/apache/pulsar/pull/2717
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index ff85e3436c..f209c483aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -30,6 +30,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -74,7 +75,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - this.producerBuilder = client.newProducer() // + this.producerBuilder = client.newProducer(Schema.AUTO_PRODUCE_BYTES()) // .topic(topicName) .messageRoutingMode(MessageRoutingMode.SinglePartition) .enableBatching(false) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 2f599eeac8..a57889b3ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer; import 
org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -268,7 +269,7 @@ void shutdown() throws Exception { this.namespace = dest.getNamespace(); this.topicName = dest.toString(); client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); - producer = client.newProducer() + producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()) .topic(topicName) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) @@ -281,7 +282,7 @@ void shutdown() throws Exception { this.namespace = dest.getNamespace(); this.topicName = dest.toString(); client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); - ProducerBuilder<byte[]> producerBuilder = client.newProducer() + ProducerBuilder<byte[]> producerBuilder = client.newProducer(Schema.AUTO_PRODUCE_BYTES()) .topic(topicName) .enableBatching(batch) .batchingMaxPublishDelay(1, TimeUnit.SECONDS) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java index 5a9a651527..ede4b358fb 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java @@ -22,6 +22,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; public class PulsarProducerKafkaConfig { @@ 
-35,7 +36,7 @@ public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages"; public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) { - ProducerBuilder<byte[]> producerBuilder = client.newProducer(); + ProducerBuilder<byte[]> producerBuilder = client.newProducer(Schema.AUTO_PRODUCE_BYTES()); if (properties.containsKey(PRODUCER_NAME)) { producerBuilder.producerName(properties.getProperty(PRODUCER_NAME)); diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index 36520a9ee7..c3b9ecc57b 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -23,16 +23,20 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; /** * Auto detect schema. 
*/ public class AutoProduceBytesSchema<T> implements Schema<byte[]> { + private boolean requireSchemaValidation = true; private Schema<T> schema; public void setSchema(Schema<T> schema) { this.schema = schema; + this.requireSchemaValidation = SchemaType.BYTES != schema.getSchemaInfo().getType() + && SchemaType.NONE != schema.getSchemaInfo().getType(); } private void ensureSchemaInitialized() { @@ -43,8 +47,10 @@ private void ensureSchemaInitialized() { public byte[] encode(byte[] message) { ensureSchemaInitialized(); - // verify if the message can be decoded by the underlying schema - schema.decode(message); + if (requireSchemaValidation) { + // verify if the message can be decoded by the underlying schema + schema.decode(message); + } return message; } @@ -53,8 +59,10 @@ private void ensureSchemaInitialized() { public byte[] decode(byte[] bytes) { ensureSchemaInitialized(); - // verify the message can be detected by the underlying schema - schema.decode(bytes); + if (requireSchemaValidation) { + // verify the message can be detected by the underlying schema + schema.decode(bytes); + } return bytes; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java index c559494ea3..4409e027cc 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,7 +140,9 @@ public int run() throws PulsarClientException { try { PulsarClient client = clientBuilder.build(); - Producer<byte[]> producer = client.newProducer().topic(topic).create(); + Producer<byte[]> producer = 
client.newProducer(Schema.AUTO_PRODUCE_BYTES()) + .topic(topic) + .create(); List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames); RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java index a28acafdc3..185690d709 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java @@ -23,16 +23,18 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; public class ContinuousAsyncProducer { public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException { PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://127.0.0.1:8080").build(); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic") - .blockIfQueueFull(true).create(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-tenant/my-ns/my-topic") + .blockIfQueueFull(true).create(); while (true) { - producer.sendAsync("my-message".getBytes()); + producer.sendAsync("my-message"); } } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java index 30ce04f747..1c920c97dd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java @@ -23,17 +23,19 @@ import 
org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; public class ContinuousProducer { public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException { PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://127.0.0.1:8080").build(); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic") - .create(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-tenant/my-ns/my-topic") + .create(); while (true) { try { - producer.send("my-message".getBytes()); + producer.send("my-message"); Thread.sleep(1000); } catch (Exception e) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java index b4b3f23b9c..61c2fc6572 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java @@ -31,20 +31,22 @@ import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; @Slf4j public class SampleAsyncProducer { public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException { PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build(); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic") - .sendTimeout(3, TimeUnit.SECONDS).create(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-tenant/my-ns/my-topic") + .sendTimeout(3, TimeUnit.SECONDS).create(); List<CompletableFuture<MessageId>> futures = Lists.newArrayList(); for 
(int i = 0; i < 10; i++) { final String content = "my-message-" + i; - CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes()); + CompletableFuture<MessageId> future = producer.sendAsync(content); future.handle((v, ex) -> { if (ex == null) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java index 478969289a..83b6172cd9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; @Slf4j public class SampleCryptoProducer { @@ -76,12 +77,13 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://127.0.0.1:8080").build(); // Setup the CryptoKeyReader with the file name where public/private key is kept - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic") - .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem")) - .addEncryptionKey("myappkey").create(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-tenant/my-ns/my-topic") + .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem")) + .addEncryptionKey("myappkey").create(); for (int i = 0; i < 10; i++) { - producer.send("my-message".getBytes()); + producer.send("my-message"); } pulsarClient.close(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java index 467d34cd5b..003818ed42 100644 --- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java @@ -23,15 +23,17 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; public class SampleProducer { public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException { PulsarClient client = PulsarClient.builder().serviceUrl("http://localhost:6650").build(); - Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-ns/my-topic").create(); + Producer<String> producer = client.newProducer(Schema.STRING) + .topic("persistent://my-tenant/my-ns/my-topic").create(); for (int i = 0; i < 10; i++) { - producer.send("my-message".getBytes()); + producer.send("my-message"); } client.close(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 4d474333b9..cf57cf4422 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -115,8 +115,10 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, Li this.inputTopics = inputTopics; this.topicSchema = new TopicSchema(client); - this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true) - .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); + this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer(Schema.AUTO_PRODUCE_BYTES()) + .blockIfQueueFull(true) + .enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); if 
(config.getFunctionDetails().getUserConfig().isEmpty()) { userConfigs = new HashMap<>(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index f5108fc6c1..4129635878 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.junit.Before; @@ -63,7 +64,9 @@ public void setup() { config.setFunctionDetails(functionDetails); logger = mock(Logger.class); client = mock(PulsarClientImpl.class); - when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES)); + AutoProduceBytesSchema<byte[]> autoProduceBytesSchema = (AutoProduceBytesSchema<byte[]>) Schema.AUTO_PRODUCE_BYTES(); + autoProduceBytesSchema.setSchema(Schema.BYTES); + when(client.newProducer(any(Schema.class))).thenReturn(new ProducerBuilderImpl(client, autoProduceBytesSchema)); when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null))) .thenReturn(CompletableFuture.completedFuture(producer)); when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services