[ 
https://issues.apache.org/jira/browse/BAHIR-203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Tachev updated BAHIR-203:
-------------------------------
    Description: 
Hi, 

We have a use case where acknowledgement has to be sent at a later stage when 
streaming data from google pubsub. Any chance for the acknowledgement in 
PubsubReceiver to be made optional and ackId to be included in the 
SparkPubsubMessage model?

Example:
{code:java}
store(receivedMessages
    .map(x => {
      val sm = new SparkPubsubMessage
      sm.message = x.getMessage
      sm.ackId = x.getAckId
      sm
    })
    .iterator)

if ( ... ) {
  val ackRequest = new AcknowledgeRequest()
  ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
  client.projects().subscriptions().acknowledge(subscriptionFullName, 
ackRequest).execute()
}{code}
 

  was:
Hi, 

We have a use case where acknowledgement has to be sent at a later stage when 
streaming data from google pubsub. Any chance for the acknowledgement in 
PubsubReceiver to be made optional and ackId to be included in the 
SparkPubsubMessage model?

Example:

{code:language=scala}
{code:java}
store(receivedMessages
    .map(x => {
      val sm = new SparkPubsubMessage
      sm.message = x.getMessage
      sm.ackId = x.getAckId
      sm
    })
    .iterator)

if ( ... ) {
  val ackRequest = new AcknowledgeRequest()
  ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
  client.projects().subscriptions().acknowledge(subscriptionFullName, 
ackRequest).execute()
}{code}
 


> Pubsub manual acknowledgement 
> ------------------------------
>
>                 Key: BAHIR-203
>                 URL: https://issues.apache.org/jira/browse/BAHIR-203
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Spark Streaming Connectors
>            Reporter: Danny Tachev
>            Priority: Minor
>             Fix For: Spark-2.3.0, Spark-2.4.0
>
>
> Hi, 
> We have a use case where acknowledgement has to be sent at a later stage when 
> streaming data from google pubsub. Any chance for the acknowledgement in 
> PubsubReceiver to be made optional and ackId to be included in the 
> SparkPubsubMessage model?
> Example:
> {code:java}
> store(receivedMessages
>     .map(x => {
>       val sm = new SparkPubsubMessage
>       sm.message = x.getMessage
>       sm.ackId = x.getAckId
>       sm
>     })
>     .iterator)
> if ( ... ) {
>   val ackRequest = new AcknowledgeRequest()
>   ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
>   client.projects().subscriptions().acknowledge(subscriptionFullName, 
> ackRequest).execute()
> }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to