[ 
https://issues.apache.org/jira/browse/STORM-3981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Zowalla closed STORM-3981.
----------------------------------
    Resolution: Won't Fix

Storm Pulsar will be removed in the next release of Storm. See the related 
discussion on the mailing list.

> Negative Acknowledge not implemented for Pulsar Storm Adapter 
> --------------------------------------------------------------
>
>                 Key: STORM-3981
>                 URL: https://issues.apache.org/jira/browse/STORM-3981
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Karunam goyal
>            Priority: Major
>
> [https://github.com/apache/storm/blob/a837e6add1fee99115eb426077f6e62fd406eea2/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java]
> v2.11.0 pulsar-storm
> There is no way to negatively acknowledge a message on the consumer, and 
> the method for registering a DeadLetterPolicy is broken.
> {code:java}
> ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
> subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
> subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
> subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
>         .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic()).build());
>
> PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(
>         spoutConfiguration,
>         ((ClientBuilderImpl) createBuilder(viestiSourceConfig))
>                 .getClientConfigurationData().clone(),
>         subscriptionConfig);
> {code}
> The DeadLetterPolicy set in the code above does not stick when the PulsarSpout is created.
>  
> {code:java}
> static class SpoutConsumer implements PulsarSpoutConsumer {
>     private Consumer<byte[]> consumer;
>
>     public SpoutConsumer(Consumer<byte[]> consumer) {
>         this.consumer = consumer;
>     }
>     public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
>         return this.consumer.receive(timeout, unit);
>     }
>     public void acknowledgeAsync(Message<?> msg) {
>         this.consumer.acknowledgeAsync(msg);
>     }
>     public void close() throws PulsarClientException {
>         this.consumer.close();
>     }
>     public void unsubscribe() throws PulsarClientException {
>         this.consumer.unsubscribe();
>     }
> }
> {code}
>  
> Also, there is no mechanism to negatively acknowledge a message.
> *Expected behavior*
> A DeadLetterPolicy that has been set should not be dropped during serialization. 
> Negative acknowledgment should be supported.
> *Desktop (please complete the following information):*
> macOS Ventura 13.4.1
> java version "1.8.0_333"
> Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
> Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
> Apache Storm 2.2.1
> Trying to consume from Pulsar Topic in Apache Storm
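
For readers who want to see what the requested negative-acknowledge path could look like, here is a minimal, hedged sketch. It is not the adapter's code and, since the issue was closed as Won't Fix, not a planned change. The Pulsar Java client's Consumer interface does offer negativeAcknowledge(Message<?>), but to keep the example free of external dependencies the sketch uses hypothetical stand-ins: AckingConsumer (a two-method substitute for the Pulsar Consumer), NackingSpoutConsumer (a wrapper in the spirit of the SpoutConsumer quoted above) and RecordingConsumer (an in-memory fake). All three names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class NackSketch {

    /** Hypothetical stand-in for the subset of a Pulsar consumer used here. */
    interface AckingConsumer<M> {
        void acknowledgeAsync(M msg);
        void negativeAcknowledge(M msg);
    }

    /** Wrapper in the spirit of SpoutConsumer, extended with a nack path (assumption). */
    static final class NackingSpoutConsumer<M> {
        private final AckingConsumer<M> consumer;

        NackingSpoutConsumer(AckingConsumer<M> consumer) {
            this.consumer = consumer;
        }

        /** Would be called from the spout's ack(msgId) callback. */
        void ack(M msg) {
            consumer.acknowledgeAsync(msg);
        }

        /** Would be called from the spout's fail(msgId) callback to request redelivery. */
        void fail(M msg) {
            consumer.negativeAcknowledge(msg);
        }
    }

    /** In-memory fake that only records which messages were acked or nacked. */
    static final class RecordingConsumer implements AckingConsumer<String> {
        final List<String> acked = new ArrayList<>();
        final List<String> nacked = new ArrayList<>();

        @Override
        public void acknowledgeAsync(String msg) {
            acked.add(msg);
        }

        @Override
        public void negativeAcknowledge(String msg) {
            nacked.add(msg);
        }
    }

    public static void main(String[] args) {
        RecordingConsumer fake = new RecordingConsumer();
        NackingSpoutConsumer<String> spoutConsumer = new NackingSpoutConsumer<>(fake);

        spoutConsumer.ack("m1");   // tuple tree completed
        spoutConsumer.fail("m2");  // tuple failed or timed out

        System.out.println("acked=" + fake.acked + " nacked=" + fake.nacked);
    }
}
```

In a real spout the fail path would delegate to the Pulsar consumer rather than a fake. Whether redelivery then routes a message to a dead-letter topic depends on the DeadLetterPolicy actually reaching the consumer, which is the other half of this report.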



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
