Sorry for being late to the party. I saw your call to vote and looked at
the FLIP.

First, most of the design is looking really good and it will be good to
have another connector integrated into the AWS ecosystem. A couple of
questions/remarks:
1) Since we only have 1 split, any source subtask beyond the first would
sit idle, so we should also limit the parallelism of the source
accordingly. We could exploit the DynamicParallelismInference to
effectively limit it to 1 unless the user explicitly overrides it.
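2) If I haven't overlooked something obvious, then your exactly-once
strategy will not work. There is unfortunately no guarantee
that notifyCheckpointComplete is called at all before a failure happens: a
checkpoint can complete and the job can still fail before the notification
reaches the reader. The messages covered by that checkpoint are then never
deleted and show up again after recovery, so you very likely get duplicate
messages.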
Scanning the SQS documentation, I saw that you can read the SequenceNumber
of the message. If you also store the latest number in the checkpoint state
of the split, then during recovery you can discard all messages whose
number is smaller than or equal to the stored one. But I have never used
SQS, so just take it as an inspiration.
Also note that you didn't sketch how you delete messages from the queue.
It's very important to only delete those messages that are part of the
successful checkpoint. So you can't use PurgeQueue, which removes every
message in the queue regardless of checkpoint.
3) I wonder if you need to use ReceiveRequestAttemptId. It looks like it
may be important for retries.
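
To make 2) a bit more concrete, here is a minimal sketch of the bookkeeping
I have in mind. It is plain Java without any Flink or AWS dependency; the
class SqsCheckpointTracker and its method names are made up for
illustration and are not part of the FLIP. It assumes a FIFO queue (as far
as I can tell, only those expose SequenceNumber) with a single ordering of
messages, and it uses BigInteger because the sequence number is documented
as a 128-bit value.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical helper: dedup by sequence number, delete per completed checkpoint. */
class SqsCheckpointTracker {
    // Highest sequence number emitted so far; this is what goes into the split state.
    private BigInteger lastEmittedSequence;
    // Receipt handles of messages emitted since the last snapshot.
    private List<String> inFlight = new ArrayList<>();
    // Checkpoint id -> receipt handles that belong to that checkpoint.
    private final SortedMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();

    /** restoredSequence is the value read back from the checkpointed split state. */
    SqsCheckpointTracker(BigInteger restoredSequence) {
        this.lastEmittedSequence = restoredSequence;
    }

    /** Returns true if the message should be emitted, false if it is a replay. */
    boolean accept(BigInteger sequenceNumber, String receiptHandle) {
        if (sequenceNumber.compareTo(lastEmittedSequence) <= 0) {
            return false; // already emitted before the stored position
        }
        lastEmittedSequence = sequenceNumber;
        inFlight.add(receiptHandle);
        return true;
    }

    /** Freezes the handles emitted so far under this checkpoint id; returns the state to store. */
    BigInteger snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, inFlight);
        inFlight = new ArrayList<>();
        return lastEmittedSequence;
    }

    /** Returns the handles that are now safe to delete, including earlier checkpoints. */
    List<String> notifyCheckpointComplete(long checkpointId) {
        // headMap is exclusive, so +1 includes checkpointId itself.
        SortedMap<Long, List<String>> done = pendingByCheckpoint.headMap(checkpointId + 1);
        List<String> toDelete = new ArrayList<>();
        done.values().forEach(toDelete::addAll);
        done.clear(); // clearing the view also removes the entries from the backing map
        return toDelete;
    }
}
```

The reader would call accept for every received message, store the value
returned by snapshotState in the split state, and hand the result of
notifyCheckpointComplete to DeleteMessageBatch. Since handles are grouped
by checkpoint id, a notification that never arrives is caught up by the
next one, and nothing received after the snapshot is deleted early.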

Best,

Arvid

On Tue, Aug 20, 2024 at 11:24 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Abhisagar and Saurabh
> I have created the FLIP page and assigned it FLIP-477[1]. Feel free to
> resume with the next steps.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-+477+Amazon+SQS+Source+Connector
> Best Regards
> Ahmed Hamdy
>
>
> On Tue, 20 Aug 2024 at 06:05, Abhisagar Khatri <
> khatri.abhisaga...@gmail.com>
> wrote:
>
> > Hi Flink Devs,
> >
> > Gentle Reminder for the request. We'd like to ask the PMC/Committers to
> > transfer the content from the Amazon SQS Source Connector Google Doc [1]
> > and assign a FLIP Number for us, which we can use further for voting.
> > We are following the procedure outlined on the Flink Improvement Proposal
> > Confluence page [2].
> >
> > [1]
> >
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
> >
> > Regards,
> > Abhi & Saurabh
> >
> >
> > On Tue, Aug 13, 2024 at 12:50 PM Saurabh Singh <
> saurabhsingh9...@gmail.com>
> > wrote:
> >
> >> Hi Flink Devs,
> >>
> >> Thanks for all the feedback.
> >>
> >> Following the procedure outlined on the Flink Improvement Proposal
> >> Confluence page [1], we kindly ask the PMC/Committers to transfer the
> >> content from the Amazon SQS Source Connector Google Doc [2] and assign a
> >> FLIP Number for us, which we will use for voting.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
> >> [2]
> >>
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
> >>
> >> Regards
> >> Saurabh & Abhi
> >>
> >>
> >> On Thu, Aug 8, 2024 at 5:12 PM Saurabh Singh <
> saurabhsingh9...@gmail.com>
> >> wrote:
> >>
> >>> Hi Ahmed,
> >>>
> >>> Yes, you're correct. Currently, we use the record emitter to enqueue
> >>> messages for deletion. The actual deletion depends on checkpoints, so
> >>> it happens in the source reader class, which lets us override the
> >>> notifyCheckpointComplete method.
> >>>
> >>> Regards
> >>> Saurabh & Abhi
> >>>
> >>> On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <hamdy10...@gmail.com>
> wrote:
> >>>
> >>>> Hi Saurabh
> >>>> Thanks for addressing the comments, I see the FLIP is in a much better
> >>>> state. Could we specify where we queue messages for deletion? In my
> >>>> opinion the record emitter is a good place for that, so that we only
> >>>> delete messages that have been forwarded to the next operator.
> >>>> Other than that I don't have further comments.
> >>>> Thanks again for the effort.
> >>>>
> >>>> Best Regards
> >>>> Ahmed Hamdy
> >>>>
> >>>>
> >>>> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <
> saurabhsingh9...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Ahmed,
> >>>>>
> >>>>> Thank you very much for the detailed, valuable review. Please find
> our
> >>>>> responses below:
> >>>>>
> >>>>>
> >>>>>    - In the FLIP you mention the split is going to be 1 sqs Queue,
> >>>>>    does this mean we would support reading from multiple queues? This
> >>>>>    is also not clear in the implementation of `addSplitsBack` whether
> >>>>>    we are planning to support multiple sqs topics or not.
> >>>>>
> >>>>> *Our current implementation assumes that each source reads from a
> >>>>> single SQS queue. If you need to read from multiple SQS queues, you
> can
> >>>>> define multiple sources accordingly. We believe this approach is
> clearer
> >>>>> and more organized compared to having a single source switch between
> >>>>> multiple queues. This design choice is based on weighing the
> benefits, but
> >>>>> we can support multiple queues per source if the need arises.*
> >>>>>
> >>>>>    - Regarding Client creation, there has been some effort in the
> >>>>>    common `aws-util` module like createAwsSyncClient, we should
> reuse that for
> >>>>>    `SqsClient` creation.
> >>>>>
> >>>>>    *Thank you for bringing this to our attention. Yes, we will
> >>>>>    utilize the existing createClient methods available in the
> libraries. Our
> >>>>>    goal is to avoid any code duplication on our end.*
> >>>>>
> >>>>>
> >>>>>    - On the same point for clients, Is there a reason the FLIP
> >>>>>    suggests async clients? sync clients have proven more stable and
> the source
> >>>>>    threading model already guarantees no blocking by sync clients.
> >>>>>
> >>>>> *We were not aware of this, and we have been using async clients for
> >>>>> our in-house use cases. However, since we already have sync clients
> in the
> >>>>> aws-util that ensure no blocking, we are in a good position. We will
> use
> >>>>> these sync clients during our development and testing efforts, and
> we will
> >>>>> share the results and keep the community updated.*
> >>>>>
> >>>>>    - On mentioning threading, the FLIP doesn’t mention the fetcher
> >>>>>    manager. Is it going to be `SingleThreadFetcherManager`? Would it be
> >>>>>    better to make the source reader extend
> >>>>>    SingleThreadMultiplexSourceReaderBase, or are we going to implement
> >>>>>    a simpler version?
> >>>>>
> >>>>> *Yes, we are considering implementing
> >>>>> SingleThreadMultiplexSourceReaderBase for the Reader. We have
> included the
> >>>>> implementation snippet in the FLIP for reference.*
> >>>>>
> >>>>>    - The FLIP doesn’t mention schema deserialization or the
> >>>>>    recordEmitter implementation. Are we going to use
> >>>>>    `deserializationSchema` or some sort of string to element converter?
> >>>>>    It is also not clear from the builder example provided.
> >>>>>
> >>>>> *Yes, we plan to use the deserializationSchema and recordEmitter
> >>>>> implementations. We have included sample code for these in the FLIP
> for
> >>>>> reference.*
> >>>>>
> >>>>>    - Are the values mentioned in getSqsClientProperties recommended
> >>>>>    defaults? If so we should highlight that.
> >>>>>
> >>>>> *The defaults are not decided. These are just sample snapshots for
> >>>>> example.*
> >>>>>
> >>>>>    - Most importantly I am a bit skeptical regarding enforcing
> >>>>>    exactly-once semantics with side effects especially with
> dependency on
> >>>>>    checkpointing configuration, could we add flags to disable and
> disable by
> >>>>>    default if the checkpointing is not enabled?
> >>>>>
> >>>>> *During our initial design phase, we intended to enforce exactly-once
> >>>>> semantics via checkpoints. However, you raise a valid point, and we
> will
> >>>>> make this a configurable feature for users. They can choose to
> disable
> >>>>> exactly-once semantics, accepting some duplicate processing
> (at-least-once)
> >>>>> as a trade-off. We have updated the FLIP to include support for this
> >>>>> feature.*
> >>>>>
> >>>>>    - I am not 100% convinced we should block FLIP itself on the
> >>>>>    FLIP-438 implementation but I echo the fact that there might be
> some
> >>>>>    reusable code between the 2 submodules we should make use of.
> >>>>>
> >>>>>    *Yes we echo the same. We fully understand the concern and are
> >>>>>    closely examining the SQS Sink implementation. We will ensure
> there is no
> >>>>>    duplication of work or submodules. If any issues arise, we will
> address
> >>>>>    them promptly.*
> >>>>>
> >>>>>
> >>>>> Thank you for your valuable feedback on the FLIP. Your input is
> >>>>> helping us refine and improve it significantly.
> >>>>>
> >>>>> [Main Proposal doc] -
> >>>>>
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
> >>>>>
> >>>>> Regards
> >>>>> Saurabh & Abhi
> >>>>>
> >>>>>
> >>>>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <hamdy10...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Saurabh
> >>>>>> I think this is going to be a valuable addition which is needed.
> >>>>>> I have a couple of comments
> >>>>>> - In the FLIP you mention the split is going to be 1 sqs Queue, does
> >>>>>> this
> >>>>>> mean we would support reading from multiple queues? The builder
> >>>>>> example
> >>>>>> seems to support a single queue
> >>>>>> SqsSource.<T>builder.setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
> >>>>>> - This is also not clear in the implementation of `addSplitsBack`
> >>>>>> whether
> >>>>>> we are planning to support multiple sqs topics or not.
> >>>>>> - Regarding Client creation, there has been some effort in the
> common
> >>>>>> `aws-util` module like createAwsSyncClient  , we should reuse that
> for
> >>>>>> `SqsClient` creation.
> >>>>>> - On the same point for clients, Is there a reason the FLIP suggests
> >>>>>> async
> >>>>>> clients? sync clients have proven more stable and the source
> threading
> >>>>>> model already guarantees no blocking by sync clients.
> >>>>>> - On mentioning threading, the FLIP doesn't mention the fetcher
> >>>>>> manager. Is it going to be `SingleThreadFetcherManager`? Would it be
> >>>>>> better to make the source reader extend
> >>>>>> SingleThreadMultiplexSourceReaderBase, or are we going to implement a
> >>>>>> simpler version?
> >>>>>> - The FLIP doesn't mention schema deserialization or the recordEmitter
> >>>>>> implementation. Are we going to use `deserializationSchema` or some
> >>>>>> sort of string to element converter? It is also not clear from the
> >>>>>> builder example provided.
> >>>>>> - Are the values mentioned in getSqsClientProperties recommended
> >>>>>> defaults?
> >>>>>> If so we should highlight that
> >>>>>> - Most importantly I am a bit skeptical regarding enforcing
> >>>>>> exactly-once
> >>>>>> semantics with side effects especially with dependency on
> >>>>>> checkpointing
> >>>>>> configuration, could we add flags to disable and disable by default
> >>>>>> if the
> >>>>>> checkpointing is not enabled?
> >>>>>>
> >>>>>> I am not 100% convinced we should block FLIP itself on the FLIP-438
> >>>>>> implementation but I echo the fact that there might be some reusable
> >>>>>> code
> >>>>>> between the 2 submodules we should make use of.
> >>>>>>
> >>>>>>  Best Regards
> >>>>>> Ahmed Hamdy
> >>>>>>
> >>>>>>
> >>>>>> On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <
> >>>>>> saurabhsingh9...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> > Hi Li Wang,
> >>>>>> >
> >>>>>> > Thanks for the review and appreciate your feedback.
> >>>>>> > I completely understand your concern and agree with it. Our goal
> is
> >>>>>> to
> >>>>>> > provide users of connectors with a consistent and coherent
> >>>>>> ecosystem, free
> >>>>>> > of any issues.
> >>>>>> > To ensure that, we are closely monitoring/reviewing the SQS Sink
> >>>>>> > implementation work by Dhingra. Our development work will commence
> >>>>>> once the
> >>>>>> > AWS Sink is near completion or completed. This approach ensures
> >>>>>> that we
> >>>>>> > take in the new learnings, do not duplicate any core modules and
> >>>>>> allow us
> >>>>>> > to save valuable time.
> >>>>>> >
> >>>>>> > In the meantime, we would like to keep the discussion and process
> >>>>>> active on
> >>>>>> > this FLIP. Gaining valuable community feedback (which is helping
> >>>>>> us) will
> >>>>>> > help us address any potential gaps in the source connector design
> >>>>>> and
> >>>>>> > finalize it. Behind the scenes, we are already designing and
> >>>>>> pre-planning
> >>>>>> > our development work to adhere to feedback/best practices/ faster
> >>>>>> delivery
> >>>>>> > when we implement this FLIP.
> >>>>>> > Please share your thoughts on this.
> >>>>>> >
> >>>>>> > Regards
> >>>>>> > Saurabh
> >>>>>> >
> >>>>>> > On Fri, Jul 26, 2024 at 5:52 PM Li Wang <liwang505...@gmail.com>
> >>>>>> wrote:
> >>>>>> >
> >>>>>> > > Hi Saurabh,
> >>>>>> > >
> >>>>>> > > Thanks for the FLIP. Given the ongoing effort to implement the
> >>>>>> SQS sink
> >>>>>> > > connector (
> >>>>>> > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector
> >>>>>> > > ),
> >>>>>> > > it is important to consider how the SQS source connector
> supports
> >>>>>> this
> >>>>>> > > development, ensuring a unified framework for AWS integration
> >>>>>> that avoids
> >>>>>> > > capacity overlap and maximizes synchronization. Since the
> >>>>>> implementation
> >>>>>> > by
> >>>>>> > > Dhingra is still WIP, I would suggest taking that into account
> >>>>>> and keep
> >>>>>> > > this FLIP as an extension of that FLIP which could be taken up
> >>>>>> once the
> >>>>>> > SQS
> >>>>>> > > Sink FLIP is fully implemented. If not, this may lead to duplicate
> >>>>>> > > work in multiple identity-related modules and in the structure of
> >>>>>> > > the overall SQS connector codebase. What do you think?
> >>>>>> > >
> >>>>>> > > Thanks
> >>>>>> > >
> >>>>>> > >
> >>>>>> > > On Thursday, July 25, 2024, Saurabh Singh <
> >>>>>> saurabhsingh9...@gmail.com>
> >>>>>> > > wrote:
> >>>>>> > >
> >>>>>> > > > Hi Samrat,
> >>>>>> > > >
> >>>>>> > > > Thanks for the review and feedback.
> >>>>>> > > > We have evaluated all the three points. Please find the
> answers
> >>>>>> below:
> >>>>>> > > >
> >>>>>> > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you
> >>>>>> shed
> >>>>>> > some
> >>>>>> > > > light on how different protocols will be supported?
> >>>>>> > > >  - We will utilize the AWS client library to connect with the
> >>>>>> AWS SQS
> >>>>>> > > > Service. Versions beyond 2.21.19 now support JSON, so simply
> >>>>>> upgrading
> >>>>>> > > the
> >>>>>> > > > client library will suffice for the protocol switch. However,
> >>>>>> from the
> >>>>>> > > > connector's perspective, we do not anticipate any changes in
> our
> >>>>>> > > > communication process, as it is handled by the client library.
> >>>>>> [4]
> >>>>>> > > >
> >>>>>> > > > 2. AWS SQS has two types of queues [2]. What are the
> >>>>>> implementation
> >>>>>> > > detail
> >>>>>> > > > differences for the source connector?
> >>>>>> > > > - SQS Connector is indifferent to the customer's choice of
> >>>>>> Queue type.
> >>>>>> > If
> >>>>>> > > > out-of-order messages are a concern, it will be the
> >>>>>> responsibility of
> >>>>>> > the
> >>>>>> > > > application code or main job logic to manage this.
> >>>>>> > > >
> >>>>>> > > > 3. Will the SQS source connector implement any kind of
> >>>>>> callbacks [3] on
> >>>>>> > > > success to offer any kind of guarantee?
> >>>>>> > > > - We have proposed deleting SQS messages using the notification
> >>>>>> > > > provided by the checkpoint framework on checkpoint completion,
> >>>>>> > > > thus providing an exactly-once guarantee [5]. Additionally, when
> >>>>>> > > > deleting messages, we will monitor the API call responses and log
> >>>>>> > > > any failures, along with providing observability through
> >>>>>> > > > appropriate metrics.
> >>>>>> > > >
> >>>>>> > > > [1]
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
> >>>>>> > > > [2]
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
> >>>>>> > > > [3]
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
> >>>>>> > > > [4]
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
> >>>>>> > > > [5]
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > > > [Main Proposal doc] -
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
> >>>>>> > > >
> >>>>>> > > > Please feel free to reach out if you have more feedback.
> >>>>>> > > >
> >>>>>> > > > Regards
> >>>>>> > > > Saurabh & Abhi
> >>>>>> > > >
> >>>>>> > > >
> >>>>>> > > > On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <
> >>>>>> decordea...@gmail.com>
> >>>>>> > > wrote:
> >>>>>> > > >
> >>>>>> > > > > Hi Saurabh,
> >>>>>> > > > >
> >>>>>> > > > > Thank you for sharing the FLIP for the SQS source connector.
> >>>>>> An SQS
> >>>>>> > > > source
> >>>>>> > > > > connector will be a great addition to the Flink ecosystem,
> as
> >>>>>> there
> >>>>>> > is
> >>>>>> > > a
> >>>>>> > > > > growing demand for SQS source/sink integration.
> >>>>>> > > > >
> >>>>>> > > > > I have a few queries:
> >>>>>> > > > >
> >>>>>> > > > > 1. AWS has announced JSON protocol support in SQS [1]. Can
> >>>>>> you shed
> >>>>>> > > some
> >>>>>> > > > > light on how different protocols will be supported?
> >>>>>> > > > > 2. AWS SQS has two types of queues [2]. What are the
> >>>>>> implementation
> >>>>>> > > > detail
> >>>>>> > > > > differences for the source connector?
> >>>>>> > > > > 3. Will the SQS source connector implement any kind of
> >>>>>> callbacks [3]
> >>>>>> > on
> >>>>>> > > > > success to offer any kind of guarantee?
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > > > [1]
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
> >>>>>> > > > > [2]
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
> >>>>>> > > > > [3]
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
> >>>>>> > > > >
> >>>>>> > > > > Bests,
> >>>>>> > > > > Samrat
> >>>>>> > > > >
> >>>>>> > > > >
> >>>>>> > > > > On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <
> >>>>>> > > > saurabhsingh9...@gmail.com>
> >>>>>> > > > > wrote:
> >>>>>> > > > >
> >>>>>> > > > > > Hi Flink Devs,
> >>>>>> > > > > >
> >>>>>> > > > > > Our team has been working on migrating various data
> >>>>>> pipelines to
> >>>>>> > > Flink
> >>>>>> > > > to
> >>>>>> > > > > > leverage the benefits of exactly-once processing,
> >>>>>> checkpointing,
> >>>>>> > and
> >>>>>> > > > > > stateful computing. We have several use cases built around
> >>>>>> the AWS
> >>>>>> > > SQS
> >>>>>> > > > > > Service. For this migration, we have developed an SQS
> Source
> >>>>>> > > Connector,
> >>>>>> > > > > > which enables us to run both stateless and stateful
> >>>>>> Flink-based
> >>>>>> > jobs.
> >>>>>> > > > > >
> >>>>>> > > > > > We believe that this SQS Source Connector would be a
> >>>>>> valuable
> >>>>>> > > addition
> >>>>>> > > > to
> >>>>>> > > > > > the existing connector set. Therefore, we propose a FLIP
> to
> >>>>>> include
> >>>>>> > > it.
> >>>>>> > > > > >
> >>>>>> > > > > > For more information, please refer to the FLIP document.
> >>>>>> > > > > >
> >>>>>> > > > > >
> >>>>>> > > > > >
> >>>>>> > > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
> >>>>>> > > > > >
> >>>>>> > > > > > Thanks
> >>>>>> > > > > > Saurabh & Abhi
> >>>>>> > > > > >
> >>>>>> > > > >
> >>>>>> > > >
> >>>>>> > >
> >>>>>> >
> >>>>>>
> >>>>>
>
