This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 18c26e0 Do not create any producer if the output type of a function is void (#2756) 18c26e0 is described below commit 18c26e07202ed8d234bffd91c4e12163460ea8c9 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Oct 10 06:11:00 2018 -0700 Do not create any producer if the output type of a function is void (#2756) * Do not create any producer if the output type of a function is void * Do not write if the record is null * Revert the check for null --- .../java/org/apache/pulsar/functions/sink/PulsarSink.java | 6 +++++- .../org/apache/pulsar/functions/sink/PulsarSinkTest.java | 12 +++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 024638b..8c0c29f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -218,6 +218,10 @@ public class PulsarSink<T> implements Sink<T> { log.info("Opening pulsar sink with config: {}", pulsarSinkConfig); Schema<T> schema = initializeSchema(); + if (schema == null) { + log.info("Since output type is null, not creating any real sink"); + return; + } FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees(); switch (processingGuarantees) { @@ -283,7 +287,7 @@ public class PulsarSink<T> implements Sink<T> { if (Void.class.equals(typeArg)) { // return type is 'void', so there's no schema to check - return (Schema<T>) Schema.BYTES; + return null; } if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 4722c6f..7dfe6a7 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -42,14 +42,7 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; @@ -169,7 +162,8 @@ public class PulsarSinkTest { PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); try { - pulsarSink.initializeSchema(); + Schema schema = pulsarSink.initializeSchema(); + assertEquals(schema, (Schema)null); } catch (Exception ex) { ex.printStackTrace(); assertEquals(ex, null);