Thanks, Edward, for your response.
The problem I have is that I am not sure how to add or remove keyBy calls at
run time, since the Flink topology is based on them (as Caizhi mentioned).
I believe we can change what the single keyBy in your example keys on, but we
cannot add or remove keyBy operators.
Please let me know if I have missed anything.


    On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
<edward.colle...@fmr.com> wrote:  
 
  
 
A general pattern for dynamically adding new aggregations could look something
like this:

        BroadcastStream<AggregationInstructions> broadcastStream =
            aggregationInstructions.broadcast(broadcastStateDescriptor);

        // keyBy returns a KeyedStream; a String key is assumed, and the
        // enriched type is assumed to expose getAggregationKey().
        KeyedStream<DataToAggregateEnrichedWithAggregationInstructions, String>
            streamReadyToAggregate = dataToAggregate
                .connect(broadcastStream)
                .process(new JoinFunction())
                .flatMap(new AddAggregationKeyAndDescriptor())
                .keyBy(record -> record.getAggregationKey());

Where:
    
   - aggregationInstructions is a stream describing which fields to aggregate
by. It might contain a List<String> of the field names and another field that
describes what the aggregation does. Example:
groupByFields=['sourceIp','sourcePort','destIp','destPort'], groupingType =
'bySession', action = 'Add' or 'Delete'
   - JoinFunction is a BroadcastProcessFunction (a KeyedBroadcastProcessFunction
if dataToAggregate is keyed first) which adds the groupByFields and groupingType
to each message in the dataToAggregate stream and, for 'Delete' instructions,
removes groupings from the broadcast state.
   - AddAggregationKeyAndDescriptor is a FlatMapFunction which adds
aggregationKey to the stream based on the value of groupByFields.
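The broadcast side of JoinFunction could keep the active groupings in broadcast
state, keyed by groupingType. Below is a minimal plain-Java sketch of that
bookkeeping, assuming instructions arrive as the hypothetical
AggregationInstruction class shown and that action is either "Add" or "Delete".
A HashMap stands in for Flink's BroadcastState (in a real job obtained via
ctx.getBroadcastState(broadcastStateDescriptor), where the descriptor is a
MapStateDescriptor) so the logic can run without a Flink dependency.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical POJO standing in for one AggregationInstructions record, e.g.
// groupingType = "bySession", groupByFields = [sourceIp, sourcePort, destIp, destPort].
final class AggregationInstruction {
    final String groupingType;
    final List<String> groupByFields;
    final String action; // assumed to be "Add" or "Delete"

    AggregationInstruction(String groupingType, List<String> groupByFields, String action) {
        this.groupingType = groupingType;
        this.groupByFields = List.copyOf(groupByFields);
        this.action = action;
    }
}

// Plain-Java model of the state the broadcast side of JoinFunction would maintain.
// A HashMap replaces Flink's BroadcastState here so the sketch runs standalone.
final class ActiveGroupings {
    private final Map<String, List<String>> groupings = new HashMap<>();

    // Roughly what processBroadcastElement would do for each incoming instruction.
    void apply(AggregationInstruction instr) {
        switch (instr.action) {
            case "Add":
                groupings.put(instr.groupingType, instr.groupByFields);
                break;
            case "Delete":
                groupings.remove(instr.groupingType);
                break;
            default:
                throw new IllegalArgumentException("Unknown action: " + instr.action);
        }
    }

    // Roughly what processElement would read (read-only) to enrich each data record.
    Map<String, List<String>> snapshot() {
        return Map.copyOf(groupings);
    }
}
```

Flink expects processBroadcastElement to update broadcast state identically on
every parallel instance, so the Add/Delete handling should depend only on the
instruction itself, as it does here. Deleting a grouping only stops new records
from being tagged with it; keyed state already accumulated downstream for that
grouping would still need to be cleaned up, for example with timers or state TTL.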
 
  
 
Because of the flatMap, one message may be emitted several times with different
values of aggregationKey, so it can belong to multiple aggregations.
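The fan-out that AddAggregationKeyAndDescriptor performs can be sketched in
plain Java as below. This assumes each record is a Map<String, String> of field
values and that the active groupings are a Map from groupingType to field names;
KeyedCopy, fanOut and countByKey are hypothetical names. In a Flink
FlatMapFunction, each element of the returned list would instead be passed to
out.collect(...), and countByKey stands in for keyBy(c -> c.aggregationKey)
followed by a per-key aggregation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical output element: the original record tagged with one aggregation key.
final class KeyedCopy {
    final String aggregationKey;
    final Map<String, String> record;

    KeyedCopy(String aggregationKey, Map<String, String> record) {
        this.aggregationKey = aggregationKey;
        this.record = record;
    }
}

final class AggregationKeys {
    // Emits one copy of the record per active grouping, the way a FlatMapFunction
    // would call out.collect(...) once per grouping.
    static List<KeyedCopy> fanOut(Map<String, String> record,
                                  Map<String, List<String>> groupings) {
        List<KeyedCopy> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> grouping : groupings.entrySet()) {
            // Key format: "<groupingType>:<value1>|<value2>|..."; missing fields become "".
            StringJoiner key = new StringJoiner("|", grouping.getKey() + ":", "");
            for (String field : grouping.getValue()) {
                key.add(record.getOrDefault(field, ""));
            }
            out.add(new KeyedCopy(key.toString(), record));
        }
        return out;
    }

    // Stand-in for keyBy(c -> c.aggregationKey) followed by a per-key count.
    static Map<String, Long> countByKey(List<KeyedCopy> copies) {
        Map<String, Long> counts = new TreeMap<>();
        for (KeyedCopy copy : copies) {
            counts.merge(copy.aggregationKey, 1L, Long::sum);
        }
        return counts;
    }
}
```

Prefixing the key with groupingType keeps keys from different groupings apart,
and because every copy goes through the same single keyBy, adding or removing a
grouping changes only which keys appear, not the job topology. A "|" delimiter
can collide if field values contain it; a real job might prefer a structured key
(for example a POJO with proper equals/hashCode).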
 
  
 
  
 
  
 
From: M Singh <mans2si...@yahoo.com> 
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng <tsreape...@gmail.com>; User-Flink <user@flink.apache.org>
Subject: Re: Apache Flink - Adding/Removing KeyedStreams at run time without
stopping the application
 
  
 
 
  
 
Hi Caizhi:
 
  
 
Thanks for your reply.
 
  
 
I need to aggregate streams based on dynamic groupings. Not all of the
groupings (keyBy) are known upfront; they can be added or removed after the
streaming application has started, and I don't want to restart the application
or change the code. So I wanted to find out what the options are for achieving
this functionality. Please let me know if you have any advice or recommendations.
 
  
 
Thanks
 
  
 
On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
<tsreape...@gmail.com> wrote:
 
  
 
  
 
Hi!
 
  
 
Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.
 
  
 
By the way, why do you need this functionality? Could you elaborate more on 
your use case?
 
  
 
On Saturday, January 22, 2022 at 21:32, M Singh <mans2si...@yahoo.com> wrote:
 

Hi Folks:
 
  
 
I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.
 
  
 
Is this possible natively in Apache Flink? If not, is there any
framework/pattern that can be used to implement this without restarting the
application or changing the code?
 
  
 
Thanks
 
  
 
  
 
  
 
  
 
  
 
  
 
  
 
  
 
  
