Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-24 Thread Panagiotis Garefalakis
Congrats all! Well done!

Cheers,
Panagiotis

On Fri, Mar 24, 2023 at 2:46 AM Qingsheng Ren  wrote:

> I'd like to say thank you to all contributors of Flink 1.17. Your support
> and great work together made this giant step forward possible!
>
> Also, as Matthias mentioned, feel free to leave us any suggestions, and
> let's improve the release procedure together.
>
> Cheers,
> Qingsheng
>
> On Fri, Mar 24, 2023 at 5:00 PM Etienne Chauchot 
> wrote:
>
>> Congrats to all the people involved!
>>
>> Best
>>
>> Etienne
>>
>> > On 23/03/2023 at 10:19, Leonard Xu wrote:
>> > The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.17.0, which is the first release for the Apache Flink 1.17
>> series.
>> >
>> > Apache Flink® is an open-source unified stream and batch data
>> processing framework for distributed, high-performing, always-available,
>> and accurate data applications.
>> >
>> > The release is available for download at:
>> > https://flink.apache.org/downloads.html
>> >
>> > Please check out the release blog post for an overview of the
>> improvements for this release:
>> >
>> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
>> >
>> > The full release notes are available in Jira:
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
>> >
>> > We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>> >
>> > Best regards,
>> > Qingsheng, Martijn, Matthias and Leonard
>>
>


Re: Bootstrapping multiple state within same operator

2023-03-24 Thread David Artiga
That sounds like a great hack :D
I'll give it a try for sure. Thank you!
/David

On Fri, Mar 24, 2023 at 5:25 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi David,
>
> … coming in late to this discussion
>
>
>
> We had a very similar problem and I found a simple way to prime
> savepoints with mixed keyed/operator state.
>
> The trick is this:
>
>    - In your KeyedStateBootstrapFunction, also implement
>    CheckpointedFunction
>    - In initializeState() you can initialize the broadcast state primitive
>    (the code skeleton below uses getUnionListState, same principle)
>    - Then, in the processElement() function, I process a tuple of state
>    collections for each state primitive, i.e. one event object per key
>    - For the union list state I forge a special key “broadcast”, and only
>    the 3rd tuple vector contains anything
>       - (the upstream operator feeding into this bootstrap function makes
>       sure only one event with the “broadcast” key is generated)
>
>
>
> Peruse the code skeleton (Scala) if you like (I removed some stuff I’m not
> supposed to disclose):
>
>
> /** Event type for state updates for savepoint generation. A 4-tuple of
> * 1) a vector of valid [[CSC]] entries
> * 2) a vector of valid [[DAS]] entries
> * 3) a vector of valid broadcast FFR entries
> * 4) a vector of timers for state cleanup */
> type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])
>
> /** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key
> context. */
> type DAFFSHU = StateUpdate[String, DAFFS]
>
> class DAFFOperatorStateBootstrapFunction
>   extends KeyedStateBootstrapFunction[String, DAFFSHU]
>     with CheckpointedFunction {
>
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters)
>     val rtc: RuntimeContext = getRuntimeContext
>     // keyed state setup:
>     // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
>     // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
>   }
>
>   override def processElement(value: DAFFSHU,
>       ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {
>     val daffs = value.state
>     val ts = ctx.timerService()
>
>     for (csc <- daffs._1) { cSC.add(csc) }
>     for (das <- daffs._2) { dAS.add(das) }
>     for (ffr <- daffs._3) { fFRState.add(ffr) }
>     for (timer <- daffs._4) { ts.registerEventTimeTimer(timer) }
>
>     val stop = 0
>   }
>
>   @transient var fFRState: ListState[FFR] = null
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {}
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
>     fFRState = context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
>   }
> }
>
>
>
> Hope this helps …
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> *From:* David Artiga 
> *Sent:* Wednesday, March 22, 2023 11:31 AM
> *To:* Hang Ruan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Bootstrapping multiple state within same operator
>
>
>
>
>
>
> Not familiar with the implementation, but thinking of some options:
>
> - composable transformations
> - underlying MultiMap
> - ...
>
>
>
> On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan  wrote:
>
> Hi, David,
>
> I also read the code about `SavepointWriter#withOperator`. The
> transformations are stored in a `Map` whose key is the `OperatorID`. I
> can't come up with a way to register multiple transformations for one
> operator with the provided API.
>
>
>
> Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple
> states at once.
>
>
>
> Best,
>
> Hang
>
>
>
> David Artiga wrote on Wed, Mar 22, 2023, 15:22:
>
> We are using the state processor API to bootstrap the state of some
> operators. It has been working fine until now, when we tried to bootstrap
> an operator that has both a keyed state and a broadcast state. It seems the
> API does not provide a convenient method to apply multiple transformations
> on the same *uid...*
>
>
>
> Is there a way to do that and we just missed it? Any insights appreciated.
>
>
>
> Cheers,
>
> /David
>

RE: Bootstrapping multiple state within same operator

2023-03-24 Thread Schwalbe Matthias
Hi David,
… coming in late to this discussion

We had a very similar problem and I found a simple way to prime savepoints
with mixed keyed/operator state.
The trick is this:

  *   In your KeyedStateBootstrapFunction, also implement CheckpointedFunction
  *   In initializeState() you can initialize the broadcast state primitive (the
code skeleton below uses getUnionListState, same principle)
  *   Then, in the processElement() function, I process a tuple of state
collections for each state primitive, i.e. one event object per key
  *   For the union list state I forge a special key “broadcast”, and only the
3rd tuple vector contains anything
     *   (the upstream operator feeding into this bootstrap function makes sure
only one event with the “broadcast” key is generated)

Peruse the code skeleton (Scala) if you like (I removed some stuff I’m not
supposed to disclose):


/** Event type for state updates for savepoint generation. A 4-tuple of
* 1) vector of valid [[CSC]] entries
* 2) a vector of valid [[DAS]] entries
* 3) a vector of valid broadcast FFR.
* 4) a vector of timers for state cleanup */
type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])

/** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key 
context. */
type DAFFSHU = StateUpdate[String, DAFFS]

class DAFFOperatorStateBootstrapFunction
  extends KeyedStateBootstrapFunction[String, DAFFSHU]
with CheckpointedFunction {

  override def open(parameters: Configuration): Unit = {
super.open(parameters)
val rtc: RuntimeContext = getRuntimeContext
//keyed state setup:
// cSC = rtc.getListState(new ListStateDescriptor[CSC](...
// dAS = rtc.getListState(new ListStateDescriptor[DAS](...
  }

  override def processElement(value: DAFFSHU, ctx: 
KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {

val daffs = value.state
val ts = ctx.timerService()

for (csc <- daffs._1) {
  cSC.add(csc)
}
for (das <- daffs._2) {
  dAS.add(das)
}
for (ffr <- daffs._3) {
  fFRState.add(ffr)
}
for (timer <- daffs._4) {
  ts.registerEventTimeTimer(timer)
}

val stop = 0
  }

  @transient var fFRState: ListState[FFR] = null

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
fFRState = 
context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
  }
}
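
For context, wiring such a bootstrap function into a new savepoint might look
roughly like the sketch below (hedged: this assumes the Flink 1.17-era
SavepointWriter flavor of the state processor API, whose signatures vary
between versions; the uid, the path, the key accessor, and the stateUpdates
stream are made-up placeholders):

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.state.api.{OperatorIdentifier, OperatorTransformation, SavepointWriter}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stateUpdates: DataStream[DAFFSHU], prepared by the upstream operator that
// emits one event per key plus the single "broadcast"-keyed event
val transformation = OperatorTransformation
  .bootstrapWith(stateUpdates)
  .keyBy((u: DAFFSHU) => u.key) // placeholder for however StateUpdate exposes its key
  .transform(new DAFFOperatorStateBootstrapFunction)

SavepointWriter
  .newSavepoint(env, new HashMapStateBackend(), 128) // 128 = max parallelism, a placeholder
  .withOperator(OperatorIdentifier.forUid("daff-operator"), transformation) // one transformation per uid
  .write("file:///tmp/savepoints/daff-primed") // placeholder path

env.execute("prime-daff-savepoint")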

Hope this helps …

Sincere greetings

Thias


From: David Artiga 
Sent: Wednesday, March 22, 2023 11:31 AM
To: Hang Ruan 
Cc: user@flink.apache.org
Subject: Re: Bootstrapping multiple state within same operator



Not familiar with the implementation, but thinking of some options:

- composable transformations
- underlying MultiMap
- ...

On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
Hi, David,
I also read the code about `SavepointWriter#withOperator`. The
transformations are stored in a `Map` whose key is the `OperatorID`. I can't
come up with a way to register multiple transformations for one operator
with the provided API.

Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple
states at once.

Best,
Hang

David Artiga <david.art...@gmail.com> wrote on Wed, Mar 22, 2023, 15:22:
We are using the state processor API to bootstrap the state of some
operators. It has been working fine until now, when we tried to bootstrap an
operator that has both a keyed state and a broadcast state. It seems the API
does not provide a convenient method to apply multiple transformations on
the same uid...

Is there a way to do that and we just missed it? Any insights appreciated.

Cheers,
/David

Share Your Apache Flink Expertise at Community Over Code 2023 Fintech Track

2023-03-24 Thread Javier Borkenztain
Hello Apache Flink Community,

I hope you are all doing well. As active Apache Software Foundation (ASF)
community members, we share a strong interest in open-source technologies
and their ability to drive innovation across various industries.

In this spirit, I am thrilled to invite you to participate in the Fintech
Track at Community Over Code 2023 (previously known as ApacheCon), the
ASF's premier event. The Fintech Track presents an excellent opportunity
for experts within the ASF community to collaborate, exchange knowledge,
and explore the latest trends, challenges, and opportunities at the
intersection of Fintech and open source.

Apache Flink is known for its impressive distributed stream and batch data
processing capabilities, making it a valuable tool for Fintech applications
that require real-time analytics, fraud detection, and risk management.
Your experience and insights with Apache Flink can greatly enrich the
Fintech Track's discussions and promote collaboration and innovation within
our community.

We encourage you to submit a talk proposal on any topic related to the
intersection of Fintech and Apache Flink. Some examples include:


   - Leveraging Apache Flink for real-time analytics in Fintech applications
   - Building fraud detection and risk management solutions with Flink
   - Developing scalable and high-performance Fintech applications using
   Flink
   - Integrating Flink with other ASF projects to enhance Fintech ecosystems

To submit your talk, please visit
https://communityovercode.org/call-for-presentations/ and follow the
instructions there. The Call for Presentations (CFP) is open and will close
on July 13th, 2023, at 23:59 UTC.

We are excited to have you participate in the Fintech Track at Community
Over Code 2023 and can't wait to learn from your insights into the
potential of Apache Flink in the Fintech industry.

Warm regards,

Javier Borkenztain
Apache Fineract PMC member
Member of the ASF
Fintech Track Chair, Community Over Code


RE: Is it possible to preserve chaining for multi-input operators?

2023-03-24 Thread Schwalbe Matthias
Hi Viacheslav,

… back from vacation

… you are welcome, glad to hear it worked out 😊

Thias


From: Viacheslav Chernyshev 
Sent: Thursday, March 16, 2023 5:34 PM
To: user@flink.apache.org
Subject: Re: Is it possible to preserve chaining for multi-input operators?

Hi Matthias,

Just wanted to thank you for the hints! I've successfully developed a 
multi-stream operator that allows doing things like this:

KeyedMultiInputStream.builder(environment, new UserDefinedFunction())
.addKeyedStream(fooSource, fooMapper, UserDefinedFunction::processFoo)
.addKeyedStream(barSource, barMapper, UserDefinedFunction::processBar)
.addBroadcastStream(bazSource, UserDefinedFunction::processBaz)
.build();

Direct connectivity to the sources and optional on-the-fly mappers have 
completely eliminated the performance issues that we had been facing before.

Kind regards,
Viacheslav

From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Sent: 28 February 2023 15:50
To: Viacheslav Chernyshev <v.chernys...@outlook.com>; user@flink.apache.org
Subject: RE: Is it possible to preserve chaining for multi-input operators?


Hi Viacheslav,



Certainly I can …



There are two parts to it:

  *   Setting up such a MultipleInputStreamOperator, which is documented (sort
of), but not quite completely
     *   I can prepare some boiler-plate, not today, but in the next days (if
you are interested); a rough sketch follows after this list
  *   The second part is about how to put all joins and other operations into
a single operator implementation (well, you exactly do that 😊 ):
     *   Equi-joins on the key you can process per Input() implementation,
with state kept from the other inputs
     *   Windowing is restricted to a single window key type (a Namespace in
Flink-speak) for your operator
        *   Windowing can be implemented manually and modelled after the
official Flink windowing operators
     *   Should you absolutely need more than one windowing namespace, you
will need to become creative with state primitives
  *   You also mentioned broadcast streams; that is, in the end you'll have
more than 2 input streams: the keyed ones + the broadcast streams
     *   This is where MultipleInputStreamOperator comes into play, because
you are not restricted to only 2 input streams as in the
KeyedCoProcessFunction case
     *   That gives you more freedom to combine data in a single operator
instead of being forced to split/chain multiple operators
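
A rough, untested boiler-plate sketch of such an operator (hedged: the class
name, input types, and the wiring comment are illustrative only; the
raw-typed getInputs signature may need minor adjustment for your Flink/Scala
version):

import java.util.{Arrays => JArrays}

import org.apache.flink.streaming.api.operators._
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// One operator with two inputs that Flink can schedule as a single task,
// avoiding serialization between what would otherwise be separate operators.
class TwoInputPassThroughOperator(params: StreamOperatorParameters[String])
  extends AbstractStreamOperatorV2[String](params, 2)
  with MultipleInputStreamOperator[String] {

  // One Input handler per upstream; all handlers share this operator's
  // state backend and output.
  override def getInputs: java.util.List[Input[_]] = JArrays.asList[Input[_]](
    new AbstractInput[Long, String](this, 1) {
      override def processElement(e: StreamRecord[Long]): Unit =
        output.collect(new StreamRecord(s"left: ${e.getValue}"))
    },
    new AbstractInput[String, String](this, 2) {
      override def processElement(e: StreamRecord[String]): Unit =
        output.collect(new StreamRecord(s"right: ${e.getValue}"))
    }
  )
}

// Wiring is done via a (Keyed)MultipleInputTransformation, roughly:
//   val t = new KeyedMultipleInputTransformation[String](
//     "fused", operatorFactory, Types.STRING, parallelism, keyTypeInfo)
//   t.addInput(leftStream.getTransformation, leftKeySelector)
//   t.addInput(rightStream.getTransformation, rightKeySelector)
//   env.addOperator(t)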



Kind regards



Thias









From: Viacheslav Chernyshev <v.chernys...@outlook.com>
Sent: Tuesday, February 28, 2023 3:42 PM
To: user@flink.apache.org
Subject: Re: Is it possible to preserve chaining for multi-input operators?



Hi Matthias,



Thank you for the reply. You are absolutely right, the first keyBy is 
unavoidable, but after that we fix the parallelism and maintain the same key 
throughout the pipeline.



The MultipleInputStreamOperator approach that you've described looks very
interesting! Unfortunately, I have never used it before. Would you be able to
share the details of how to force chaining with e.g. two input streams?



Kind regards,

Viacheslav



From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Sent: 28 February 2023 14:12
To: Viacheslav Chernyshev <v.chernys...@outlook.com>; user@flink.apache.org
Subject: RE: Is it possible to preserve chaining for multi-input operators?





Hi Viacheslav,



These are two very interesting questions…



You have found out about the restriction that only single-input operators
can be chained; it also does not help to union() multiple streams into a
single input, as they still count as multiple inputs.



  *   The harder way to go would be to patch the relevant parts of Flink to
allow chaining with multiple inputs
     *   This is very complicated to get right, especially for the then
multiple inputs and outputs that need to get façaded
     *   We once did it (successfully) and abandoned the idea because of its
complexity and maintenance cost
  *   The other way might be to implement it all in one
org.apache.flink.streaming.api.operators.MultipleInputStreamOperator, which
allows any (reasonable) number of inputs: keyed, non-keyed, broadcast;
mixed …. Let me explain:
     *   From what you say I assume that after the Kafka source you need to
.keyBy() the instrument-id anyway, which means a shuffle and
(de-/)serialization … unavoidable.
     *   However, after that shuffle, the MultipleInputStreamOperator could
force-chain all your logic as long as it stays on the same key/partition
domain
     *   Integration of broadcast inputs is a no-brainer there
     *   We do these things all the time and it really helps cutting down
serialization cost, amo

Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-24 Thread Qingsheng Ren
I'd like to say thank you to all contributors of Flink 1.17. Your support
and great work together made this giant step forward possible!

Also, as Matthias mentioned, feel free to leave us any suggestions, and
let's improve the release procedure together.

Cheers,
Qingsheng

On Fri, Mar 24, 2023 at 5:00 PM Etienne Chauchot 
wrote:

> Congrats to all the people involved!
>
> Best
>
> Etienne
>
> > On 23/03/2023 at 10:19, Leonard Xu wrote:
> > The Apache Flink community is very happy to announce the release of
> Apache Flink 1.17.0, which is the first release for the Apache Flink 1.17
> series.
> >
> > Apache Flink® is an open-source unified stream and batch data processing
> framework for distributed, high-performing, always-available, and accurate
> data applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements for this release:
> >
> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
> >
> > We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >
> > Best regards,
> > Qingsheng, Martijn, Matthias and Leonard
>


Table API function and expression vs SQL

2023-03-24 Thread ravi_suryavanshi.yahoo.com via user
Hello Team,

Need your advice on which method is recommended, considering that I don't
want to change my query code when Flink is updated/upgraded to a higher
version.

Here I am seeking advice on whether to write queries using Java code (Table
API functions and Expressions) or using pure SQL.

I am assuming that SQL will not be impacted by an upgrade to a higher
version.

Thanks and Regards,
Ravi
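
For reference, the two styles the question contrasts look roughly like this
(a minimal sketch in Scala, the language used elsewhere in this digest; the
Java Table API is analogous, and the `Orders` table and its fields are made
up). Both forms are translated by the same planner; the practical
upgrade-stability difference is that a plain SQL string has no compile-time
dependency on Table API classes that may be deprecated between versions:

import org.apache.flink.table.api._

val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

// Pure SQL: just a string, checked by the planner at submission time
val viaSql = tableEnv.sqlQuery(
  "SELECT user_id, SUM(amount) AS total FROM Orders GROUP BY user_id")

// Table API: the same query as typed expressions, checked by the compiler
val viaTableApi = tableEnv
  .from("Orders")
  .groupBy($"user_id")
  .select($"user_id", $"amount".sum.as("total"))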

Re: Is there a way to control the parallelism of auto-generated Flink operators of the FlinkSQL job graph?

2023-03-24 Thread Hang Ruan
Hi, Elkhan,

I think this is intended behavior. If the parallelism of an operator is
not specified, it will be the same as that of the previous operator instead
of the default parallelism.
Actually, the table planner does most of this job for us. There should not
be a way to modify the parallelism of every operator; after all, we don't
know which operators will be generated when we write the SQL.

Best,
Hang
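
If per-operator parallelism is truly required, one possible workaround (a
rough sketch with made-up names, not a FlinkSQL feature) is to leave SQL for
the affected stretch of the pipeline and cross the Table/DataStream bridge,
where setParallelism() is available on each operator:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Read via SQL, then drop to the DataStream API for the one operator whose
// parallelism must differ from its upstream.
val events = tableEnv.sqlQuery("SELECT x, y FROM input_kafka_table")
val enhanced = tableEnv
  .toDataStream(events)
  .map((row: Row) => row) // stand-in for the enhancement logic
  .setParallelism(3)      // explicit, independent of the source's 5

tableEnv.fromDataStream(enhanced).executeInsert("enhancedEventsSink")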

Elkhan Dadashov wrote on Fri, Mar 24, 2023, 14:14:

> Checking with the community again, if anyone explored this before.
>
> Thanks.
>
>
> On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov
> wrote:
>
> > Dear Flink developers,
> >
> > Wanted to check if there is a way to control the parallelism of
> > auto-generated Flink operators of the FlinkSQL job graph.
> >
> > In Java API, it is possible to have full control of the parallelism of
> > each operator.
> >
> > In FlinkSQL, some source and sink connectors support `source.parallelism`
> > and `sink.parallelism`, and the rest can be set via `default.parallelism`.
> >
> > In this particular scenario, enhancedEvents gets chained to the
> > KafkaSource operator. It can be separated by calling disableChain() on the
> > KafkaSource stream on the Kafka connector side, but even with chaining
> > disabled on the source stream, the `enhancedEvents` operator parallelism
> > is still set to 5 (the same as the Kafka Source operator parallelism)
> > instead of 3 (the default parallelism):
> >
> > ```sql
> > SET 'parallelism.default' = '3';
> >
> > CREATE TABLE input_kafka_table
> > (
> > ...
> > ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3),
> > WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> > ) WITH (
> > 'connector' = 'kafka',
> > 'source.parallelism' = '5', -- supported by a customization of the kafka connector
> > ...
> > );
> >
> > CREATE TEMPORARY VIEW enhancedEvents AS (
> >  SELECT x, y
> > FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y))
> > );
> >
> > CREATE TABLE other_table_source (...) WITH(...);
> > CREATE TABLE other_table_sink (...) WITH(...);
> >
> > BEGIN STATEMENT SET;
> >  INSERT INTO enhancedEventsSink SELECT * FROM enhancedEvents;
> >  INSERT INTO other_table_sink SELECT z FROM other_table_source;
> > END;
> > ```
> >
> > Is there a way to force-override the parallelism of auto-generated
> > operators in a FlinkSQL pipeline?
> >
> > Or is it expected behavior that some operators' parallelism is assigned
> > not from the default parallelism but from another operator's parallelism?
> >
> > Want to understand if this is a bug or intended behavior.
> >
> > Thank you.
> >
> >
>