So I guess I can add a timestamp to the message attributes when I receive
it from SQS, since there is no such built-in property.

But what triggers finalizeCheckpoint to be called? So far in my testing, I
never see that method get called, and hence my messages keep getting
redelivered.
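
For reference, here is a minimal sketch of the pattern I have in mind, in
plain Java with no Beam or AWS dependencies. Every type in it (Received,
QueueClient, Checkpoint, Reader) is a hypothetical stand-in, not a Beam or
AWS SDK class: the reader stamps each message with its receive time and
remembers the receipt handle, and the checkpoint deletes those handles only
when something calls finalizeCheckpoint on it. It assumes the runner is what
makes that call, which is exactly the part I am unsure about.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; none of these are Beam or AWS SDK types.
class SqsCheckpointSketch {

  /** A received message, stamped with the time the reader saw it. */
  static final class Received {
    final String body;
    final String receiptHandle;
    final Instant receivedAt;

    Received(String body, String receiptHandle, Instant receivedAt) {
      this.body = body;
      this.receiptHandle = receiptHandle;
      this.receivedAt = receivedAt;
    }
  }

  /** Assumed minimal client; a real one would wrap the SQS delete call. */
  interface QueueClient {
    void delete(String receiptHandle);
  }

  /** Plays the role of a checkpoint mark: it acks (deletes) on finalize. */
  static final class Checkpoint {
    private final QueueClient client;
    private final List<String> handles;

    Checkpoint(QueueClient client, List<String> handles) {
      this.client = client;
      this.handles = new ArrayList<>(handles); // snapshot, not a live view
    }

    void finalizeCheckpoint() {
      for (String handle : handles) {
        client.delete(handle);
      }
    }
  }

  /** Reader that tracks receipt handles not yet covered by a checkpoint. */
  static final class Reader {
    private final QueueClient client;
    private final List<String> pending = new ArrayList<>();

    Reader(QueueClient client) {
      this.client = client;
    }

    /** Records the handle and attaches the receive time as the timestamp. */
    Received onReceive(String body, String receiptHandle, Instant now) {
      pending.add(receiptHandle);
      return new Received(body, receiptHandle, now);
    }

    /** Hands the pending handles to a new checkpoint and starts afresh. */
    Checkpoint getCheckpointMark() {
      Checkpoint mark = new Checkpoint(client, pending);
      pending.clear();
      return mark;
    }
  }
}
```

Nothing is deleted until finalizeCheckpoint runs, so if that call never
happens, every message comes back after its visibility timeout, which
matches what I am seeing.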

On Thu, Jul 19, 2018 at 5:26 PM, Raghu Angadi <rang...@google.com> wrote:

> A timestamp for a message is fundamental to an element in a PCollection.
> What do you mean by not knowing timestamp of a message?
> There is a finalizeCheckpoint API [1] in UnboundedSource. Does that help?
> PubSub is also very similar: a message needs to be acked within a timeout,
> otherwise it will be redelivered to one of the consumers. Pubsub messages
> are acked inside finalize().
>
> [1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>
>
> On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis <johnrle...@gmail.com>
> wrote:
>
>> hmm... made lots of progress on this today. But need help understanding
>> something....
>>
>> UnboundedSource seems to assume that there is some guarantee of message
>> ordering, and that you can get the timestamp of the current message. Using
>> UnboundedSource.CheckpointMark to help advance the offset. Seems to work ok
>> for any source that supports those assumptions. But SQS does not work this
>> way.
>>
>> With a standard SQS queue, there is no guarantee of ordering and there is
>> no timestamp for a message.  With SQS, one needs to call the delete api
>> using the receipt handle from the message to acknowledge receipt of a
>> message and prevent its redelivery after the visibility timeout has expired.
>>
>> I'm not sure how to adapt these two patterns and would welcome
>> suggestions.
>>
>>
>>
>> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>>> Thx John !
>>>
>>> Regards
>>> JB
>>>
>>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>>> > Thank you.
>>> >
>>> > I've created a jira ticket to add SQS and have assigned it to
>>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>>> >
>>> > Modified the documentation to show it as in-progress:
>>> > https://github.com/apache/beam/pull/5995
>>> >
>>> > And will be starting my work
>>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>>> >
>>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>> >
>>> >     Agree with Ismaël.
>>> >
>>> >     I would be more than happy to help on this one (as I contributed
>>> >     on AMQP and JMS IOs ;)).
>>> >
>>> >     Regards
>>> >     JB
>>> >
>>> >     On 19/07/2018 10:39, Ismaël Mejía wrote:
>>> >     > Thanks for your interest John, it would be a really nice
>>> >     > contribution to add SQS support.
>>> >     >
>>> >     > Some context on the kinesis stuff:
>>> >     >
>>> >     > The reason why kinesis is still in a separate module is more
>>> >     > related to a licensing problem. Kinesis uses some native
>>> >     > libraries that are published under a not 100% Apache-compatible
>>> >     > license and we are not allowed to shade and republish them, but
>>> >     > it seems there is a workaround now. For more details see
>>> >     > https://issues.apache.org/jira/browse/BEAM-3549
>>> >     > In any case, if you only need the Apache-licensed aws-sdk deps
>>> >     > to use SQS, it is ok (and a good idea) to put it in the
>>> >     > amazon-web-services module.
>>> >     >
>>> >     > The kinesis connector is way more complex for multiple reasons.
>>> >     > First, the raw version of the amazon client libraries is not so
>>> >     > ‘friendly’, and the guys who created KinesisIO had to do some
>>> >     > workarounds to provide accurate checkpointing/watermarks. So
>>> >     > since SQS is a way simpler system, you should probably be ok
>>> >     > basing it on simpler sources like AMQP or JMS.
>>> >     >
>>> >     > If you feel like it, please create the JIRA and don’t hesitate
>>> >     > to ask questions if you find issues or if you need some review.
>>> >     >
>>> >     > On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik <lc...@google.com> wrote:
>>> >     >>
>>> >     >>
>>> >     >>
>>> >     >> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis <johnrle...@gmail.com> wrote:
>>> >     >>>
>>> >     >>> I need an SQS source for my project that is using beam. A brief
>>> >     >>> search did not turn up any in-progress work in this area. Please
>>> >     >>> point me to the right repo if I missed it.
>>> >     >>
>>> >     >>
>>> >     >> To my knowledge there is none and nobody has marked it in
>>> >     >> progress on https://beam.apache.org/documentation/io/built-in/.
>>> >     >> It would be good to create a JIRA issue on
>>> >     >> https://issues.apache.org/ and send a PR to add SQS to the
>>> >     >> in-progress list referencing your JIRA. I added you as a
>>> >     >> contributor in JIRA so you should be able to assign yourself to
>>> >     >> any issues that you create.
>>> >     >>
>>> >     >>>
>>> >     >>> Assuming there is no in-progress effort, I would like to
>>> >     >>> contribute an Amazon SQS source. I have a few questions before I
>>> >     >>> begin.
>>> >     >>
>>> >     >>
>>> >     >> Great, note that this is a good starting point for authoring an
>>> >     >> IO transform:
>>> >     >> https://beam.apache.org/documentation/io/authoring-overview/
>>> >     >>
>>> >     >>>
>>> >     >>>
>>> >     >>> It seems that the current AWS code is split into two different
>>> >     >>> modules: sdks/java/io/amazon-web-services which contains the
>>> >     >>> S3FileSystem, AwsOptions, etc, and sdks/java/io/kinesis which
>>> >     >>> contains an unbounded source based on a kinesis topic. I'd like
>>> >     >>> to add this source to the amazon-web-services module since I'd
>>> >     >>> like to depend on AwsOptions. Does adding this source to the
>>> >     >>> amazon-web-services module make sense?
>>> >     >>
>>> >     >>
>>> >     >> Putting it inside of amazon-web-services makes a lot of sense.
>>> >     >> The Google connectors all live within the one package and there
>>> >     >> has been discussion to consolidate all the AWS stuff under
>>> >     >> amazon-web-services.
>>> >     >>
>>> >     >>>
>>> >     >>> Also, the kinesis source looks a touch more complex than other
>>> >     >>> sources. Both the JMS and AMQP sources look like better examples
>>> >     >>> to follow. Which existing source would be the best to model this
>>> >     >>> contribution after?
>>> >     >>
>>> >     >>
>>> >     >> Some of it has to do with how many ways a source can be read and
>>> >     >> how complicated the watermark tracking is, but it would be best
>>> >     >> if the IO authors comment on implementation details.
>>> >     >>
>>> >     >>>
>>> >     >>> If anyone has put some thoughts into this, or better yet some
>>> >     >>> code, I'd appreciate hearing from you.
>>> >     >>>
>>> >     >>> Thanks!
>>> >     >>>
>>> >
>>> >     --
>>> >     Jean-Baptiste Onofré
>>> >     jbono...@apache.org <mailto:jbono...@apache.org>
>>> >     http://blog.nanthrax.net
>>> >     Talend - http://www.talend.com
>>> >
>>> >
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>>
