[ https://issues.apache.org/jira/browse/STORM-3981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Richard Zowalla closed STORM-3981.
----------------------------------
    Resolution: Won't Fix

Storm Pulsar will be removed in the next release of Storm. See the related discussion on the mailing list.

> Negative Acknowledge not implemented for Pulsar Storm Adapter
> --------------------------------------------------------------
>
>                 Key: STORM-3981
>                 URL: https://issues.apache.org/jira/browse/STORM-3981
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Karunam goyal
>            Priority: Major
>
> [https://github.com/apache/storm/blob/a837e6add1fee99115eb426077f6e62fd406eea2/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java]
> v2.11.0 pulsar-storm
> There is no way to negatively acknowledge the consumer and the registry
> method for DeadLetterPolicy is broken
> {{ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
> subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
> subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
> subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
>     .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic()).build());}}
> {{PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(spoutConfiguration,
>     ((ClientBuilderImpl) createBuilder(viestiSourceConfig))
>         .getClientConfigurationData().clone(),
>     subscriptionConfig);}}
> The above code doesn't stick while creating PulsarSpout.
>
> {{static class SpoutConsumer implements PulsarSpoutConsumer {}}
> {{private Consumer<byte[]> consumer;}}
> {{public SpoutConsumer(Consumer<byte[]> consumer) \{
>     this.consumer = consumer;
> }
> public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException \{
>     return this.consumer.receive(timeout, unit);
> }
> public void acknowledgeAsync(Message<?> msg) \{
>     this.consumer.acknowledgeAsync(msg);
> }
> public void close() throws PulsarClientException \{
>     this.consumer.close();
> }
> public void unsubscribe() throws PulsarClientException \{
>     this.consumer.unsubscribe();
> }
> }}}
>
> Also, there is no mechanism to negatively acknowledge a message.
> *Expected behavior*
> When a DeadLetterPolicy is set, it should not be dropped while serialising.
> Negative acknowledgement should be supported.
> *Desktop (please complete the following information):*
> macOS Ventura 13.4.1
> java version "1.8.0_333"
> Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
> Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
> Apache Storm 2.2.1
> Trying to consume from a Pulsar topic in Apache Storm
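
The wrapper quoted in the report forwards only receive, acknowledge, close, and unsubscribe, so a failed tuple has no path back to the broker. The following is a minimal sketch of what a negative-acknowledge passthrough could look like. It is not the storm-pulsar implementation: `MiniConsumer`, `SpoutConsumerSketch`, and `RecordingConsumer` are hypothetical stand-ins so the example runs without the Pulsar client on the classpath. In a real adapter the delegate would be Pulsar's `Consumer<byte[]>`, which offers `negativeAcknowledge(Message<?>)`, and the call would presumably be made from the spout's `fail(Object msgId)` callback.

```java
import java.util.ArrayList;
import java.util.List;

public class NackSketch {

    /** Hypothetical stand-in for the small slice of a Pulsar consumer used here. */
    interface MiniConsumer<T> {
        void acknowledgeAsync(T msg);
        void negativeAcknowledge(T msg);
    }

    /** Wrapper in the style of the quoted SpoutConsumer, with a nack path added. */
    static class SpoutConsumerSketch<T> {
        private final MiniConsumer<T> consumer;

        SpoutConsumerSketch(MiniConsumer<T> consumer) {
            this.consumer = consumer;
        }

        // Existing behaviour: forward a successful tuple as an acknowledgement.
        void acknowledgeAsync(T msg) {
            consumer.acknowledgeAsync(msg);
        }

        // Added behaviour: forward a failed tuple so the broker can redeliver it
        // (or route it to a dead-letter topic once the redelivery limit is hit).
        void negativeAcknowledge(T msg) {
            consumer.negativeAcknowledge(msg);
        }
    }

    /** Fake consumer that only records which messages were acked or nacked. */
    static class RecordingConsumer implements MiniConsumer<String> {
        final List<String> acked = new ArrayList<>();
        final List<String> nacked = new ArrayList<>();

        @Override
        public void acknowledgeAsync(String msg) {
            acked.add(msg);
        }

        @Override
        public void negativeAcknowledge(String msg) {
            nacked.add(msg);
        }
    }

    public static void main(String[] args) {
        RecordingConsumer fake = new RecordingConsumer();
        SpoutConsumerSketch<String> spoutConsumer = new SpoutConsumerSketch<>(fake);

        spoutConsumer.acknowledgeAsync("m1");    // what a spout's ack() would trigger
        spoutConsumer.negativeAcknowledge("m2"); // what a spout's fail() could trigger

        System.out.println("acked=" + fake.acked + " nacked=" + fake.nacked);
        // prints: acked=[m1] nacked=[m2]
    }
}
```

Since the issue was closed as Won't Fix and the module is slated for removal, a change along these lines would have to live in a fork or in a custom spout rather than in Storm itself.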