[
https://issues.apache.org/jira/browse/BAHIR-203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Tachev updated BAHIR-203:
-------------------------------
Description:
Hi,
We have a use case where acknowledgement has to be sent at a later stage when
streaming data from google pubsub. Any chance for the acknowledgement in
PubsubReceiver to be made optional and ackId to be included in the
SparkPubsubMessage model?
Example:
{code:java}
store(receivedMessages
.map(x => {
val sm = new SparkPubsubMessage
sm.message = x.getMessage
sm.ackId = x.getAckId
sm
})
.iterator)
if ( ... ) {
val ackRequest = new AcknowledgeRequest()
ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
client.projects().subscriptions().acknowledge(subscriptionFullName,
ackRequest).execute()
}{code}
was:
Hi,
We have a use case where acknowledgement has to be sent at a later stage when
streaming data from google pubsub. Any chance for the acknowledgement in
PubsubReceiver to be made optional and ackId to be included in the
SparkPubsubMessage model?
Example:
{code:language=scala}
{code:java}
store(receivedMessages
.map(x => {
val sm = new SparkPubsubMessage
sm.message = x.getMessage
sm.ackId = x.getAckId
sm
})
.iterator)
if ( ... ) {
val ackRequest = new AcknowledgeRequest()
ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
client.projects().subscriptions().acknowledge(subscriptionFullName,
ackRequest).execute()
}{code}
> Pubsub manual acknowledgement
> ------------------------------
>
> Key: BAHIR-203
> URL: https://issues.apache.org/jira/browse/BAHIR-203
> Project: Bahir
> Issue Type: Improvement
> Components: Spark Streaming Connectors
> Reporter: Danny Tachev
> Priority: Minor
> Fix For: Spark-2.3.0, Spark-2.4.0
>
>
> Hi,
> We have a use case where acknowledgement has to be sent at a later stage when
> streaming data from google pubsub. Any chance for the acknowledgement in
> PubsubReceiver to be made optional and ackId to be included in the
> SparkPubsubMessage model?
> Example:
> {code:java}
> store(receivedMessages
> .map(x => {
> val sm = new SparkPubsubMessage
> sm.message = x.getMessage
> sm.ackId = x.getAckId
> sm
> })
> .iterator)
> if ( ... ) {
> val ackRequest = new AcknowledgeRequest()
> ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
> client.projects().subscriptions().acknowledge(subscriptionFullName,
> ackRequest).execute()
> }{code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)