RE: Flink custom parallel data source

2023-10-31 Thread Kamal Mittal via user
Thanks for sharing your views.

Our client supports only TCP stream-based traffic, which arrives in a proprietary
format that we need to decode. The system accepting this traffic is Flink-based,
which is why we tried all of this with a custom data source.

Given that, how would the message broker you suggested be feasible in this case?
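For illustration, a minimal sketch of one way a broker becomes feasible here (all
class names, addresses, ports, and topic names below are assumptions, not from the
thread): a small standalone process terminates the proprietary TCP connections and
republishes the raw frames to Kafka, which gives Flink a replayable source to
consume from instead of a push-based socket source.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TcpToKafkaBridge {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092"); // assumed address
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            ExecutorService pool = Executors.newCachedThreadPool();
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
                 ServerSocket server = new ServerSocket(9999)) { // assumed port
                while (true) {
                    Socket client = server.accept(); // one thread per connection
                    pool.submit(() -> {
                        try (InputStream in = client.getInputStream()) {
                            byte[] buf = new byte[4096];
                            int n;
                            while ((n = in.read(buf)) != -1) {
                                // publish raw frames; decoding stays in the Flink job
                                producer.send(new ProducerRecord<>(
                                        "proprietary-frames", Arrays.copyOf(buf, n)));
                            }
                        } catch (IOException ignored) {
                            // connection closed
                        }
                    });
                }
            }
        }
    }

The Flink job then reads the topic with the Kafka source, restoring the
replayability that Flink's fault-tolerance model relies on.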



[ANNOUNCE] Apache Flink Kafka Connectors 3.0.1 released

2023-10-31 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Kafka Connectors 3.0.1. This release is compatible with the Apache
Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html
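For Maven users, the connector coordinates follow the externalized-connector
scheme in which the version suffix tracks the Flink release series (e.g.
3.0.1-1.17 or 3.0.1-1.18):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.1-1.18</version>
    </dependency>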

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352910

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon


Re: Flink custom parallel data source

2023-10-31 Thread Alexander Fedulov
Flink natively supports a pull-based model for sources, where the source
operators request data from the external system when they are ready to
process it.  Implementing a TCP server socket operator essentially creates
a push-based source, which could lead to backpressure problems if the data
ingestion rate exceeds the processing rate. You also lose any delivery
guarantees because Flink's fault tolerance model relies on having
replayable sources.
Is using a message broker not feasible in your case?

Best,
Alexander Fedulov
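For illustration, a sketch of consuming such a broker topic with the replayable
KafkaSource (the broker address and topic name are assumptions, and the final map
step is a mere placeholder for the proprietary decoder):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BrokerIngestJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
                    .setBootstrapServers("broker:9092")   // assumed address
                    .setTopics("proprietary-frames")      // assumed topic
                    .setGroupId("tcp-decoder")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new AbstractDeserializationSchema<byte[]>() {
                        @Override
                        public byte[] deserialize(byte[] message) {
                            return message; // keep raw bytes; decode downstream
                        }
                    })
                    .build();

            DataStream<byte[]> frames = env.fromSource(
                    source, WatermarkStrategy.noWatermarks(), "kafka-ingest");

            // placeholder for the real proprietary decoder
            frames.map(bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
                  .print();

            env.execute("broker-ingest");
        }
    }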



Why is Apache Beam a required dependency of PyFlink (and can it be removed)?

2023-10-31 Thread Deepyaman Datta
Hi,

I'm trying to understand where the Apache Beam dependency comes from; it's
not just a regular dependency of PyFlink, but a build system dependency.
Searching through the code, it seems like Beam is only used by PyFlink, and
not by non-Python Flink. In my (limited) understanding, it looks like the
Beam dependency is there mostly to enable a Beam runner.

However, if that's the case, then there are a lot of users who may not want
to use PyFlink via Beam, in which case the Apache Beam dependency is
unnecessarily restrictive. For instance, the latest version of
`apache-beam` caps `numpy<1.25.0` and `pyarrow<12.0.0`, whereas NumPy
1.26.x and PyArrow 13.0.0 have already been out for some time.

Am I correct in my understanding that Beam is only a dependency in order
to create a Beam runner/integration? If so, can Beam be an extra/optional
dependency of PyFlink, instead of being required for everybody?

Thank you!

Best regards,
Deepyaman


RE: Flink custom parallel data source

2023-10-31 Thread Kamal Mittal via user
Hello,

We are writing a custom TCP server socket source function in which a server
socket listener accepts connections and reads data.

The source function creates a single server socket: ServerSocket serversocket =
new ServerSocket();

We then use a thread pool to accept multiple connections, each in a separate
thread: pool.submit(() -> serversocket.accept());

Each accepted client socket is handed to a separate thread that reads data from
the TCP stream.

Rgds,
Kamal
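For reference, a minimal sketch of the pattern described above (the class name
and port are invented). Note that SourceContext is not thread-safe, so every emit
from a reader thread has to be serialized on the checkpoint lock, and the
deprecation caveat in the reply below still applies:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class TcpServerSource extends RichParallelSourceFunction<String> {
        private volatile boolean running = true;
        private transient ServerSocket serverSocket;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            serverSocket = new ServerSocket(9999); // assumed port
            ExecutorService pool = Executors.newCachedThreadPool();
            while (running) {
                final Socket client;
                try {
                    client = serverSocket.accept();
                } catch (IOException e) {
                    if (running) throw e;
                    break; // socket was closed by cancel()
                }
                pool.submit(() -> {
                    try (BufferedReader in = new BufferedReader(
                            new InputStreamReader(client.getInputStream()))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // SourceContext is not thread-safe: emission from
                            // multiple threads must hold the checkpoint lock
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(line);
                            }
                        }
                    } catch (IOException ignored) {
                        // client disconnected
                    }
                });
            }
            pool.shutdownNow();
        }

        @Override
        public void cancel() {
            running = false;
            try {
                if (serverSocket != null) serverSocket.close(); // unblocks accept()
            } catch (IOException ignored) {
            }
        }
    }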

Re: Flink custom parallel data source

2023-10-31 Thread Alexander Fedulov
Please note that SourceFunction API is deprecated and is due to be removed,
possibly in the next major version of Flink.
Ideally you should not be manually spawning threads in your Flink
applications. Typically you would only perform data fetching in the sources
and do processing in the subsequent operators which you can scale
independently from the source parallelism. Can you describe what you are
trying to achieve?

Best,
Alexander Fedulov
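To illustrate the last point, a sketch of keeping the source thin and scaling the
processing separately (socketTextStream and the map step are stand-ins for the
real source and decoder; the parallelism values are placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DecodePipeline {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Thin ingestion: fetch raw records only. socketTextStream is just
            // a stand-in for the real source (it is non-parallel).
            DataStream<String> raw = env.socketTextStream("ingest-host", 9999);

            // Heavy work happens downstream, scaled independently of the source.
            DataStream<Integer> decoded = raw
                    .map(String::length)   // placeholder for real decoding
                    .setParallelism(8);    // scaled without touching the source

            decoded.print();
            env.execute("decode-pipeline");
        }
    }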


Re: [DISCUSS][FLINK-33240] Document deprecated options as well

2023-10-31 Thread Alexander Fedulov
Hi Zhanghao,

Thanks for the proposition.
In general +1; this sounds like a good idea as long as it is clear that the
usage of these settings is discouraged.
Just one minor concern - the configuration page is already very long, do
you have a rough estimate of how many more options would be added with this
change?

Best,
Alexander Fedulov

On Mon, 30 Oct 2023 at 18:24, Matthias Pohl 
wrote:

> Thanks for your proposal, Zhanghao Chen. I think it adds more transparency
> to the configuration documentation.
>
> +1 from my side on the proposal
>
> On Wed, Oct 11, 2023 at 2:09 PM Zhanghao Chen 
> wrote:
>
> > Hi Flink users and developers,
> >
> > Currently, Flink won't generate documentation for deprecated options. This
> > might confuse users when upgrading from an older version of Flink: they have
> > to either carefully read the release notes or check the source code for
> > upgrade guidance on deprecated options.
> >
> > I propose to document deprecated options as well, with a "(deprecated)"
> > tag placed at the beginning of the option description to highlight the
> > deprecation status [1].
> >
> > Looking forward to your feedback on it.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-33240
> >
> > Best,
> > Zhanghao Chen
> >
>


RE: Updating existing state with state processor API

2023-10-31 Thread Schwalbe Matthias
Hi Alexis,

Sorry for the late answer … got carried away with other tasks 

I hope I get this right, as there is a mixture of concepts in my mind with
respect to the old and the new savepoint API. I'll try to answer for the new API.

  *   If you want to patch an existing savepoint, you load it into a
      SavepointWriter [1]; this basically copies the existing savepoint. You then:
      *   can remove and/or add state for operators [2] [3]
      *   then write the savepoint to the new location [4]
      *   note that the old API had the restriction that you had to change at
          least one operator state in order to be able to write the savepoint
          out to a new location; I don't believe this restriction applies to
          the new API
  *   If you want to patch an existing state (e.g. to change an incompatible
      schema):
      *   you need to load/bootstrap this state by means of SavepointReader [5]
          and one of the readXXXState functions,
      *   then remove the existing state from the previous savepoint [2] and add
          the new state [3] with the bootstrap transformation obtained above
  *   If you want a state to be empty, it suffices to remove the existing
      state [2], if it existed

I hope this helps (a sketch of this flow follows after the reference list
below) … happy to hear someone correct me if I'm mistaken.


Sincere regards

Thias


[1] 
org.apache.flink.state.api.SavepointWriter#fromExistingSavepoint(java.lang.String)
[2] 
org.apache.flink.state.api.SavepointWriter#removeOperator(org.apache.flink.state.api.OperatorIdentifier)
[3] 
org.apache.flink.state.api.SavepointWriter#withOperator(org.apache.flink.state.api.OperatorIdentifier,
 org.apache.flink.state.api.StateBootstrapTransformation)
[4] org.apache.flink.state.api.SavepointWriter#write
[5] 
org.apache.flink.state.api.SavepointReader#read(org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
 java.lang.String, org.apache.flink.runtime.state.StateBackend)
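Putting steps [1]-[4] together, a minimal sketch assuming the new state processor
API (Flink 1.17+); the operator uid, paths, and bootstrapped state are invented
for illustration, and the bootstrap data could equally come from SavepointReader
[5] when migrating existing state:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.OperatorIdentifier;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.SavepointWriter;
    import org.apache.flink.state.api.StateBootstrapTransformation;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PatchSavepoint {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Data to bootstrap the replacement state from
            DataStream<String> newData = env.fromElements("a", "b", "c");

            StateBootstrapTransformation<String> transformation = OperatorTransformation
                    .bootstrapWith(newData)
                    .keyBy(value -> value)
                    .transform(new KeyedStateBootstrapFunction<String, String>() {
                        private transient ValueState<String> state;

                        @Override
                        public void open(Configuration parameters) {
                            state = getRuntimeContext().getState(
                                    new ValueStateDescriptor<>("my-state", String.class));
                        }

                        @Override
                        public void processElement(String value, Context ctx) throws Exception {
                            state.update(value);
                        }
                    });

            SavepointWriter
                    .fromExistingSavepoint("file:///savepoints/old")         // [1]
                    .removeOperator(OperatorIdentifier.forUid("patched-op")) // [2]
                    .withOperator(OperatorIdentifier.forUid("patched-op"),
                            transformation)                                  // [3]
                    .write("file:///savepoints/new");                        // [4]

            env.execute("patch-savepoint");
        }
    }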

From: Alexis Sarda-Espinosa 
Sent: Friday, October 27, 2023 4:29 PM
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Updating existing state with state processor API

Hi Matthias,

Thanks for the response. I guess the specific question would be, if I work with 
an existing savepoint and pass an empty DataStream to 
OperatorTransformation#bootstrapWith, will the new savepoint end up with an 
empty state for the modified operator, or will it maintain the existing state 
because nothing was changed?

Regards,
Alexis.

On Fri, 27 Oct 2023 at 08:40, Schwalbe Matthias wrote:
Good morning Alexis,

Something like this we do all the time: read an existing savepoint, copy over
some of the not-to-be-changed operator states (keyed/non-keyed), and
process/patch the remaining ones by transforming and bootstrapping to new state.

I can provide more details for more specific questions, if you like.

Regards

Thias

PS: I’m currently working on this ticket in order to get some glitches removed: 
FLINK-26585


From: Alexis Sarda-Espinosa
Sent: Thursday, October 26, 2023 4:01 PM
To: user
Subject: Updating existing state with state processor API

Hello,

The documentation of the state processor API has some examples to modify an 
existing savepoint by defining a StateBootstrapTransformation. In all cases, 
the entrypoint is OperatorTransformation#bootstrapWith, which expects a 
DataStream. If I pass an empty DataStream to bootstrapWith and then apply the 
resulting transformation to an existing savepoint, will the transformation 
still receive data from the existing state?

If the aforementioned is incorrect, I imagine I could instantiate a 
SavepointReader and create a DataStream of the existing state with it, which I 
could then pass to the bootstrapWith method directly or after "unioning" it 
with additional state. Would this work?

Regards,
Alexis.


Flink custom parallel data source

2023-10-31 Thread Kamal Mittal via user
Hello Community,

I need to implement a custom parallel data source (Flink ParallelSourceFunction)
that fetches data based on some custom logic. In this source function, we open
multiple threads via a Java thread pool to distribute the work further.

These threads share the Flink-provided 'SourceContext' and collect records via
the source_context.collect() method.

Is it OK to share the source context across separate threads and emit data this
way?

Are there any issues for downstream operators due to the above design?

Rgds,
Kamal