[ https://issues.apache.org/jira/browse/BAHIR-203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Tachev updated BAHIR-203:
-------------------------------
    Description:
Hi,

We have a use case where acknowledgement has to be sent at a later stage when streaming data from google pubsub. Any chance for the acknowledgement in PubsubReceiver to be made optional and ackId to be included in the SparkPubsubMessage model?

Example:
{code:java}
store(receivedMessages
  .map(x => {
    val sm = new SparkPubsubMessage
    sm.message = x.getMessage
    sm.ackId = x.getAckId
    sm
  })
  .iterator)

if ( ... ) {
  val ackRequest = new AcknowledgeRequest()
  ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
  client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
}{code}

  was:
Hi,

We have a use case where acknowledgement has to be sent at a later stage when streaming data from google pubsub. Any chance for the acknowledgement in PubsubReceiver to be made optional and ackId to be included in the SparkPubsubMessage model?

Example:
{code:language=scala}
{code:java}
store(receivedMessages
  .map(x => {
    val sm = new SparkPubsubMessage
    sm.message = x.getMessage
    sm.ackId = x.getAckId
    sm
  })
  .iterator)

if ( ... ) {
  val ackRequest = new AcknowledgeRequest()
  ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
  client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
}{code}


> Pubsub manual acknowledgement
> ------------------------------
>
>                 Key: BAHIR-203
>                 URL: https://issues.apache.org/jira/browse/BAHIR-203
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Spark Streaming Connectors
>            Reporter: Danny Tachev
>            Priority: Minor
>             Fix For: Spark-2.3.0, Spark-2.4.0
>
>
> Hi,
> We have a use case where acknowledgement has to be sent at a later stage when
> streaming data from google pubsub. Any chance for the acknowledgement in
> PubsubReceiver to be made optional and ackId to be included in the
> SparkPubsubMessage model?
> Example:
> {code:java}
> store(receivedMessages
>   .map(x => {
>     val sm = new SparkPubsubMessage
>     sm.message = x.getMessage
>     sm.ackId = x.getAckId
>     sm
>   })
>   .iterator)
> if ( ... ) {
>   val ackRequest = new AcknowledgeRequest()
>   ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
>   client.projects().subscriptions().acknowledge(subscriptionFullName,
>     ackRequest).execute()
> }{code}
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
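
For illustration only, the behaviour requested in this issue could look roughly like the sketch below. It does not use the Bahir or Google client classes. `ReceivedMessage`, `SparkPubsubMessageSketch`, `FakeAckClient`, `handleBatch` and the `autoAcknowledge` flag are all hypothetical stand-ins, and the flag in particular is an assumption about how "optional" acknowledgement might be exposed. The idea is that every stored message carries its `ackId`, and the receiver only acknowledges when asked to.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Pub/Sub and Bahir types named in the issue.
final case class ReceivedMessage(ackId: String, payload: String)
final case class SparkPubsubMessageSketch(message: String, ackId: String)

// Stand-in for the Pub/Sub client: it only records acknowledged ackIds.
final class FakeAckClient {
  val acked: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  def acknowledge(ackIds: Seq[String]): Unit = acked ++= ackIds
}

object ManualAckSketch {
  // Always carries ackId downstream; acknowledges in the receiver
  // only when autoAcknowledge is true (assumed flag, not a Bahir option).
  def handleBatch(
      received: Seq[ReceivedMessage],
      autoAcknowledge: Boolean,
      client: FakeAckClient): Seq[SparkPubsubMessageSketch] = {
    val out = received.map(m => SparkPubsubMessageSketch(m.payload, m.ackId))
    if (autoAcknowledge && received.nonEmpty) {
      client.acknowledge(received.map(_.ackId))
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val client = new FakeAckClient
    val batch = Seq(ReceivedMessage("ack-1", "a"), ReceivedMessage("ack-2", "b"))

    // Manual mode: the receiver acknowledges nothing.
    val stored = handleBatch(batch, autoAcknowledge = false, client)
    println(client.acked.isEmpty) // prints "true"

    // Later stage: acknowledge using the ackIds carried by the messages.
    client.acknowledge(stored.map(_.ackId))
    println(client.acked.mkString(",")) // prints "ack-1,ack-2"
  }
}
```

With `autoAcknowledge = true` the sketch acknowledges inside the receiver, which matches the unconditional acknowledgement this issue asks to make optional. With `false`, the downstream job decides when to acknowledge, using the `ackId` values it received.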