Yes, that's what I was proposing!

@Karim If there's not already a Jira issue, please create one. You can ping me, so that I can assign you.

@Austin There's a Jira component for the RMQ source, maybe you can take a stab at some of the issues there: https://issues.apache.org/jira/browse/FLINK-17204?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Connectors%2F%20RabbitMQ%22%20AND%20statusCategory%20!%3D%20Done.

Best,
Aljoscha

On 03.05.20 16:38, seneg...@gmail.com wrote:
Hi,

Okay so keep the current constructors as is, create new ones with more
granular parsing of the results. Sounds like a good plan.

How do we proceed from here?

Regards,
Karim Mansour

On Fri, May 1, 2020 at 5:03 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

Hey,

(Switching to my personal email)

Correct me if I'm wrong, but I think Aljoscha is proposing keeping the
public API as is and adding some new constructors / custom
deserialization schemas, as was done with Kafka. Here's what I was able
to find on that feature:

* https://issues.apache.org/jira/browse/FLINK-8354
* https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
* https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java#L100-L114

Best,
Austin

On Fri, May 1, 2020 at 6:19 AM seneg...@gmail.com <seneg...@gmail.com>
wrote:

Hello,

So the proposal is to keep the current RMQSource constructors / public
API as is and create new ones that give more granular parsing?

Regards,
Karim Mansour

On Thu, Apr 30, 2020 at 5:23 PM Austin Cawley-Edwards <
aus...@fintechstudios.com> wrote:

Hey all + thanks Konstantin,

As mentioned, we have also run into issues with the RMQ source's
inflexibility. I think Aljoscha's idea of supporting both would be a
nice way to incorporate new changes without breaking the current API.

We'd definitely benefit from the changes proposed here, but we have
another issue with the correlation ID. When a message gets into the
queue without a correlation ID, the source errors and the job cannot
recover, requiring (painful) manual intervention. It would be nice to be
able to dead-letter these inputs from the source, but I don't think
that's possible with the current source interface (I don't know too much
about the source specifics). We might be able to work around this with a
custom correlation ID extractor, as proposed by Karim.
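
To make the workaround concrete, here is a minimal sketch of what such
an extractor could look like. Everything in it is an assumption for
illustration: Delivery, CorrelationIdExtractor, Extractors and
DeliveryRouter are hypothetical names, not part of Flink or the RabbitMQ
client, and the "dead-letter" result only marks where such a message
could be diverted.

```java
import java.util.Optional;

/** Hypothetical stand-in for the parts of an AMQP delivery that matter here. */
final class Delivery {
    final String correlationId; // null when the publisher did not set one
    final String messageId;     // may be null as well
    final byte[] body;

    Delivery(String correlationId, String messageId, byte[] body) {
        this.correlationId = correlationId;
        this.messageId = messageId;
        this.body = body;
    }
}

/** Hypothetical hook: decides which value identifies a message. */
interface CorrelationIdExtractor {
    Optional<String> extract(Delivery delivery);
}

final class Extractors {
    private Extractors() {}

    /** Prefers the correlation ID, falls back to the message ID, else empty. */
    static CorrelationIdExtractor correlationThenMessageId() {
        return d -> Optional.ofNullable(
                d.correlationId != null ? d.correlationId : d.messageId);
    }
}

final class DeliveryRouter {
    private DeliveryRouter() {}

    /** Routes a delivery without throwing when no ID can be extracted. */
    static String route(Delivery d, CorrelationIdExtractor extractor) {
        return extractor.extract(d)
                .map(id -> "emit:" + id)
                .orElse("dead-letter");
    }
}
```

The point of returning an Optional is that a missing ID becomes a value
the source can react to, rather than an exception that fails the job.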

Also, if there are other tickets in the RMQ integrations that have gone
unmaintained, I'm also happy to chip in at maintaining them!

Best,
Austin
________________________________
From: Konstantin Knauf <kna...@apache.org>
Sent: Thursday, April 30, 2020 6:14 AM
To: dev <dev@flink.apache.org>
Cc: Austin Cawley-Edwards <aus...@fintechstudios.com>
Subject: Re: [DISCUSS] flink-connector-rabbitmq api changes

Hi everyone,

just looping in Austin, as he mentioned to me yesterday that they also
ran into issues due to the inflexibility of the RabbitMQ source.

Cheers,

Konstantin

On Thu, Apr 30, 2020 at 11:23 AM seneg...@gmail.com <seneg...@gmail.com>
wrote:

Hello Guys,

Thanks for all the responses. I want to stress that I didn't feel
ignored; I just thought that I had forgotten an important step or
something.

Since I am a newbie, I will follow whatever route you suggest :)
I agree that the RMQ connector still needs a lot of love, which I would
be happy to submit gradually.

As for the code, I have it here in the PR:
https://github.com/senegalo/flink/pull/1
It's not that much of a change in terms of logic, but more in terms of
what is exposed.

Let me know how you want me to proceed.

Thanks again,
Karim Mansour

On Thu, Apr 30, 2020 at 10:40 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,

I think it's good to contribute the changes to Flink directly, since we
already have the RMQ connector in the repository.

I would propose something similar to the Kafka connector, which takes
both the generic DeserializationSchema and a KafkaDeserializationSchema
that is specific to Kafka and allows access to the ConsumerRecord and
therefore all the Kafka features. What do you think about that?
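
As a rough sketch of how that pattern could carry over to RabbitMQ,
assuming simplified stand-in types: BodySchema plays the role of the
generic DeserializationSchema, while RmqMessage, RmqSchema and
RmqSourceSketch are hypothetical names that do not exist in Flink. The
existing body-only constructor keeps working because the generic schema
is wrapped into the RabbitMQ-specific one.

```java
/** Simplified stand-in for the generic, body-only DeserializationSchema. */
interface BodySchema<T> {
    T deserialize(byte[] body);
}

/** Hypothetical view of a RabbitMQ message: body plus some properties. */
final class RmqMessage {
    final byte[] body;
    final String correlationId;
    final String messageId;

    RmqMessage(byte[] body, String correlationId, String messageId) {
        this.body = body;
        this.correlationId = correlationId;
        this.messageId = messageId;
    }
}

/** Hypothetical RabbitMQ-specific schema that sees the whole message. */
interface RmqSchema<T> {
    T deserialize(RmqMessage message);

    /** Adapter that lets a body-only schema be used where an RmqSchema is expected. */
    static <T> RmqSchema<T> wrap(BodySchema<T> schema) {
        return message -> schema.deserialize(message.body);
    }
}

/** Hypothetical source offering both constructors side by side. */
final class RmqSourceSketch<T> {
    private final RmqSchema<T> schema;

    /** Existing style: only the body is visible to the user code. */
    RmqSourceSketch(BodySchema<T> schema) {
        this.schema = RmqSchema.wrap(schema);
    }

    /** New style: body and message properties are visible to the user code. */
    RmqSourceSketch(RmqSchema<T> schema) {
        this.schema = schema;
    }

    T onDelivery(RmqMessage message) {
        return schema.deserialize(message);
    }
}
```

One caveat with this sketch: because both schema types are single-method
interfaces, an untyped lambda passed straight to the constructor would
be ambiguous in Java, so callers would pass a typed schema instance.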

Best,
Aljoscha

On 30.04.20 10:26, Robert Metzger wrote:
Hey Karim,

I'm sorry that you had such a bad experience contributing to Flink, even
though you are nicely following the rules.

You mentioned that you've implemented the proposed change already. Could
you share a link to a branch here so that we can take a look? I can
assess the API changes more easily if I see them :)

Thanks a lot!


Best,
Robert

On Thu, Apr 30, 2020 at 8:09 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

Hi Karim,

Sorry you did not have the best first-time experience. You certainly did
everything right, which I definitely appreciate.

The problem in this particular case, as I see it, is that the RabbitMQ
connector is not very actively maintained, and therefore it is not easy
to find a committer willing to take on this topic. The point of
connectors not being properly maintained was raised a few times in the
past on the ML. One of the ideas for improving the situation was to
start the https://flink-packages.org/ page. The idea is to ask active
users of certain connectors to maintain those connectors outside of the
core project, while giving them a platform within the community where
they can make their modules visible. That way it is possible to overcome
the lack of capacity among the core committers without losing much
visibility.

I would kindly ask you to consider that path, if you are interested. You
can of course also wait for / reach out to more committers if you feel
strongly about contributing those changes back to the Flink repository
itself.

Best,

Dawid

On 30/04/2020 07:29, seneg...@gmail.com wrote:

Hello,

I am new to the mailing list and to contributing to big open source
projects in general, and I don't know if I did something wrong or should
be more patient :)

I put up a topic for discussion as per the contribution guide
"https://flink.apache.org/contributing/how-to-contribute.html" almost a
week ago, and since what I propose is not backward compatible, it needs
to be discussed here before opening a ticket and moving forward.

So my question is: will someone pick the discussion up? Or at least,
would someone say that this is not the way to go? Or should I assume
from the silence that it's not important / relevant to the project?
Should I track down the author of the connector and write to him
directly?

Thank you for your time.

Regards,
Karim Mansour

On Fri, Apr 24, 2020 at 11:17 AM seneg...@gmail.com <seneg...@gmail.com>
wrote:

Dear All,

I want to propose a change to the current RabbitMQ connector.

Currently the RMQSource extracts the body of the message, which is a
byte array, and passes it to an instance of a user implementation of the
DeserializationSchema class to deserialize the body of the message. It
also uses the correlation ID from the message properties to deduplicate
the message.

What I want to propose is that, instead of taking an implementation of a
DeserializationSchema in the RMQSource constructor, we have the user
implement an interface with methods that produce both the output of the
RMQSource and the correlation ID, with access not only to the body of
the message but also to its metadata and properties, thus giving the
connector much more power and flexibility.

This of course would mean a breaking API change for the RMQSource, since
it will no longer take a DeserializationSchema but an implementation of
a predefined interface that has methods to extract both the output of
the RMQSource and the unique message ID.
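
For illustration only, such an interface could look roughly like the
sketch below. It is not the code from my branch; AmqpProps,
RmqMessageParser, Event, MessageIdParser and DedupingReader are
hypothetical names, and the in-memory set merely stands in for whatever
deduplication the source actually performs.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the AMQP message properties. */
final class AmqpProps {
    final String correlationId;
    final String messageId;

    AmqpProps(String correlationId, String messageId) {
        this.correlationId = correlationId;
        this.messageId = messageId;
    }
}

/** Hypothetical interface the user would implement instead of a DeserializationSchema. */
interface RmqMessageParser<T> {
    /** Builds the record emitted by the source from body and properties. */
    T parseOutput(byte[] body, AmqpProps props);

    /** Picks the value used to deduplicate messages. */
    String parseUniqueId(byte[] body, AmqpProps props);
}

/** Example output type that carries the message ID further down the pipeline. */
final class Event {
    final String messageId;
    final String payload;

    Event(String messageId, String payload) {
        this.messageId = messageId;
        this.payload = payload;
    }
}

/** Example parser that deduplicates on the message ID, not the correlation ID. */
final class MessageIdParser implements RmqMessageParser<Event> {
    @Override
    public Event parseOutput(byte[] body, AmqpProps props) {
        return new Event(props.messageId, new String(body, StandardCharsets.UTF_8));
    }

    @Override
    public String parseUniqueId(byte[] body, AmqpProps props) {
        return props.messageId;
    }
}

/** Sketch of the source side: each unique ID is emitted at most once. */
final class DedupingReader<T> {
    private final RmqMessageParser<T> parser;
    private final Set<String> seenIds = new HashSet<>();

    DedupingReader(RmqMessageParser<T> parser) {
        this.parser = parser;
    }

    /** Returns the parsed record, or empty if this ID was already seen. */
    Optional<T> read(byte[] body, AmqpProps props) {
        String id = parser.parseUniqueId(body, props);
        return seenIds.add(id)
                ? Optional.of(parser.parseOutput(body, props))
                : Optional.empty();
    }
}
```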

The reason behind this is that in my company we were relying on another
property, the message ID, for deduplication of the messages. I also
needed that information further down the pipeline, and there was
absolutely no way of getting it other than modifying the RMQSource.

I already have code written, but as the rules dictate, I have to run it
by you guys first before I attempt to create a Jira ticket :)

Let me know what you think.

Regards,
Karim Mansour








--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk




