Re: Periodic flush sink?

2017-04-30 Thread Kamil Dziublinski
Hi Niels,

This sounds to me like a great use case for window functions. You could
partition your data by website (using keyBy) and then hold your window for a
certain amount of time. After that you can hand your sink an already batched
object and store it directly. On top of that, if you are worried that the data
might become too big within a fixed window time, you could use a trigger that
fires based on both time and size. Although IMO it's no problem to have a
bigger put for HBase. But you need to test it.
I have a very similar use case with Kafka and HBase, and I solved it like
that.
Hope that helps.
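The "fire on time or size" idea above can be sketched without Flink as a small buffer that flushes either when it holds `maxSize` elements or when the oldest buffered element has waited `maxDelayMillis`. This is a minimal, stdlib-only sketch: `SizeOrTimeBatcher` and its parameters are hypothetical names, not a Flink `Trigger` or an HBase API, and the current time is passed in explicitly so the logic stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers elements and flushes when either the buffer is
// full or the oldest buffered element has waited at least maxDelayMillis.
final class SizeOrTimeBatcher<T> {
    private final int maxSize;
    private final long maxDelayMillis;
    private final Consumer<List<T>> sink;     // receives each flushed batch
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1;       // -1 means "buffer is empty"

    SizeOrTimeBatcher(int maxSize, long maxDelayMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxDelayMillis = maxDelayMillis;
        this.sink = sink;
    }

    /** Adds an element; flushes immediately if the size limit is reached. */
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    /** Called periodically (e.g. by a timer); flushes if the oldest element is too old. */
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstElementTime >= maxDelayMillis) {
            flush();
        }
    }

    /** Hands a copy of the buffered elements to the sink and clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
        firstElementTime = -1;
    }
}

class SizeOrTimeBatcherDemo {
    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        SizeOrTimeBatcher<String> batcher = new SizeOrTimeBatcher<>(3, 5_000, batches::add);

        batcher.add("a", 0);
        batcher.add("b", 100);
        batcher.add("c", 200);   // size limit reached -> batch [a, b, c]
        batcher.add("d", 1_000);
        batcher.onTimer(3_000);  // oldest element only 2s old -> nothing flushed
        batcher.onTimer(6_000);  // oldest element 5s old -> batch [d]

        System.out.println(batches); // [[a, b, c], [d]]
    }
}
```

Under high load the size limit dominates and batches stay large; under low load the timer bounds how long an element can sit in the buffer.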
On Sat, 29 Apr 2017 at 18:05, Niels Basjes  wrote:

> Thanks.
>
> The specific table I have here is used for debugging purposes, so at the
> HBase level I set a TTL of 12 hours on the data.
> So I'm not worrying about the HFiles.
> Doing a lot of 'small' calls has an impact on HBase as a whole (not just
> this table), so I want buffering.
> With a buffer that can hold 1000 events, if at times I create 10 events
> with a single page and I'm the only one on the site (at that moment), the
> events will be buffered for much too long.
>
> I did a quick test and this seems to work for my case.
> In what situations do you guys expect this code construct to fail? Any
> edge cases I missed?
>
> Niels
>
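> private transient Connection connection = null;
> private transient BufferedMutator mutator = null;
> private transient Timer timer = null;
>
> @Override
> public void open(Configuration parameters) throws Exception {
>   org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
>   // Keep the connection in a field so that close() can release it.
>   connection = ConnectionFactory.createConnection(hbaseConfig);
>
>   mutator = connection.getBufferedMutator(
>       new BufferedMutatorParams(TableName.valueOf(tableName))
>           .pool(getDefaultExecutor(hbaseConfig))
>           .writeBufferSize(HBASE_BUFFER_SIZE));
>
>   // Daemon thread, so a lingering timer cannot keep the JVM alive.
>   timer = new Timer("hbase-periodic-flush", true);
>   timer.schedule(new TimerTask() {
>     @Override
>     public void run() {
>       try {
>         // Push out whatever is buffered, even if the buffer is not full.
>         MySink.this.mutator.flush();
>       } catch (Exception e) {
>         // Ignore
>       }
>     }
>   }, HBASE_BUFFER_AUTO_FLUSH_INTERVAL, HBASE_BUFFER_AUTO_FLUSH_INTERVAL);
> }
>
> @Override
> public void close() throws IOException {
>   // Stop the timer first so that no new flushes are scheduled.
>   if (timer != null) {
>     timer.cancel();
>   }
>   if (mutator != null) {
>     mutator.close(); // flushes any remaining buffered mutations
>   }
>   if (connection != null) {
>     connection.close();
>   }
> }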
>
> On Sat, Apr 29, 2017 at 4:57 PM, Ted Yu  wrote:
>
>> I expect a Flink expert to answer your question.
>>
>> bq. I get a flush of the buffers at least every few seconds
>>
>> From the HBase point of view, during low-traffic periods the above may
>> result in many small HFiles, leading to more work for compaction.
>>
>> FYI
>>
>> On Sat, Apr 29, 2017 at 7:32 AM, Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> I have a sink that writes my records into HBase.
>>>
>>> The data stream contains measurements from an internal testing
>>> instance of the website.
>>> As a consequence there are periods of really high load (someone is doing
>>> a load test) and really low load (only a handful of people are testing
>>> stuff).
>>>
>>> I read the records from Kafka and I want to write them into HBase.
>>> Because under high load it is more efficient to buffer the writes
>>> between the client and the server, I use a BufferedMutator, as
>>> recommended by HBase.
>>>
>>> This BufferedMutator works with a 'fixed size' buffer, and under high
>>> load setting it to a few MiB greatly improves the performance of writing
>>> to HBase.
>>> However, under low load you have to wait until the buffer is full, and
>>> that can take a LONG time (hours) when the load is really low.
>>>
>>> I want to fire a periodic event into my sink to ensure I get a flush of
>>> the buffers at least every few seconds.
>>>
>>> Should I simply implement a standard Java TimerTask and fire it using a
>>> Timer? Or is there a better way of doing that in Flink?
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Fault tolerance & idempotency on window functions

2017-04-29 Thread Kamil Dziublinski
Big thanks for replying, Aljoscha. I spent quite some time thinking about how
to solve this problem and came to some conclusions. It would be great if you
could verify whether my logic is correct.

My conclusion is that if I partition the data in Kafka in the same way as I
partition my window with keyBy, i.e. by the (tenant, user) combination (I
would still use a hash of it in the Kafka producer), and switch processing to
event time (currently it is processing time), then during a replay I could be
100% sure that the first element will always be first, and that the watermark
triggering the window will also arrive at the same moment. This would give me
idempotent writes of this batched object to HBase.
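The plan above relies on every record of one (tenant, user) key landing in the same Kafka partition, so the relative order of that key's records survives a replay. A minimal sketch of such a deterministic mapping follows; `partitionFor` is a hypothetical helper, and it uses `Objects.hash` plus `Math.floorMod` rather than Kafka's default partitioner (which hashes the serialized key with murmur2), so it only illustrates the property, not what the producer does by default.

```java
import java.util.Objects;

final class TenantUserPartitioner {
    private TenantUserPartitioner() {}

    // Hypothetical helper: maps a (tenant, user) key to a partition index.
    // String.hashCode() is specified by the JDK, so the result is the same on
    // every JVM and every run, which keeps the mapping stable across replays.
    static int partitionFor(String tenant, String user, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Objects.hash(tenant, user), numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor("tenantA", "user42", 40);
        int p2 = partitionFor("tenantA", "user42", 40);
        System.out.println(p1 == p2); // true: same key, same partition
    }
}
```

On the Flink side the job would only need to key by the same (tenant, user) fields; it does not have to reproduce the partition index itself.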

For late events (by configuring allowed lateness on the window itself), I
would configure the trigger to fire & purge, so that it doesn't hold on to
already fired data. This way, if a late event arrives, I could fire it with a
different timestamp, treating it in HBase as a totally separate increment
that does not override my previous data.
The reason I want to purge data on firing is that I would need an allowed
lateness of at least 2 months on the window, so holding all data for 2 months
after firing would be too costly.
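To illustrate the fire & purge behaviour described above, here is a toy model of one key's window (not Flink's `PurgingTrigger` or window operator; `FirePurgeCounter` is a hypothetical name). Each firing emits what has accumulated since the previous firing and then clears it, so a late event produces a separate increment instead of a recomputed total.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single window for one key with a fire-and-purge trigger.
final class FirePurgeCounter {
    private long pending = 0;             // state held since the last firing
    private boolean hasPending = false;   // any element added since the last firing?
    private final List<Long> emitted = new ArrayList<>(); // increments sent to the sink

    void add(long value) {
        pending += value;
        hasPending = true;
    }

    // Fire: emit what has accumulated since the last firing, then purge it.
    void fireAndPurge() {
        if (hasPending) {
            emitted.add(pending);
            pending = 0;
            hasPending = false;
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    long pendingState() {
        return pending;
    }

    public static void main(String[] args) {
        FirePurgeCounter window = new FirePurgeCounter();
        window.add(3);
        window.add(4);
        window.fireAndPurge(); // on-time firing emits 7
        window.add(2);         // late event, still within allowed lateness
        window.fireAndPurge(); // late firing emits a separate increment of 2
        System.out.println(window.emitted()); // [7, 2]
    }
}
```

Summing the emitted increments (7 + 2) gives the same total as a single non-purging window would, while in this model no element data is kept between firings.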
An additional question: is there any cost to having a very high allowed
lateness (like 2 months) if we configure the trigger to fire & purge? For
example, any additional state or metadata that Flink needs to maintain that
would take a lot of memory from the cluster? Would I have to consider RocksDB
for state, or could the FS state backend still work?

On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek 
wrote:

> Hi,
> Yes, your analysis is correct: Flink will not retry for individual
> elements but will restore from the latest consistent checkpoint in case of
> failure. This also means that you can get different window results based on
> which element arrives first, i.e. you have a different timestamp on your
> output in that case.
>
> One simple mitigation for the timestamp problem is to use the largest
> timestamp of elements within a window instead of the first timestamp. This
> will be stable across restores even if the order of arrival of elements
> changes. You can still get problems when it comes to late data and window
> triggering, if you cannot guarantee that your watermark is 100 % correct,
> though. I.e. it might be that, upon restore, an element with an even larger
> timestamp arrives late that was not considered when doing the first
> processing that failed.
>
> Best,
> Aljoscha
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski <
> kamil.dziublin...@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I have a Flink streaming job that reads from Kafka, creates some
> > statistics increments and stores them in HBase (using normal puts).
> > I'm using a fold function here with a window of a few seconds.
> >
> > My tests showed me that restoring state with window functions does not
> > work exactly how I expected.
> > I thought that if my window function emits an aggregated object to a
> > sink, and writing that object fails in the sink, this write to HBase will
> > be replayed. So even if it actually got written to HBase but Flink thought
> > it didn't (for instance during a network problem), I could be sure of
> > idempotent writes. I wanted to enforce that by using the timestamp of the
> > first event used in that window for aggregation.
> >
> > Now correct me if I'm wrong, but it seems that in the case of a failure
> > (even if it's in the sink) the whole flow gets replayed from the last
> > checkpoint, which means that my window function might emit the
> > aggregated object in a different form, for instance containing not only
> > the tuples that failed but also other ones. That would break my
> > idempotency here, and I might end up with higher counters than I should
> > have.
> >
> > Do you have any suggestion on how to solve/work around such a problem in
> > Flink?
> >
> > Thanks,
> > Kamil.
> >
> >
>
>


Fault tolerance & idempotency on window functions

2017-04-25 Thread Kamil Dziublinski
Hi guys,

I have a Flink streaming job that reads from Kafka, creates some statistics
increments and stores them in HBase (using normal puts).
I'm using a fold function here with a window of a few seconds.

My tests showed me that restoring state with window functions does not work
exactly how I expected.
I thought that if my window function emits an aggregated object to a sink,
and writing that object fails in the sink, this write to HBase will be
replayed. So even if it actually got written to HBase but Flink thought it
didn't (for instance during a network problem), I could be sure of
idempotent writes. I wanted to enforce that by using the timestamp of the
first event used in that window for aggregation.

Now correct me if I'm wrong, but it seems that in the case of a failure
(even if it's in the sink) the whole flow gets replayed from the last
checkpoint, which means that my window function might emit the aggregated
object in a different form, for instance containing not only the tuples
that failed but also other ones. That would break my idempotency here, and
I might end up with higher counters than I should have.
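The problem can be seen in a few lines: if the write timestamp is taken from whichever element arrives first, a replay that delivers the same elements in a different order produces a different timestamp, while the largest timestamp in the window (the mitigation Aljoscha suggests in his reply in this thread) does not depend on arrival order. A minimal sketch with hypothetical helper names:

```java
import java.util.Arrays;
import java.util.List;

final class WindowTimestamps {
    private WindowTimestamps() {}

    // Timestamp of whichever element arrived first: depends on arrival order.
    static long firstTimestamp(List<Long> arrivalOrder) {
        return arrivalOrder.get(0);
    }

    // Largest timestamp in the window: the same for any arrival order.
    static long maxTimestamp(List<Long> arrivalOrder) {
        long max = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            max = Math.max(max, ts);
        }
        return max;
    }

    public static void main(String[] args) {
        List<Long> firstRun = Arrays.asList(1_000L, 1_200L, 1_100L);
        List<Long> replay = Arrays.asList(1_100L, 1_000L, 1_200L); // same elements, new order

        System.out.println(firstTimestamp(firstRun) + " vs " + firstTimestamp(replay)); // 1000 vs 1100
        System.out.println(maxTimestamp(firstRun) + " vs " + maxTimestamp(replay));     // 1200 vs 1200
    }
}
```

As Aljoscha also points out, this still assumes the same set of elements ends up in the window on replay; a late element with an even larger timestamp would change the result.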

Do you have any suggestion on how to solve/work around such a problem in Flink?

Thanks,
Kamil.


Re: Key by Task number

2017-04-18 Thread Kamil Dziublinski
I am not sure you really need a keyBy; your load will be distributed among
your map function instances without it. But could you explain a bit what
your sink is doing?

As for setting the parallelism on the consumer, remember that you won't get
higher parallelism than the number of partitions in your topic.
If you have 240 partitions that's fine, but if you have fewer, the other
subtasks will be idle. Each partition can only be read by one task at a
time.
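A back-of-the-envelope model of the point about partitions: if partitions are spread over the source subtasks with a simple modulo assignment (an assumption for illustration; the exact assignment logic in Flink's Kafka consumer differs between versions), any subtasks beyond the number of partitions receive nothing. `idleSubtasks` is a hypothetical helper, and 40 partitions is a made-up example since the question does not state the topic's partition count.

```java
final class PartitionAssignment {
    private PartitionAssignment() {}

    // Counts source subtasks that get no partition when partition p is
    // assigned to subtask (p % parallelism). Assumed scheme, for illustration.
    static int idleSubtasks(int numPartitions, int parallelism) {
        boolean[] busy = new boolean[parallelism];
        for (int p = 0; p < numPartitions; p++) {
            busy[p % parallelism] = true;
        }
        int idle = 0;
        for (boolean b : busy) {
            if (!b) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println(idleSubtasks(240, 240)); // 0
        System.out.println(idleSubtasks(40, 240));  // 200
    }
}
```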

On Tue, Apr 18, 2017 at 3:38 PM Telco Phone  wrote:

>
> I am trying to use the task number as a keyby value to help fan out the
> work load reading from kafka.
>
>
> Given:
>
>    DataStream stream =
>        env.addSource(new FlinkKafkaConsumer010("topicA", schema, properties))
>            .setParallelism(240)
>            .flatMap(new SchemaRecordSplit()).setParallelism(240)
>            .name("TopicA splitter")
>            .keyBy("partition", "keyByHelper", "schemaId");
>
>    stream.addSink(new CustomMaprFsSink())
>        .name("TopicA Sink")
>        .setParallelism(240);
>
>
> In the DeserialClass I am trying to get at
>
> getRuntimeContext().getIndexOfThisSubtask();
>
> which is only available in the RichSinkFunction.
>
>
>
> The above keys are partition (by hour) and schemaId (Avro schema ID), and I
> would like to add the task number so that all 240 readers / writers have
> something to do.
>
> Any ideas ?
>


Re: Submit Flink job programatically

2017-04-07 Thread Kamil Dziublinski
Hey,

I had a similar problem when I tried to list the jobs and kill one by name
in a YARN cluster. Initially I also tried to set YARN_CONF_DIR, but it didn't
work.
What helped, though, was putting the Hadoop conf dir on my application's
classpath when starting it, like this:
java -cp application.jar:/etc/hadoop/conf

The reason was that my application was finding the default configuration
coming from the Hadoop dependency in the fat jar and was not even trying to
look for anything in the environment variable.
When I passed the Hadoop conf dir to it, it started working properly.
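Hadoop's configuration classes load files such as `core-site.xml` and `yarn-site.xml` as resources from the classpath, which is consistent with adding `/etc/hadoop/conf` to `-cp` fixing the problem. A quick, stdlib-only way to check which copy (if any) the application actually sees is to ask the class loader. The class name is hypothetical, and the two file names are an assumption about which files matter in this setup.

```java
import java.net.URL;

final class HadoopConfOnClasspath {
    private HadoopConfOnClasspath() {}

    // Returns where a resource would be loaded from, or null if it is not
    // visible on the current classpath.
    static URL locate(String resourceName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = HadoopConfOnClasspath.class.getClassLoader();
        }
        return loader.getResource(resourceName);
    }

    public static void main(String[] args) {
        for (String name : new String[] {"core-site.xml", "yarn-site.xml"}) {
            URL url = locate(name);
            System.out.println(name + " -> " + (url == null ? "NOT on classpath" : url));
        }
    }
}
```

Assuming the class is packaged into `application.jar`, running it with the same classpath as the real application shows whether the cluster's `yarn-site.xml` is picked up or whether the client is left with defaults (such as a ResourceManager at 0.0.0.0:8032).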

Hope it helps,

Cheers,
Kamil.

On Fri, Apr 7, 2017 at 8:04 AM, Jins George  wrote:

> Hello Community,
>
> I need to submit a Flink job to a remote YARN cluster programmatically. I
> tried to use YarnClusterDescriptor.deploy(), but I get the message
> RMProxy.java:92:main] - Connecting to ResourceManager at /0.0.0.0:8032
> It is trying to connect to the resource manager on the client machine. I
> have set YARN_CONF_DIR on the client machine and placed yarn-site.xml,
> core-site.xml, etc. However, it does not seem to be picking up these
> files.
>
> Is this the right way to submit to a remote YARN cluster?
>
>
> Thanks,
> Jins George
>


Re: PartitionNotFoundException on deploying streaming job

2017-04-05 Thread Kamil Dziublinski
OK, thanks, I will try to debug it.
But my initial thought was that it should be possible to increase some
timeout/wait value to avoid it, since it only occurs during the initial start
and works fine after restarting.
Any idea of such a property in Flink?
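Zhijiang's explanation below boils down to a race: the downstream partition request can reach the producer before the upstream task has registered its partition. The toy model that follows (plain Java, not Flink code; all names and numbers are hypothetical) shows why retrying with a longer backoff would hide such a race, which is the kind of timeout/wait knob asked about here. It does not say which Flink configuration option, if any, controls this.

```java
final class PartitionRequestRace {
    private PartitionRequestRace() {}

    // Returns true if a request first sent at requestTime, retried with a
    // doubling backoff that starts at initialBackoff and may not exceed
    // maxBackoff, reaches the producer at or after registrationTime.
    static boolean succeeds(long registrationTime, long requestTime,
                            long initialBackoff, long maxBackoff) {
        if (initialBackoff <= 0) {
            throw new IllegalArgumentException("initialBackoff must be positive");
        }
        long attemptTime = requestTime;
        long backoff = initialBackoff;
        while (true) {
            if (attemptTime >= registrationTime) {
                return true;              // partition already registered
            }
            if (backoff > maxBackoff) {
                return false;             // give up: "partition not found"
            }
            attemptTime += backoff;       // wait, then retry
            backoff *= 2;
        }
    }

    public static void main(String[] args) {
        // Partition registered at t=500ms, first request at t=0.
        System.out.println(succeeds(500, 0, 100, 200));   // false: last attempt at t=300
        System.out.println(succeeds(500, 0, 100, 1_000)); // true: retry at t=700 succeeds
    }
}
```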

On Tue, Apr 4, 2017 at 6:03 PM, Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Kamil,
>
> When the producer receives the PartitionRequest from the downstream task,
> it first checks whether the requested partition is already registered.
> If not, it responds with a PartitionNotFoundException.
> When the upstream task is submitted and begins to run, it registers all
> its partitions in the ResultPartitionManager. So in your case the
> partition request arrived before the partition was registered.
> Maybe the submission of the upstream task was delayed by the JobManager, or
> some logic delayed the task registration in the NetworkEnvironment. You can
> debug the specific status of the upstream task when it responds with
> PartitionNotFound to track down the reason. Looking forward to your
> further findings!
>
> Cheers,
> Zhijiang
>
> ------
> From: Kamil Dziublinski
> Sent: Tuesday, April 4, 2017 17:20
> To: user
> Subject: PartitionNotFoundException on deploying streaming job
>
> Hi guys,
>
> When I run my streaming job, I almost always initially get a
> PartitionNotFoundException. The job fails, then restarts, and after that it
> runs OK. I wonder what is causing this and whether I can adjust some
> parameters to avoid this initial failure.
>
> I have a Flink session on YARN with 55 task managers, with 4 cores and 4 GB
> per TM. This setup is using 77% of my YARN cluster.
>
> Any ideas?
>
> Thanks,
> Kamil.
>
>
>


PartitionNotFoundException on deploying streaming job

2017-04-04 Thread Kamil Dziublinski
Hi guys,

When I run my streaming job, I almost always initially get a
PartitionNotFoundException. The job fails, then restarts, and after that it
runs OK. I wonder what is causing this and whether I can adjust some
parameters to avoid this initial failure.

I have a Flink session on YARN with 55 task managers, with 4 cores and 4 GB
per TM. This setup is using 77% of my YARN cluster.

Any ideas?

Thanks,
Kamil.


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-31 Thread Kamil Dziublinski
Yep, I meant 1.2 million per second :)

On Fri, Mar 31, 2017 at 11:19 AM, Ted Yu  wrote:

> The 1,2 million seems to be European notation.
>
> You meant 1.2 million, right?
>
> On Mar 31, 2017, at 1:19 AM, Kamil Dziublinski <
> kamil.dziublin...@gmail.com> wrote:
>
> Hi,
>
> Thanks for the tip, man. I tried playing with this.
> I was changing fetch.message.max.bytes (I still have Kafka 0.8) and
> also socket.receive.buffer.bytes. With some optimal settings I was able
> to get to 1,2 million reads per second, so a 50% increase.
> But unfortunately that does not increase when I enable the HBase sink
> again, which means that backpressure kicks in and HBase writing is the
> limiting factor here. I will try to tweak this a bit more; if I find
> something, I will share it.
>
> Cheers,
> Kamil.
>
> On Thu, Mar 30, 2017 at 12:45 PM, Tzu-Li (Gordon) Tai wrote:
>
>> I'm wondering what I can tweak further to increase this. I was reading in
>> this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>> about 3 million per sec with only 20 partitions, so I'm sure I should be
>> able to squeeze more out of it.
>>
>>
>> Not really sure if it is relevant under the context of your case, but you
>> could perhaps try tweaking the maximum size of Kafka records fetched on
>> each poll on the partitions.
>> You can do this by setting a higher value for “max.partition.fetch.bytes”
>> in the provided config properties when instantiating the consumer; that
>> will directly configure the internal Kafka clients.
>> Generally, all Kafka settings are applicable through the provided config
>> properties, so you can perhaps take a look at the Kafka docs to see what
>> else there is to tune for the clients.
>>
>> On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski (
>> kamil.dziublin...@gmail.com) wrote:
>>
>> I'm wondering what I can tweak further to increase this. I was reading in
>> this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>> about 3 million per sec with only 20 partitions, so I'm sure I should be
>> able to squeeze more out of it.
>>
>>
>


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-31 Thread Kamil Dziublinski
Hi,

Thanks for the tip, man. I tried playing with this.
I was changing fetch.message.max.bytes (I still have Kafka 0.8) and
also socket.receive.buffer.bytes. With some optimal settings I was able to
get to 1,2 million reads per second, so a 50% increase.
But unfortunately that does not increase when I enable the HBase sink again,
which means that backpressure kicks in and HBase writing is the limiting
factor here. I will try to tweak this a bit more; if I find something, I will
share it.
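For reference, the settings mentioned above are ordinary consumer properties handed to the Flink Kafka consumer. A minimal sketch of building them for a Kafka 0.8 consumer follows; the broker and ZooKeeper addresses, the group id and the sizes are placeholders, not the "optimal settings" from this thread (which were not posted). The commented-out line assumes the Flink 0.8 connector (`FlinkKafkaConsumer08`) and is not compiled here.

```java
import java.util.Properties;

final class Kafka08ConsumerProps {
    private Kafka08ConsumerProps() {}

    static Properties build(int fetchMessageMaxBytes, int socketReceiveBufferBytes) {
        Properties props = new Properties();
        // Placeholder addresses and group id: replace with your own.
        props.setProperty("bootstrap.servers", "broker1:9092");
        props.setProperty("zookeeper.connect", "zk1:2181");
        props.setProperty("group.id", "my-flink-job");
        // The two Kafka 0.8 consumer settings tuned in this thread.
        props.setProperty("fetch.message.max.bytes", Integer.toString(fetchMessageMaxBytes));
        props.setProperty("socket.receive.buffer.bytes", Integer.toString(socketReceiveBufferBytes));
        return props;
    }

    public static void main(String[] args) {
        // Example values only; measure to find what works for your cluster.
        Properties props = build(4 * 1024 * 1024, 1024 * 1024);
        System.out.println(props.getProperty("fetch.message.max.bytes")); // 4194304
        // With flink-connector-kafka-0.8 on the classpath, they would be passed like:
        // env.addSource(new FlinkKafkaConsumer08<>("topic", schema, props));
    }
}
```

With newer Kafka clients the corresponding per-partition fetch setting is `max.partition.fetch.bytes`, as mentioned in the reply quoted below.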

Cheers,
Kamil.

On Thu, Mar 30, 2017 at 12:45 PM, Tzu-Li (Gordon) Tai 
wrote:

> I'm wondering what I can tweak further to increase this. I was reading in
> this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
> about 3 million per sec with only 20 partitions, so I'm sure I should be
> able to squeeze more out of it.
>
>
> Not really sure if it is relevant under the context of your case, but you
> could perhaps try tweaking the maximum size of Kafka records fetched on
> each poll on the partitions.
> You can do this by setting a higher value for “max.partition.fetch.bytes”
> in the provided config properties when instantiating the consumer; that
> will directly configure the internal Kafka clients.
> Generally, all Kafka settings are applicable through the provided config
> properties, so you can perhaps take a look at the Kafka docs to see what
> else there is to tune for the clients.
>
> On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski (
> kamil.dziublin...@gmail.com) wrote:
>
> I'm wondering what I can tweak further to increase this. I was reading in
> this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
> about 3 million per sec with only 20 partitions, so I'm sure I should be
> able to squeeze more out of it.
>
>


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Kamil Dziublinski
Thanks Ted, I will read about it.

While we are on throughput:
do you guys have any suggestions on how to optimise Kafka reading from
Flink?
In my current setup:
Flink is on 15 machines on YARN.
Kafka is on 9 brokers with 40 partitions; source parallelism in Flink is 40.
Just for testing I left only the filters there, without a sink, to see the
max throughput.
I am getting at most 800-900k per sec and am definitely not utilising the
1 Gb/s network. I'm more or less utilising only 20-30% of the network
bandwidth.

I'm wondering what I can tweak further to increase this. I was reading in
this blog:
https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 million per sec with only 20 partitions, so I'm sure I should be
able to squeeze more out of it.

On Thu, Mar 30, 2017 at 11:51 AM, Ted Yu  wrote:

> Kamil:
> In the upcoming hbase 2.0 release, there are more write path optimizations
> which would boost write performance further.
>
> FYI
>
> On Mar 30, 2017, at 1:07 AM, Kamil Dziublinski <
> kamil.dziublin...@gmail.com> wrote:
>
> Hey guys,
>
> Sorry for the confusion: it turned out that I had a bug in my code, where I
> was not clearing the list in my batch object on each apply call. I forgot
> that this has to be added, since it's different from fold.
> That is what led to such high throughput. When I fixed it I was back to
> 160k per sec. I'm still investigating how I can speed it up.
>
> As a side note, it's quite interesting that HBase was able to do 2 million
> puts per second. But most of them had already been stored with a previous
> call, so perhaps internally it is able to distinguish in memory whether a
> put was already stored or not. Not sure.
>
> Anyway, my claim about the window vs fold performance difference was
> wrong, so forget about it ;)
>
> On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther  wrote:
>
>> Hi Kamil,
>>
>> the performance implications might be the result of which state the
>> underlying functions are using internally. WindowFunctions use ListState or
>> ReducingState, fold() uses FoldingState. It also depends on the size of
>> your state and the state backend you are using. I recommend the following
>> documentation page. The FoldingState might be deprecated soon, once a
>> better alternative is available:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state
>>
>> I hope that helps.
>>
>> Regards,
>> Timo
>>
>> Am 29/03/17 um 11:27 schrieb Kamil Dziublinski:
>>
>> Hi guys,
>>
>> I’m using Flink in production at Mapp. We recently switched from Storm.
>> Before putting this live I was doing performance tests, and I found
>> something that “feels” a bit off.
>> I have a simple streaming job reading from Kafka, applying a 3-second
>> window and then storing into HBase.
>>
>> Initially we had this second step written with a fold function, since I
>> thought it was a better idea performance- and resource-wise.
>> But I couldn’t reach more than 120k writes per second to HBase, and I
>> thought the HBase sink was the bottleneck here. But then I tried doing the
>> same with a window function and my performance jumped to 2 million writes
>> per second. Just wow :) Compared to Storm, where I had max 320k per
>> second, it is amazing.
>>
>> Both fold and window functions were doing the same thing: taking together
>> all the records for the same tenant and user (keyBy is used for that) and
>> putting them in one batched object with ArrayLists for the mutations on
>> the user profile, then passing this object to the sink. I can post the
>> code if it's needed.
>>
>> In the case of fold I was just adding the profile mutation to the list,
>> and in the case of the window function iterating over all of it and
>> returning this batched entity in one go.
>>
>> I’m wondering if it is expected to have 20 times slower performance just
>> by using a fold function. I would like to know what is so costly about
>> this, as intuitively I would expect the fold function to be the better
>> choice here, since I assume that the window function uses more memory for
>> buffering.
>>
>> Also, when my colleagues were doing a PoC for the Flink evaluation, they
>> were seeing very similar results to what I am seeing now, but they were
>> still using the fold function. This was on Flink version 1.0.3 and now I
>> am using 1.2.0. So perhaps there is some regression?
>>
>> Please let me know what you think.
>>
>> Cheers,
>> Kamil.
>>
>>
>>
>


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Kamil Dziublinski
Hey guys,

Sorry for the confusion: it turned out that I had a bug in my code, where I
was not clearing the list in my batch object on each apply call. I forgot
that this has to be added, since it's different from fold.
That is what led to such high throughput. When I fixed it I was back to
160k per sec. I'm still investigating how I can speed it up.

As a side note, it's quite interesting that HBase was able to do 2 million
puts per second. But most of them had already been stored with a previous
call, so perhaps internally it is able to distinguish in memory whether a
put was already stored or not. Not sure.

Anyway, my claim about the window vs fold performance difference was
wrong, so forget about it ;)
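The bug described above is easy to reproduce outside Flink: if one batch object is reused across apply calls and its list is never cleared, every emitted batch also carries all earlier mutations, so the sink sees (and rewrites) far more records than were actually read. A minimal sketch, with hypothetical class names standing in for the real batch object:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReusedBatchDemo {
    // Hypothetical stand-in for the batched object handed to the HBase sink.
    static final class Batch {
        final List<String> mutations = new ArrayList<>();
    }

    private final Batch reused = new Batch();

    // Buggy version: the reused list keeps growing across windows.
    // Returns a copy to show what the sink would receive for this window.
    List<String> applyWithoutClear(List<String> windowElements) {
        reused.mutations.addAll(windowElements);
        return new ArrayList<>(reused.mutations);
    }

    // Fixed version: clear the reused list at the start of each apply call.
    List<String> applyWithClear(List<String> windowElements) {
        reused.mutations.clear();
        reused.mutations.addAll(windowElements);
        return new ArrayList<>(reused.mutations);
    }

    public static void main(String[] args) {
        List<String> window1 = Arrays.asList("m1", "m2");
        List<String> window2 = Arrays.asList("m3");

        ReusedBatchDemo buggy = new ReusedBatchDemo();
        buggy.applyWithoutClear(window1);
        System.out.println(buggy.applyWithoutClear(window2)); // [m1, m2, m3]

        ReusedBatchDemo fixed = new ReusedBatchDemo();
        fixed.applyWithClear(window1);
        System.out.println(fixed.applyWithClear(window2));    // [m3]
    }
}
```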

On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther  wrote:

> Hi Kamil,
>
> the performance implications might be the result of which state the
> underlying functions are using internally. WindowFunctions use ListState or
> ReducingState, fold() uses FoldingState. It also depends on the size of
> your state and the state backend you are using. I recommend the following
> documentation page. The FoldingState might be deprecated soon, once a
> better alternative is available:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state
>
> I hope that helps.
>
> Regards,
> Timo
>
> Am 29/03/17 um 11:27 schrieb Kamil Dziublinski:
>
> Hi guys,
>
> I’m using Flink in production at Mapp. We recently switched from Storm.
> Before putting this live I was doing performance tests, and I found
> something that “feels” a bit off.
> I have a simple streaming job reading from Kafka, applying a 3-second
> window and then storing into HBase.
>
> Initially we had this second step written with a fold function, since I
> thought it was a better idea performance- and resource-wise.
> But I couldn’t reach more than 120k writes per second to HBase, and I
> thought the HBase sink was the bottleneck here. But then I tried doing the
> same with a window function and my performance jumped to 2 million writes
> per second. Just wow :) Compared to Storm, where I had max 320k per second,
> it is amazing.
>
> Both fold and window functions were doing the same thing: taking together
> all the records for the same tenant and user (keyBy is used for that) and
> putting them in one batched object with ArrayLists for the mutations on the
> user profile, then passing this object to the sink. I can post the code if
> it's needed.
>
> In the case of fold I was just adding the profile mutation to the list, and
> in the case of the window function iterating over all of it and returning
> this batched entity in one go.
>
> I’m wondering if it is expected to have 20 times slower performance just by
> using a fold function. I would like to know what is so costly about this,
> as intuitively I would expect the fold function to be the better choice
> here, since I assume that the window function uses more memory for
> buffering.
>
> Also, when my colleagues were doing a PoC for the Flink evaluation, they
> were seeing very similar results to what I am seeing now, but they were
> still using the fold function. This was on Flink version 1.0.3 and now I am
> using 1.2.0. So perhaps there is some regression?
>
> Please let me know what you think.
>
> Cheers,
> Kamil.
>
>
>


20 times higher throughput with Window function vs fold function, intended?

2017-03-29 Thread Kamil Dziublinski
Hi guys,

I’m using Flink in production at Mapp. We recently switched from Storm.
Before putting this live I was doing performance tests, and I found
something that “feels” a bit off.
I have a simple streaming job reading from Kafka, applying a 3-second
window and then storing into HBase.

Initially we had this second step written with a fold function, since I
thought it was a better idea performance- and resource-wise.
But I couldn’t reach more than 120k writes per second to HBase, and I
thought the HBase sink was the bottleneck here. But then I tried doing the
same with a window function and my performance jumped to 2 million writes
per second. Just wow :) Compared to Storm, where I had max 320k per second,
it is amazing.

Both fold and window functions were doing the same thing: taking together
all the records for the same tenant and user (keyBy is used for that) and
putting them in one batched object with ArrayLists for the mutations on the
user profile, then passing this object to the sink. I can post the code if
it's needed.

In the case of fold I was just adding the profile mutation to the list, and
in the case of the window function iterating over all of it and returning
this batched entity in one go.
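The two approaches described above can be contrasted in plain Java: the fold-style version updates an accumulator as each element arrives (incremental aggregation), while the window-function version receives all buffered elements when the window fires and builds the batch in one go. Both yield the same batch; what differs is what has to be kept as state until the window fires. This is a simplified model with hypothetical names, not Flink's `FoldFunction`/`WindowFunction` interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FoldVsWindow {
    private FoldVsWindow() {}

    // Fold style: called once per element; the accumulator is the only state.
    static List<String> foldStep(List<String> accumulator, String mutation) {
        accumulator.add(mutation);
        return accumulator;
    }

    // Window-function style: called once per window with all buffered elements.
    static List<String> applyWindow(Iterable<String> bufferedElements) {
        List<String> batch = new ArrayList<>(); // fresh batch for every window
        for (String mutation : bufferedElements) {
            batch.add(mutation);
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("m1", "m2", "m3");

        List<String> folded = new ArrayList<>();
        for (String m : window) {
            folded = foldStep(folded, m);
        }
        List<String> applied = applyWindow(window);

        System.out.println(folded.equals(applied)); // true
    }
}
```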

I’m wondering if it is expected to have 20 times slower performance just by
using a fold function. I would like to know what is so costly about this,
as intuitively I would expect the fold function to be the better choice
here, since I assume that the window function uses more memory for
buffering.

Also, when my colleagues were doing a PoC for the Flink evaluation, they
were seeing very similar results to what I am seeing now, but they were
still using the fold function. This was on Flink version 1.0.3 and now I am
using 1.2.0. So perhaps there is some regression?

Please let me know what you think.

Cheers,
Kamil.