Re: Pre shuffle aggregation in flink is not working

2021-08-21 Thread suman shil
gger<>(2);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
    @Override
    public Long getKey(TaxiFare value) throws Exception {
        return value.driverId;
    }
};
MyAggregator aggregator = new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
        .keyBy((TaxiFare fare) -> fare.driverId)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .process(new AddTips());
DataStream<Tuple3<Long, Long, Float>> hourlyMax =
        hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.minutes(1))).maxBy(2);
printOrTest(hourlyMax);

// execute the transformation pipeline
env.execute("Hourly Tips (java)");
}

/*
 * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
 */
public static class AddTips
        extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
    @Override
    public void process(Long key, Context context, Iterable<TaxiFare> fares,
                        Collector<Tuple3<Long, Long, Float>> out) {
        float sumOfTips = 0F;
        for (TaxiFare f : fares) {
            sumOfTips += f.tip;
        }
        out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
    }
}
}

On Fri, Aug 20, 2021 at 4:18 AM JING ZHANG  wrote:

> Hi Suman,
> > But I am always seeing that in the following code of
> > `AbstractMapBundleOperator.java`, `numOfElements` is always 0.
> That is strange. Please set a breakpoint at the line
> `bundleTrigger.onElement(input);` in the `processElement` method to see
> what happens when a record is processed by `processElement`.
>
> > One more question, you mentioned that I need to test with
> > `LinkedHashMap` instead of `HashMap`. Where should I make this change?
> You could copy the class `AbstractMapBundleOperator` and update the
> bundle initialization code in the `open` method.
> Besides, `MapBundleFunction`, `MapBundleOperator`, and `CountBundleTrigger`
> are not marked as `@Public`, so they have no compatibility guarantees.
> You'd better copy them for your own use.
>
> Best,
> JING ZHANG
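JING's suggestion (copy the operator and create a `LinkedHashMap` instead of a `HashMap` in `open()`) can be illustrated without Flink. The sketch below is a hypothetical, dependency-free stand-in that only mirrors the shape of `AbstractMapBundleOperator`; the class `OrderedMapBundle` and its methods are made-up names, and the key selector and merge function are plain `java.util.function` types rather than Flink's `KeySelector` and `MapBundleFunction`. It also shows `numOfElements` growing by one per `processElement` call, which is what the suggested breakpoint should confirm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, Flink-free stand-in for a copied AbstractMapBundleOperator.
public class OrderedMapBundle<K, V, IN> {
    private final Function<IN, K> keySelector;   // plays the role of KeySelector
    private final BiFunction<V, IN, V> addInput; // plays the role of MapBundleFunction#addInput
    private Map<K, V> bundle;
    private long numOfElements;

    public OrderedMapBundle(Function<IN, K> keySelector, BiFunction<V, IN, V> addInput) {
        this.keySelector = keySelector;
        this.addInput = addInput;
    }

    // The suggested change: a LinkedHashMap keeps keys in first-insertion order.
    public void open() {
        bundle = new LinkedHashMap<>();
        numOfElements = 0;
    }

    public void processElement(IN input) {
        K key = keySelector.apply(input);
        V newValue = addInput.apply(bundle.get(key), input); // null value for a new key
        bundle.put(key, newValue);
        numOfElements++;
    }

    // Emits one pre-aggregated value per key, then clears the bundle.
    public List<V> finishBundle() {
        List<V> out = new ArrayList<>(bundle.values());
        bundle.clear();
        numOfElements = 0;
        return out;
    }

    public long getNumOfElements() {
        return numOfElements;
    }

    public static void main(String[] args) {
        // Sum integers per key, where the key is the value modulo 10.
        OrderedMapBundle<Integer, Integer, Integer> b =
                new OrderedMapBundle<>(x -> x % 10, (acc, x) -> acc == null ? x : acc + x);
        b.open();
        for (int x : new int[] {27, 13, 17, 31, 23}) {
            b.processElement(x);
        }
        System.out.println(b.getNumOfElements()); // 5
        System.out.println(b.finishBundle());     // [44, 36, 31]
    }
}
```

The flushed values follow the order in which keys 7, 3 and 1 first arrived; re-putting an existing key into a `LinkedHashMap` does not move it.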

Re: Pre shuffle aggregation in flink is not working

2021-08-20 Thread suman shil
Hi Jing,
I also tried using `MapBundleOperator` (I have not yet tested with
`LinkedHashMap`), but in the following code from `AbstractMapBundleOperator.java`,
`numOfElements` is always 0; it never gets incremented. I replaced
`TaxiFareStream` with `MapBundleOperator` in the code above. It should
increase by 1 each time an element is processed, but that is not happening.

public void processElement(StreamRecord<IN> element) throws Exception {
    // get the key and value for the map bundle
    final IN input = element.getValue();
    final K bundleKey = getKey(input);
    final V bundleValue = bundle.get(bundleKey);

    // get a new value after adding this element to bundle
    final V newBundleValue = function.addInput(bundleValue, input);

    // update to map bundle
    bundle.put(bundleKey, newBundleValue);

    numOfElements++;
    bundleTrigger.onElement(input);
}

One more question: you mentioned that I should test with `LinkedHashMap`
instead of `HashMap`. Where should I make this change? Do I need to create
a class that extends `MapBundleOperator` and make the change there?

Thanks


On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG  wrote:

> Hi Suman,
> Please try copying `MapBundleOperator` and changing the `HashMap` to a
> `LinkedHashMap` to keep the output order consistent with the input order.
>
> Best,
> JING ZHANG

Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread suman shil
Hi Jing,
Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
following this post:
http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
Please let me know if there is any other way of aggregating elements locally.

public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
    private KeySelector<TaxiFare, Long> keySelector;

    public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                          BundleTrigger<TaxiFare> bundleTrigger,
                          KeySelector<TaxiFare, Long> keySelector) {
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

Thanks

On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG  wrote:

> Hi Suman,
> Would you please provide the code of `TaxiFareStream`? It seems we
> could use `MapBundleOperator` directly here.
> BTW, I have some concerns about using this approach for local
> aggregation before a window aggregation: `MapBundleOperator` saves input
> data in a bundle backed by a `HashMap`, which does not preserve the input
> order, so the records within a bundle (10 records in your case) may come
> out of order. I'm not sure whether it is reasonable to assign watermarks
> based on out-of-order timestamps.
>
> Best,
> JING ZHANG
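JING's watermark concern can be made concrete with a small simulation. The sketch below (plain Java, hypothetical names) computes a max-timestamp-minus-bound watermark, the rule behind Flink's bounded-out-of-orderness watermark generators, and counts records that arrive with a timestamp at or below the watermark in effect at that moment. As a simplification it advances the watermark after every record, whereas Flink emits watermarks periodically, so the count it gives is an upper bound; whether a window actually drops such a record also depends on the window end and the allowed lateness. The 20-second bound matches the extractor in the pipeline code in this thread.

```java
public class WatermarkDemo {
    // Counts records whose timestamp is at or below the watermark in effect when they arrive.
    // Simplification: the watermark (maxSeen - bound - 1) is advanced after every record,
    // whereas Flink emits watermarks periodically; this gives an upper bound on the count.
    static int countBehindWatermark(long[] timestamps, long bound) {
        // Start so that the first watermark is Long.MIN_VALUE (no underflow).
        long maxSeen = Long.MIN_VALUE + bound + 1;
        int behind = 0;
        for (long ts : timestamps) {
            long watermark = maxSeen - bound - 1;
            if (ts <= watermark) {
                behind++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return behind;
    }

    public static void main(String[] args) {
        long bound = 20_000L; // 20 s, in milliseconds
        // A bundle flushed out of order, but within a 15 s span.
        System.out.println(countBehindWatermark(new long[] {1_000L, 16_000L, 5_000L, 9_000L}, bound)); // 0
        // A record emitted after one that is 65 s newer.
        System.out.println(countBehindWatermark(new long[] {70_000L, 5_000L}, bound)); // 1
    }
}
```

In this model, reordering inside a bundle only causes late records when the disorder it introduces exceeds the out-of-orderness bound.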


Pre shuffle aggregation in flink is not working

2021-08-18 Thread suman shil
I am trying to do pre-shuffle aggregation in Flink. Following is my
`MapBundleFunction` implementation.

public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}
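One consequence of this `addInput` is worth spelling out: it mutates and returns the first fare seen for a driver, so the merged record keeps that fare's `startTime` while its `tip` grows. Any timestamp extracted downstream therefore comes from the first-buffered fare of each key in the bundle, even if later fares in the same bundle belong to a different hour. The sketch below reproduces the merge in plain Java; `Fare` is a hypothetical, trimmed-down stand-in for the training exercise's `TaxiFare` (only `driverId`, `tip`, `startTime`), assuming `startTime` is a `java.time.Instant`.

```java
import java.time.Instant;

public class TipMergeDemo {
    // Hypothetical, trimmed-down stand-in for TaxiFare.
    static class Fare {
        long driverId;
        float tip;
        Instant startTime;

        Fare(long driverId, float tip, Instant startTime) {
            this.driverId = driverId;
            this.tip = tip;
            this.startTime = startTime;
        }
    }

    // Same logic as TaxiFareMapBundleFunction#addInput: the first fare becomes the accumulator.
    static Fare addInput(Fare value, Fare input) {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    public static void main(String[] args) {
        Fare first = new Fare(7L, 1.5f, Instant.parse("2020-01-01T09:59:00Z"));
        Fare second = new Fare(7L, 2.5f, Instant.parse("2020-01-01T10:01:00Z"));

        Fare merged = addInput(addInput(null, first), second);

        // The 10:01 tip is now carried by a record stamped 09:59.
        System.out.println(merged.tip);       // 4.0
        System.out.println(merged.startTime); // 2020-01-01T09:59:00Z
    }
}
```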

I am using `CountBundleTrigger.java`, but the pre-shuffle aggregation is not
working because the `count` variable is always 0. Please let me know if I am
missing something.

@Override
public void onElement(T element) throws Exception {
    count++;
    if (count >= maxCount) {
        callback.finishBundle();
        reset();
    }
}
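The quoted trigger logic can be exercised on its own. Below is a Flink-free restatement under made-up names (`SimpleCountTrigger`, `BundleCallback`); it is not Flink's `CountBundleTrigger`. Since `count` only increases inside `onElement`, a value that stays at 0 suggests `onElement` is never reached on the instance being inspected. One thing worth ruling out: Flink serializes operators (including the trigger they hold) and runs deserialized copies in its tasks, so a trigger object created in `main` is never the one that counts.

```java
// Hypothetical callback interface standing in for Flink's bundle callback.
interface BundleCallback {
    void finishBundle() throws Exception;
}

// Flink-free sketch of a count-based bundle trigger, mirroring the quoted onElement logic.
class SimpleCountTrigger<T> {
    private final long maxCount;
    private BundleCallback callback;
    private long count = 0;

    SimpleCountTrigger(long maxCount) {
        if (maxCount <= 0) {
            throw new IllegalArgumentException("maxCount must be positive");
        }
        this.maxCount = maxCount;
    }

    void registerCallback(BundleCallback callback) {
        this.callback = callback;
    }

    void onElement(T element) throws Exception {
        count++;
        if (count >= maxCount) {
            callback.finishBundle(); // flush the bundle
            reset();
        }
    }

    void reset() {
        count = 0;
    }

    long getCount() {
        return count;
    }
}

public class CountTriggerDemo {
    public static void main(String[] args) throws Exception {
        int[] flushes = {0};
        SimpleCountTrigger<String> trigger = new SimpleCountTrigger<>(3);
        trigger.registerCallback(() -> flushes[0]++);
        for (int i = 0; i < 7; i++) {
            trigger.onElement("fare-" + i);
        }
        // 7 elements with maxCount 3: two flushes, one element still pending.
        System.out.println(flushes[0] + " " + trigger.getCount()); // 2 1
    }
}
```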

Here is the main code.

MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
        new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
    @Override
    public Long getKey(TaxiFare value) throws Exception {
        return value.driverId;
    }
};

DataStream<Tuple3<Long, Long, Float>> hourlyTips =
        // fares.keyBy((TaxiFare fare) -> fare.driverId)
        //         .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
        fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
                        new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                            @Override
                            public long extractTimestamp(TaxiFare element) {
                                return element.startTime.getEpochSecond();
                            }
                        })
                .keyBy((TaxiFare fare) -> fare.driverId)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new AddTips());

DataStream<Tuple3<Long, Long, Float>> hourlyMax =
        hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
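A detail in the extractor above: Flink's event-time timestamps are epoch milliseconds, whereas `Instant.getEpochSecond()` returns seconds (assuming `startTime` is a `java.time.Instant`, as that call suggests); `toEpochMilli()` returns milliseconds. The sketch below shows the effect on a one-hour tumbling window, using the start-of-window arithmetic Flink applies with zero offset and non-negative timestamps (`timestamp - timestamp % size`). The class name and sample instant are illustrative only. Separately, the keyed window in this pipeline uses `TumblingProcessingTimeWindows`, which assigns windows by wall-clock time and ignores the extracted timestamps altogether.

```java
import java.time.Instant;

public class WindowStartDemo {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Tumbling-window start for a non-negative timestamp and zero offset.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-01-01T10:30:00Z");

        long asSeconds = start.getEpochSecond(); // what the extractor returns
        long asMillis = start.toEpochMilli();    // what Flink expects

        // Correct: the fare lands in the 10:00-11:00 window.
        System.out.println(Instant.ofEpochMilli(windowStart(asMillis, ONE_HOUR_MS)));  // 2020-01-01T10:00:00Z
        // Seconds read as milliseconds: a window in January 1970.
        System.out.println(Instant.ofEpochMilli(windowStart(asSeconds, ONE_HOUR_MS))); // 1970-01-19T06:00:00Z
    }
}
```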

Thanks
Suman


Flink application question

2021-08-09 Thread suman shil
I am writing a Flink application that consumes time series data from a Kafka
topic. Each time series data point has a metric name, tag key-value pairs, a
timestamp, and a value. I have created a tumbling window to aggregate data
based on a metric key (a combination of the metric name, the key-value pairs,
and the timestamp). Here is what the main stream looks like:

kafka source -> Flat Map which parses and emits Metric -> Key by metric
key -> Tumbling window of 60 seconds -> Aggregate the data -> write to the
sink

I also want to detect metrics that arrive late, outside the window above. I
want to count how many metrics arrived late and calculate the percentage of
late metrics compared to the original metrics. I am thinking of using Flink's
"allowedLateness" feature to send the late metrics to a different stream. I am
planning to add a "MapState" to the main "Aggregate the data" operator, with
the metric key as the key and the count of metrics that arrived in the main
window as the value.


kafka source -> Flat Map which parses and emits Metric -> Key by metric key
    -> Tumbling window of 60 seconds -> Aggregate the data (maintain a map
       state of metric counts) -> write to the sink
           \
            Late data -> Key by metric key -> Collect late metrics and find
            the percentage of late metrics -> Write the result to the sink

My question is: can the "Collect late metrics and find the percentage of late
metrics" operator access the "MapState" that is updated by the main stream?
Even though both are keyed by the same metric key, I guess they are two
different tasks. I want to calculate: number of late metrics / (number of late
metrics + number of metrics that arrived on time).

Thanks
Suman
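A point of Flink semantics that bears on this question: keyed state such as `MapState` is scoped to the operator that declares it and to the current key, so a downstream operator cannot read another operator's `MapState`, even under the same key. Also, `allowedLateness` on its own keeps window state around for late firings; routing late records to a separate stream is done with `sideOutputLateData(OutputTag)` on the windowed stream. One common workaround, assumed here rather than taken from the thread, is to bring both counts into a single keyed operator (for example by `connect`ing the on-time counts with the late-data side output) and compute the ratio there. The plain-Java sketch below shows only that per-key bookkeeping; `LateRatio`, `onTime`, and `late` are hypothetical names, and in a real job the counts would live in that one operator's keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key bookkeeping for late / (late + onTime).
public class LateRatio {
    private final Map<String, long[]> counts = new HashMap<>(); // key -> {onTime, late}

    // Adds the number of metrics counted by the on-time window for this key.
    void onTime(String metricKey, long n) {
        counts.computeIfAbsent(metricKey, k -> new long[2])[0] += n;
    }

    // Records one metric that arrived via the late-data stream.
    void late(String metricKey) {
        counts.computeIfAbsent(metricKey, k -> new long[2])[1]++;
    }

    // Returns late / (late + onTime), or 0.0 when nothing has been seen for the key.
    double ratio(String metricKey) {
        long[] c = counts.getOrDefault(metricKey, new long[2]);
        long total = c[0] + c[1];
        return total == 0 ? 0.0 : (double) c[1] / total;
    }

    public static void main(String[] args) {
        LateRatio r = new LateRatio();
        r.onTime("cpu.load{host=a}", 9); // count emitted by the on-time window
        r.late("cpu.load{host=a}");      // one record from the late-data side output
        System.out.println(r.ratio("cpu.load{host=a}")); // 0.1
    }
}
```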


Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties

2021-08-04 Thread suman shil
In my Flink streaming application I have a Kafka data source, and I am using
the Kafka property `auto.offset.reset=latest`. I am wondering whether I also
need to call `FlinkKafkaConsumer.setStartFromLatest()`. Are they similar? Can
I use either of them? Following is the documentation from the Flink code:

/**
 * Specifies the consumer to start reading from the latest offset for all partitions. This lets
 * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 *
 * This method does not affect where partitions are read from when the consumer is restored
 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
 * only the offsets in the restored state will be used.
 *
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
    this.startupMode = StartupMode.LATEST;
    this.startupOffsetsTimestamp = null;
    this.specificStartupOffsets = null;
    return this;
}

Thanks
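The two settings are related but not interchangeable. Per the Flink Kafka connector documentation, the default startup mode (`setStartFromGroupOffsets()`) reads from the consumer group's committed offsets and falls back to `auto.offset.reset` only for partitions with no committed offset, while `setStartFromLatest()` ignores committed offsets, as the Javadoc above says. Neither applies when the job is restored from a checkpoint or savepoint. The sketch below encodes these rules as a tiny decision function; `StartOffsetRules`, its enum, and its string results are hypothetical and are not the connector's API, and it simplifies `auto.offset.reset` to latest-or-earliest (the property also accepts `none`).

```java
import java.util.OptionalLong;

public class StartOffsetRules {
    enum StartupMode { GROUP_OFFSETS, LATEST }

    // Hypothetical model of where one partition starts reading.
    // restored: offset from checkpoint/savepoint state, if any
    // committed: the consumer group's committed offset, if any
    // resetToLatest: true when auto.offset.reset=latest (simplified: otherwise earliest)
    static String startPosition(StartupMode mode, OptionalLong restored,
                                OptionalLong committed, boolean resetToLatest) {
        if (restored.isPresent()) {
            return "restored offset " + restored.getAsLong(); // startup mode is ignored
        }
        if (mode == StartupMode.LATEST) {
            return "latest"; // committed offsets are ignored
        }
        if (committed.isPresent()) {
            return "committed offset " + committed.getAsLong();
        }
        return resetToLatest ? "latest" : "earliest"; // auto.offset.reset fallback
    }

    public static void main(String[] args) {
        OptionalLong none = OptionalLong.empty();
        // Fresh group, no state: both approaches start at the latest offset.
        System.out.println(startPosition(StartupMode.GROUP_OFFSETS, none, none, true)); // latest
        System.out.println(startPosition(StartupMode.LATEST, none, none, true));        // latest
        // Group has committed offset 42: only setStartFromLatest() skips it.
        System.out.println(startPosition(StartupMode.GROUP_OFFSETS, none, OptionalLong.of(42), true)); // committed offset 42
        System.out.println(startPosition(StartupMode.LATEST, none, OptionalLong.of(42), true));        // latest
    }
}
```

In this model, `auto.offset.reset=latest` alone and `setStartFromLatest()` agree only for a group that has no committed offsets; once offsets are committed, the default mode resumes from them.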