I missed the statement about needing to acknowledge receipt of a message within a certain amount of time, so my suggestion of using a reshuffle won't work.
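A minimal sketch of how Raghu's finalizeCheckpoint suggestion (quoted below) could map onto SQS. The reader remembers the receipt handle of every message it emits and hands those handles to the checkpoint mark. The mark deletes (acks) the messages only after the runner has committed the checkpoint. To keep it compilable without Beam or the AWS SDK, `CheckpointMark` is a local stand-in for Beam's `UnboundedSource.CheckpointMark`. `MessageDeleter`, `PendingAcks` and `SqsCheckpointMark` are hypothetical names. A real reader would call SQS DeleteMessage (or DeleteMessageBatch) inside `MessageDeleter` and would also need a Coder for the mark. Because of the timeout mentioned above, checkpoints must be finalized before the visibility timeout expires; otherwise SQS redelivers the message. A reader could also extend the timeout with ChangeMessageVisibility.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SqsCheckpointSketch {

  // Hypothetical local stand-in for Beam's UnboundedSource.CheckpointMark,
  // declared here so the sketch compiles without a Beam dependency.
  interface CheckpointMark {
    void finalizeCheckpoint() throws IOException;
  }

  // Hypothetical hook; a real reader would call SQS DeleteMessage (or
  // DeleteMessageBatch) with the receipt handle here.
  interface MessageDeleter {
    void delete(String receiptHandle) throws IOException;
  }

  // Holds the receipt handles of the messages emitted since the previous checkpoint.
  static final class SqsCheckpointMark implements CheckpointMark {
    private final List<String> receiptHandles;
    private final MessageDeleter deleter;

    SqsCheckpointMark(List<String> receiptHandles, MessageDeleter deleter) {
      this.receiptHandles = new ArrayList<>(receiptHandles);
      this.deleter = deleter;
    }

    List<String> pendingHandles() {
      return Collections.unmodifiableList(receiptHandles);
    }

    // Called only after the runner has durably committed this checkpoint, so
    // deleting the messages here does not drop records the pipeline still needs.
    @Override
    public void finalizeCheckpoint() throws IOException {
      for (String handle : receiptHandles) {
        deleter.delete(handle);
      }
      receiptHandles.clear(); // makes a repeated call a no-op
    }
  }

  // Reader-side buffer: remembers each emitted message's receipt handle.
  static final class PendingAcks {
    private final List<String> handles = new ArrayList<>();

    void recordEmitted(String receiptHandle) {
      handles.add(receiptHandle);
    }

    // Moves the buffered handles into a new mark and starts a fresh batch.
    SqsCheckpointMark checkpoint(MessageDeleter deleter) {
      SqsCheckpointMark mark = new SqsCheckpointMark(handles, deleter);
      handles.clear();
      return mark;
    }
  }

  public static void main(String[] args) throws IOException {
    List<String> deleted = new ArrayList<>();
    PendingAcks pending = new PendingAcks();
    pending.recordEmitted("handle-1");
    pending.recordEmitted("handle-2");
    SqsCheckpointMark mark = pending.checkpoint(deleted::add);
    System.out.println(deleted);        // prints [] (nothing acked before finalization)
    mark.finalizeCheckpoint();
    System.out.println(deleted);        // prints [handle-1, handle-2]
    mark.finalizeCheckpoint();          // no-op on a second call
    System.out.println(deleted.size()); // prints 2
  }
}
```

The design choice is to defer the delete instead of acking on read. If a worker fails before the checkpoint is committed, the undeleted messages simply reappear after their visibility timeout. This trades possible duplicates for no data loss.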
On Thu, Jul 19, 2018 at 5:26 PM Raghu Angadi <rang...@google.com> wrote:
> A timestamp for a message is fundamental to an element in a PCollection.
> What do you mean by not knowing the timestamp of a message?
> There is a finalizeCheckpoint API [1] in UnboundedSource. Does that help?
> PubSub is also very similar: a message needs to be acked within a timeout,
> otherwise it will be redelivered to one of the consumers. Pubsub messages
> are acked inside finalize().
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>
> On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis <johnrle...@gmail.com>
> wrote:
>
>> Hmm... made lots of progress on this today, but I need help understanding
>> something.
>>
>> UnboundedSource seems to assume that there is some guarantee of message
>> ordering, and that you can get the timestamp of the current message. It uses
>> UnboundedSource.CheckpointMark to help advance the offset. That seems to work
>> for any source that supports those assumptions, but SQS does not work this
>> way.
>>
>> With a standard SQS queue, there is no guarantee of ordering and there is
>> no timestamp for a message. With SQS, one needs to call the delete API
>> using the receipt handle from the message to acknowledge receipt of a
>> message and prevent its redelivery after the visibility timeout has expired.
>>
>> I'm not sure how to adapt these two patterns and would welcome
>> suggestions.
>>
>> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>>> Thx John!
>>>
>>> Regards
>>> JB
>>>
>>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>>> > Thank you.
>>> >
>>> > I've created a JIRA ticket to add SQS and have assigned it to
>>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>>> >
>>> > I modified the documentation to show it as in progress:
>>> > https://github.com/apache/beam/pull/5995
>>> >
>>> > And I will be starting my work
>>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>>> >
>>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>> >
>>> > Agree with Ismaël.
>>> >
>>> > I would be more than happy to help on this one (as I contributed to the AMQP
>>> > and JMS IOs ;)).
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 19/07/2018 10:39, Ismaël Mejía wrote:
>>> > > Thanks for your interest John, it would be a really nice contribution
>>> > > to add SQS support.
>>> > >
>>> > > Some context on the Kinesis stuff:
>>> > >
>>> > > The reason why Kinesis is still in a separate module is more related
>>> > > to a licensing problem. Kinesis uses some native libraries that are
>>> > > published under a license that is not 100% Apache-compatible, and we are
>>> > > not allowed to shade and republish them, but it seems there is a
>>> > > workaround now; for more details see
>>> > > https://issues.apache.org/jira/browse/BEAM-3549
>>> > > In any case, if you only need the Apache-licensed aws-sdk deps to use
>>> > > SQS, it is OK (and a good idea) to put it in the
>>> > > amazon-web-services module.
>>> > >
>>> > > The Kinesis connector is way more complex for multiple reasons. First,
>>> > > the raw version of the Amazon client libraries is not so ‘friendly’,
>>> > > and the guys who created KinesisIO had to do some workarounds to
>>> > > provide accurate checkpointing/watermarks. Since SQS is a much
>>> > > simpler system, you should probably be OK basing it on simpler sources
>>> > > like AMQP or JMS.
>>> > >
>>> > > If you feel like it, please create the JIRA and don’t hesitate to ask
>>> > > questions if you find issues or if you need some review.
>>> > >
>>> > > On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik <lc...@google.com> wrote:
>>> > >>
>>> > >> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis <johnrle...@gmail.com> wrote:
>>> > >>>
>>> > >>> I need an SQS source for my project that is using Beam. A brief
>>> > >>> search did not turn up any in-progress work in this area. Please
>>> > >>> point me to the right repo if I missed it.
>>> > >>
>>> > >> To my knowledge there is none, and nobody has marked it as in
>>> > >> progress on https://beam.apache.org/documentation/io/built-in/. It would be
>>> > >> good to create a JIRA issue on https://issues.apache.org/ and send a
>>> > >> PR to add SQS to the in-progress list referencing your JIRA. I added
>>> > >> you as a contributor in JIRA, so you should be able to assign
>>> > >> yourself to any issues that you create.
>>> > >>
>>> > >>> Assuming there is no in-progress effort, I would like to
>>> > >>> contribute an Amazon SQS source. I have a few questions before I begin.
>>> > >>
>>> > >> Great. Note that this is a good starting point for authoring an
>>> > >> IO transform:
>>> > >> https://beam.apache.org/documentation/io/authoring-overview/
>>> > >>
>>> > >>> It seems that the current AWS code is split into two different
>>> > >>> modules: sdk/java/io/amazon-web-services, which contains the
>>> > >>> S3FileSystem, AwsOptions, etc., and sdk/java/io/kinesis, which
>>> > >>> contains an unbounded source based on a Kinesis stream. I'd like to
>>> > >>> add this source to the amazon-web-services module since I'd like to
>>> > >>> depend on AwsOptions. Does adding this source to the
>>> > >>> amazon-web-services module make sense?
>>> > >>
>>> > >> Putting it inside of amazon-web-services makes a lot of sense.
>>> > >> The Google connectors all live within the one package, and there has
>>> > >> been discussion to consolidate all the AWS stuff under
>>> > >> amazon-web-services.
>>> > >>
>>> > >>> Also, the Kinesis source looks a touch more complex than other
>>> > >>> sources. Both the JMS and AMQP sources look like better examples to
>>> > >>> follow. Which existing source would be the best to model this
>>> > >>> contribution after?
>>> > >>
>>> > >> Some of it has to do with how many ways a source can be read and
>>> > >> how complicated the watermark tracking is, but it would be best if the
>>> > >> IO authors commented on implementation details.
>>> > >>
>>> > >>> If anyone has put some thoughts into this, or better yet some
>>> > >>> code, I'd appreciate hearing from you.
>>> > >>>
>>> > >>> Thanks!
>>> >
>>> > --
>>> > Jean-Baptiste Onofré
>>> > jbono...@apache.org
>>> > http://blog.nanthrax.net
>>> > Talend - http://www.talend.com
>>> >
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
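Raghu's point that every element needs a timestamp, John's observation that a standard queue is unordered, and Lukasz's remark about watermark tracking can be reconciled with a heuristic watermark. This is a minimal sketch under stated assumptions. The reader is assumed to request SQS's `SentTimestamp` system attribute, which SQS returns as a string of epoch milliseconds, and to use a bounded out-of-orderness heuristic. That heuristic is a technique borrowed from other stream processors; nothing in the thread prescribes it. `BoundedSkewWatermark` and `maxSkew` are hypothetical names, and `java.time` is used here only to stay dependency-free. In Beam, these values would feed `UnboundedReader.getCurrentTimestamp()` and `UnboundedReader.getWatermark()`, which use Joda `Instant`.

```java
import java.time.Duration;
import java.time.Instant;

public class SqsWatermarkSketch {

  // Bounded out-of-orderness heuristic (hypothetical choice): assume no message
  // was sent more than maxSkew before the newest message seen so far.
  static final class BoundedSkewWatermark {
    private final Duration maxSkew;
    private Instant maxSeen; // null until the first message is observed

    BoundedSkewWatermark(Duration maxSkew) {
      this.maxSkew = maxSkew;
    }

    // Records one message's timestamp. SQS returns SentTimestamp as a string of
    // epoch milliseconds, so the caller would Long.parseLong(...) it first.
    void observe(long sentTimestampMillis) {
      Instant ts = Instant.ofEpochMilli(sentTimestampMillis);
      if (maxSeen == null || ts.isAfter(maxSeen)) {
        maxSeen = ts;
      }
    }

    // Never moves backwards: it starts at Instant.MIN and maxSeen only increases.
    Instant getWatermark() {
      return maxSeen == null ? Instant.MIN : maxSeen.minus(maxSkew);
    }

    // A message older than the watermark would be treated as late data.
    boolean isLate(long sentTimestampMillis) {
      return Instant.ofEpochMilli(sentTimestampMillis).isBefore(getWatermark());
    }
  }

  public static void main(String[] args) {
    BoundedSkewWatermark wm = new BoundedSkewWatermark(Duration.ofSeconds(30));
    // Hypothetical SentTimestamp values arriving out of order.
    wm.observe(1_000_000L);
    wm.observe(1_060_000L);
    wm.observe(1_020_000L);
    System.out.println(wm.getWatermark().toEpochMilli()); // prints 1030000
    System.out.println(wm.isLate(1_020_000L));            // prints true
    System.out.println(wm.isLate(1_045_000L));            // prints false
  }
}
```

The choice of `maxSkew` is the trade-off here. If it is too small, more messages from an unordered queue arrive behind the watermark as late data. If it is too large, windows close later than necessary.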