Re: coordinate watermarks between jobs?

2018-05-04 Thread Eron Wright
It might be possible to apply backpressure to the channels that are
significantly ahead in event time.  Tao, it would not be trivial, but if
you'd like to investigate more deeply, take a look at the Flink runtime's
`StatusWatermarkValve` and the associated stream input processors to see
how an operator integrates incoming watermarks.   A key challenge would be
to apply backpressure to the upstream channel for reasons other than the
availability of network buffers.  Take a look at FLINK-7282 which
introduced a credit system that may be useful here.
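For illustration, short of runtime changes, one way to approximate this at the user level is to buffer the faster input in keyed state until the operator's combined watermark (the minimum over both inputs, i.e. driven by the slower stream) catches up. Below is a minimal sketch of that idea, assuming a connected, keyed two-input stream with timestamps and watermarks already assigned upstream; the String payloads are purely for illustration.

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Holds back elements of the (faster) first input in keyed state until the
 * operator's combined watermark has passed their timestamps, then releases
 * them via event-time timers.
 */
public class HoldBackFastInput extends CoProcessFunction<String, String, String> {

    private transient ListState<Tuple2<Long, String>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "held-back", TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {})));
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
        long ts = ctx.timestamp(); // requires timestamps/watermarks assigned upstream
        if (ts <= ctx.timerService().currentWatermark()) {
            out.collect(value); // the slow input has already reached this point in event time
        } else {
            buffer.add(Tuple2.of(ts, value)); // park it until the combined watermark catches up
            ctx.timerService().registerEventTimeTimer(ts);
        }
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out) {
        // The slower input only advances the combined watermark here; add real logic as needed.
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        long watermark = ctx.timerService().currentWatermark();
        Iterable<Tuple2<Long, String>> buffered = buffer.get();
        if (buffered == null) {
            return;
        }
        List<Tuple2<Long, String>> stillEarly = new ArrayList<>();
        for (Tuple2<Long, String> e : buffered) {
            if (e.f0 <= watermark) {
                out.collect(e.f1);       // released: the slower stream has caught up
            } else {
                stillEarly.add(e);       // keep waiting
            }
        }
        buffer.clear();
        for (Tuple2<Long, String> e : stillEarly) {
            buffer.add(e);
        }
    }
}
```

Note that this trades heap/memory pressure for keyed state (RocksDB or heap, depending on the backend) rather than creating real backpressure on the channel, which is exactly the limitation the runtime-level approach above would address.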

On Fri, May 4, 2018 at 10:07 AM, Tao Xia  wrote:

> Without throttling, it will eventually run out of memory.
> I think this is a very common use case for Flink users during stream
> replay or re-processing.
> Is there any feature planned for it? I would like to contribute to the
> initiative.
>
> On Wed, May 2, 2018 at 2:43 AM, Fabian Hueske  wrote:
>
>> Hi Tao,
>>
>> The watermarks of operators that consume from two (or more) streams are
>> always synced to the lowest watermark.
>> This behavior guarantees that data won't be late (unless it was late when
>> watermarks were assigned). However, the operator will most likely need to
>> buffer more events from the "faster" streams.
>>
>> Right now, it is not possible to throttle faster streams to the pace of
>> the slowest stream.
>>
>> Best, Fabian
>>
>> 2018-04-27 1:05 GMT+02:00 Tao Xia :
>>
>>> Hi All,
>>>   I am trying to replay events from 3 different sources, hopefully in
>>> time sequence, say Stream1, Stream2, Stream3. Since their sizes vary a lot,
>>> the watermark on one stream advances much faster than on the other streams. Is there
>>> any way to coordinate the watermarks between the different input streams?
>>> Thanks,
>>> Tao
>>>
>>
>>
>


Use of AggregateFunction's merge() method

2018-05-04 Thread Ken Krugler
I’m trying to figure out when/why the AggregateFunction.merge() method is called
in a streaming job, to ensure I’ve implemented it properly.

The documentation for AggregateFunction says "Merging intermediate aggregates
(partial aggregates) means merging the accumulators."

But that sounds more like a combiner in batch processing, not streaming.

From the code, it seems like this could be called if a MergingWindowAssigner is 
used, right?

And is there any other situation in streaming where merge() could be called?
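For context, here is a minimal sketch (an illustration, not code from this thread) of an AggregateFunction whose merge() gets exercised: with a MergingWindowAssigner such as EventTimeSessionWindows, merging two session windows for the same key means merging their accumulators. The types (Long input, (sum, count) accumulator, Double average) are assumptions for the example, and in some 1.x versions add() returns void and mutates the accumulator instead of returning it.

```
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/** Computes an average; the accumulator is (sum, count). */
public class AverageAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        // Called when two window accumulators are combined, e.g. when two
        // session windows merge under a MergingWindowAssigner.
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
```

Used with something like `.window(EventTimeSessionWindows.withGap(Time.minutes(10))).aggregate(new AverageAggregate())`, merge() runs whenever two sessions for the same key collapse into one.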

Thanks,

— Ken


http://about.me/kkrugler
+1 530-210-6378



Re: Stashing key with AggregateFunction

2018-05-04 Thread Ken Krugler
Hi Fabian & Stefan,

Thanks, and yes that does work more like what I’d expect.

Regards,

— Ken

PS - Just FYI, the Java code examples in the documentation referenced below have
a number of bugs; see FLINK-9299.


> On May 4, 2018, at 7:35 AM, Fabian Hueske  wrote:
> 
> Hi Ken,
> 
> You can also use an additional ProcessWindowFunction [1] that is applied on 
> the result of the AggregateFunction to set the key.
> Since the PWF is only applied on the final result, there is no overhead 
> (actually, it might even be slightly cheaper because the AggregateFunction 
> can be simpler).
> 
> If you don't want to use a PWF, your approach is the right one.
> 
> Best, Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
>  
> 
> 
> 2018-05-03 19:53 GMT+02:00 Ken Krugler  >:
> Hi list,
> 
> I was trying different ways to implement a moving average (count based, not 
> time based).
> 
> The blunt instrument approach is to create a custom FlatMapFunction that 
> keeps track of the last N values.
> 
> It seemed like using an AggregateFunction would be most consistent with the 
> Flink API, along the lines of...
> 
> .keyBy(new MyKeySelector())
> .window(GlobalWindows.create())
> .trigger(CountTrigger.of(1))
> .aggregate(new MovingAverageAggregator(10))
> 
> This works, but the API for the AggregateFunction (MovingAverageAggregator) 
> feels a bit odd.
> 
> Specifically, I want to emit a  result from getResult(), 
> but the key isn’t passed to the createAccumulator() method, nor is it passed 
> to the getResult() method. So in the add() method I check if the accumulator 
> I’ve created has a key set, and if not then I extract the key from the record 
> and set it on the accumulator, so I can use it in the getResult() call.
> 
> Is this expected, or am I miss-using the functionality?
> 
> Thanks,
> 
> — Ken


http://about.me/kkrugler
+1 530-210-6378



Re: Flink + Marathon (Mesos) Memory Issues

2018-05-04 Thread hao gao
Hi,

Since you said BucketingSink, I think it may be related to your bucketer.
Let's say you bucket by hour. In your stream, at any given moment, your records'
timestamps range from hour 00 to hour 23, which means your task
needs 24 writers, one dedicated to each bucket. If you have 4 task slots in a
taskmanager, then there are 24 * 4 writers open at the same time. If your writer
is a Parquet writer, overall they may need a lot of memory.
Just my guess.

2018-05-04 2:31 GMT-07:00 Stefan Richter :

> Hi,
>
> besides your configured heap size, there is also some off-heap memory used
> in the JVM process, in particular by RocksDB. Each keyed operator instance
> on a TM has its own RocksDB instance, so the question is how many are
> running in one container and what is their configuration? For RocksDB for
> example write_buffer_size (32MB default), write_buffer_count (3 by default)
> and block_cache_size (16 MB default)  contribute per instance. For more
> details, please have a look here: https://github.com/
> facebook/rocksdb/wiki/Memory-usage-in-RocksDB. You might need adjust your
> RocksDB configuration and/or plan your container memory limits accordingly
> to be on the safe side.
>
> Best,
> Stefan
>
> Am 03.05.2018 um 21:59 schrieb ani.desh1512 :
>
> *Background*: We have a setup of Flink 1.4.0. We run this flink
> cluster via /flink-jobmanager.sh foreground/ and /flink-taskmanager.sh
> foreground/ command via Marathon (which launches them as mesos jobs). So,
> basically, jobmanager and taskmanagers run as mesos tasks.
>
>
> Now, say, we run the flink taskmanagers with taskmanager.heap.mb set to 7G
> in flink-conf.yaml and Marathon memory is set to 18G. Even after this, we
> frequently see the taskmanager containers getting killed because of OOM. The
> flink streaming job that we run is a basic job without any windowing or
> other stateful operations. It's just a job that reads from a stream, applies
> a bunch of transformations and writes it back via BucketingSink. It uses
> RocksDB as the state backend.
>
> So what I am trying to understand is: how is Flink allocating taskmanager
> memory in containers? What would be a safe value for us to set as Marathon
> memory so that our taskmanagers don't keep getting killed? Are we seeing this
> behaviour because of starting flink taskmanagers in foreground mode as mesos
> tasks?
>
> Thanks
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>
>


-- 
Thanks
 - Hao


Why FoldFunction deprecated?

2018-05-04 Thread 陈梓立
I just wrote a code snippet like

```
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
        acc.f0 = event.getUser();
        acc.f1 += event.getByteDiff();
        return acc;
    }
});
```

and replace it using `aggregate()`

```
.aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> createAccumulator() {
        return new Tuple2<>("", 0L);
    }

    @Override
    public Tuple2<String, Long> add(WikipediaEditEvent event, Tuple2<String, Long> acc) {
        return new Tuple2<>(event.getUser(), acc.f1 + event.getByteDiff());
    }

    @Override
    public Tuple2<String, Long> getResult(Tuple2<String, Long> acc) {
        return acc;
    }

    @Override
    public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
        return new Tuple2<>(a.f0, a.f1 + b.f1);
    }
});
```

It seems I have to write much more code using `aggregate()`.

Is there something I'm missing that requires writing so much code? Or, put another way, maybe
`aggregate()` is more expressive, but why was `fold()` deprecated? After all, `fold` is a
general concept that people understand.


Re: Init RocksDB state backend during startup

2018-05-04 Thread Tao Xia
I would also like to know how to do this, if it is possible.

On Fri, May 4, 2018 at 9:31 AM, Peter Zende  wrote:

> Hi,
>
> We use RocksDB with FsStateBackend (HDFS) to store state used by the
> mapWithState operator. Is it possible to initialize / populate this state
> during the streaming application startup?
>
> Our intention is to reprocess the historical data from HDFS in a batch job
> and save the latest state of the records onto HDFS. Thus when we restart
> the streaming job we can just build up or load the most recent view of this
> store.
>
> Many thanks,
> Peter
>


Re: coordinate watermarks between jobs?

2018-05-04 Thread Tao Xia
Without throttling, it will eventually run out of memory.
I think this is a very common use case for Flink users during stream replay
or re-processing.
Is there any feature planned for it? I would like to contribute to the
initiative.

On Wed, May 2, 2018 at 2:43 AM, Fabian Hueske  wrote:

> Hi Tao,
>
> The watermarks of operators that consume from two (or more) streams are
> always synced to the lowest watermark.
> This behavior guarantees that data won't be late (unless it was late when
> watermarks were assigned). However, the operator will most likely need to
> buffer more events from the "faster" streams.
>
> Right now, it is not possible to throttle faster streams to the pace of
> the slowest stream.
>
> Best, Fabian
>
> 2018-04-27 1:05 GMT+02:00 Tao Xia :
>
>> Hi All,
>>   I am trying to replay events from 3 different sources, hopefully in
>> time sequence, say Stream1, Stream2, Stream3. Since their sizes vary a lot,
>> the watermark on one stream advances much faster than on the other streams. Is there
>> any way to coordinate the watermarks between the different input streams?
>> Thanks,
>> Tao
>>
>
>


Re: Wrong joda lib

2018-05-04 Thread Stefan Richter
Then that is the jar from which it currently takes the code for your DateTime 
class at that point in the code.

> Am 04.05.2018 um 18:29 schrieb Flavio Pompermaier :
> 
> The output of that code is 
> file:/opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/jruby-cloudera-1.0.0.jar
> 
> On Fri, May 4, 2018 at 6:11 PM, Stefan Richter  > wrote:
> Hi,
> 
> you can try to figure out the jar with 
> org.joda.time.DateTime.class.getProtectionDomain().getCodeSource().getLocation()
>  in the right context.
> 
> Best,
> Stefan
> 
> > Am 04.05.2018 um 18:02 schrieb Flavio Pompermaier  > >:
> > 
> > Hi to all,
> > I'm trying to run a job on a test cluster with Flink 1.3.1 but my job fails 
> > with the following error:
> > 
> > java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V
> > 
> > 
> > The job works when I run it from the IDE and in our production 
> > environment...I've looked into all jars within libs and that class is not 
> > present. The class within my shaded jar is indeed correct (I've checked the 
> > decompiled version of the class file)...how can I discover which jar on the 
> > classpath is "obscuring" the class contained in my shaded jar??
> > 
> > Best,
> > Flavio
> 
> 
> 



Init RocksDB state backend during startup

2018-05-04 Thread Peter Zende
Hi,

We use RocksDB with FsStateBackend (HDFS) to store state used by the
mapWithState operator. Is it possible to initialize / populate this state
during the streaming application startup?

Our intention is to reprocess the historical data from HDFS in a batch job
and save the latest state of the records onto HDFS. Thus when we restart
the streaming job we can just build up or load the most recent view of this
store.

Many thanks,
Peter


Re: Wrong joda lib

2018-05-04 Thread Flavio Pompermaier
The output of that code is
file:/opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/jruby-cloudera-1.0.0.jar

On Fri, May 4, 2018 at 6:11 PM, Stefan Richter 
wrote:

> Hi,
>
> you can try to figure out the jar with org.joda.time.DateTime.class.
> getProtectionDomain().getCodeSource().getLocation() in the right context.
>
> Best,
> Stefan
>
> > Am 04.05.2018 um 18:02 schrieb Flavio Pompermaier  >:
> >
> > Hi to all,
> > I'm trying to run a job on a test cluster with Flink 1.3.1 but my job
> fails with the following error:
> >
> > java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V
> >
> >
> > The job works when I run it from the IDE and in our production
> environment...I've looked into all jars within libs and that class is not
> present. The class within my shaded jar is indeed correct (I've checked the
> decompiled version of the class file)...how can I discover which jar on the
> classpath is "obscuring" the class contained in my shaded jar??
> >
> > Best,
> > Flavio
>
>


Re: Wrong joda lib

2018-05-04 Thread Stefan Richter
Why does that make it a problem? You could also run the method in a static block
and move the initialization of your variable to the bottom of that static block
(or comment it out).
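For example, a minimal sketch of that reordering, using the MIN_DATE field from Flavio's snippet (the class name and the System.out logging are just illustrative):

```
import org.joda.time.DateTime;

public class MyJobConstants {

    private static final DateTime MIN_DATE;

    static {
        // Log which jar DateTime is actually loaded from before the constructor is touched.
        System.out.println("DateTime loaded from: "
                + DateTime.class.getProtectionDomain().getCodeSource().getLocation());
        MIN_DATE = new DateTime(1850, 1, 1, 0, 0);
    }
}
```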

> Am 04.05.2018 um 18:14 schrieb Flavio Pompermaier :
> 
> The problem is that it occurs during the initialization of a static 
> variable:
> 
> private static final DateTime MIN_DATE = new DateTime(1850, 01, 01, 0, 0);
> 
> On Fri, May 4, 2018 at 6:11 PM, Stefan Richter  > wrote:
> Hi,
> 
> you can try to figure out the jar with 
> org.joda.time.DateTime.class.getProtectionDomain().getCodeSource().getLocation()
>  in the right context.
> 
> Best,
> Stefan
> 
> > Am 04.05.2018 um 18:02 schrieb Flavio Pompermaier  > >:
> > 
> > Hi to all,
> > I'm trying to run a job on a test cluster with Flink 1.3.1 but my job fails 
> > with the following error:
> > 
> > java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V
> > 
> > 
> > The job works when I run it from the IDE and in our production 
> > environment...I've looked into all jars within libs and that class is not 
> > present. The class within my shaded jar is indeed correct (I've checked the 
> > decompiled version of the class file)...how can I discover which jar on the 
> > classpath is "obscuring" the class contained in my shaded jar??
> > 
> > Best,
> > Flavio
> 
> 



Re: Wrong joda lib

2018-05-04 Thread Flavio Pompermaier
The problem is that it occurs during the initialization of a
static variable:

private static final DateTime MIN_DATE = new DateTime(1850, 01, 01, 0, 0);

On Fri, May 4, 2018 at 6:11 PM, Stefan Richter 
wrote:

> Hi,
>
> you can try to figure out the jar with org.joda.time.DateTime.class.g
> etProtectionDomain().getCodeSource().getLocation() in the right context.
>
> Best,
> Stefan
>
> > Am 04.05.2018 um 18:02 schrieb Flavio Pompermaier  >:
> >
> > Hi to all,
> > I'm trying to run a job on a test cluster with Flink 1.3.1 but my job
> fails with the following error:
> >
> > java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V
> >
> >
> > The job works when I run it from the IDE and in our production
> environment...I've looked into all jars within libs and that class is not
> present. The class within my shaded jar is indeed correct (I've checked the
> decompiled version of the class file)...how can I discover which jar on the
> classpath is "obscuring" the class contained in my shaded jar??
> >
> > Best,
> > Flavio
>
>


Re: Wrong joda lib

2018-05-04 Thread Stefan Richter
Hi,

you can try to figure out the jar with 
org.joda.time.DateTime.class.getProtectionDomain().getCodeSource().getLocation()
 in the right context.

Best,
Stefan

> Am 04.05.2018 um 18:02 schrieb Flavio Pompermaier :
> 
> Hi to all,
> I'm trying to run a job on a test cluster with Flink 1.3.1 but my job fails 
> with the following error:
> 
> java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V
> 
> 
> The job works when I run it from the IDE and in our production 
> environment...I've looked into all jars within libs and that class is not 
> present. The class within my shaded jar is indeed correct (I've checked the 
> decompiled version of the class file)...how can I discover which jar on the 
> classpath is "obscuring" the class contained in my shaded jar??
> 
> Best,
> Flavio



Re: Question about datasource replication

2018-05-04 Thread Fabian Hueske
The spilling will only happen when joining the branched data sets.
If you keep them separate and eventually emit them, no intermediate data
will be spilled.

2018-05-04 18:05 GMT+02:00 Flavio Pompermaier :

> Does this duplication happen when I write directly to disk after the
> flatMaps?
>
> On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske  wrote:
>
>> That will happen if you join (or coGroup) the branched DataSets, i.e.,
>> you have branching and merging pattern in your stream.
>>
>> The problem in that case is that one of the inputs is pipelined (e.g.,
>> the probe side of a hash join) and the other one is blocking.
>> In order to execute such a plan, we must spill the pipelined data set to
>> disk to ensure that the other input can be fully consumed (to build the
>> hash table).
>>
>> There's not really a solution to this.
>> You could change the join strategy to sort-merge-join but this will sort
>> both inputs and also result in spilling both to disk.
>>
>> 2018-05-04 17:25 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi Fabian,
>>> thanks for the detailed reply.
>>> The problem I see is that the source dataset is huge and, since it
>>> doesn't fit in memory, it's spilled twice to disk (I checked the increasing
>>> disk usage during the job and it was corresponding exactly to the size
>>> estimated by the Flink UI, that is twice it's initial size).
>>> Probably there are no problem until you keep data in memory but in my
>>> case it's very problematic this memory explosion :(
>>>
>>> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske  wrote:
>>>
 Hi Flavio,

 No, there's no way around it.
 DataSets that are processed by more than one operator cannot be
 processed by chained operators.
 The records need to be copied to avoid concurrent modifications.
 However, the data should not be shipped over the network if all operators
 have the same parallelism.
 Instead records are serialized and handed over via local byte[]
 in-memory channels.

 Best, Fabian


 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier :

> Flink 1.3.1 (I'm waiting 1.5 before upgrading..)
>
> On Fri, May 4, 2018 at 2:50 PM, Amit Jain  wrote:
>
>> Hi Flavio,
>>
>> Which version of Flink are you using?
>>
>> --
>> Thanks,
>> Amit
>>
>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>> > Hi all,
>> > I've a Flink batch job that reads a parquet dataset and then
>> applies 2
>> > flatMap to it (see pseudocode below).
>> > The problem is that this dataset is quite big and Flink duplicates
>> it before
>> > sending the data to these 2 operators (I've guessed this from the
>> doubling
>> > amount of sent bytes) .
>> > Is there a way to avoid this behaviour?
>> >
>> > ---
>> > Here's the pseudo code of my job:
>> >
>> > DataSet X = readParquetDir();
>> > X1 = X.flatMap(...);
>> > X2 = X.flatMap(...);
>> >
>> > Best,
>> > Flavio
>>
>
>

>>>
>


Re: Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Does this duplication happen when I write directly to disk after the
flatMaps?

On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske  wrote:

> That will happen if you join (or coGroup) the branched DataSets, i.e., you
> have branching and merging pattern in your stream.
>
> The problem in that case is that one of the inputs is pipelined (e.g., the
> probe side of a hash join) and the other one is blocking.
> In order to execute such a plan, we must spill the pipelined data set to
> disk to ensure that the other input can be fully consumed (to build the
> hash table).
>
> There's not really a solution to this.
> You could change the join strategy to sort-merge-join but this will sort
> both inputs and also result in spilling both to disk.
>
> 2018-05-04 17:25 GMT+02:00 Flavio Pompermaier :
>
>> Hi Fabian,
>> thanks for the detailed reply.
>> The problem I see is that the source dataset is huge and, since it
>> doesn't fit in memory, it's spilled twice to disk (I checked the increasing
>> disk usage during the job and it was corresponding exactly to the size
>> estimated by the Flink UI, that is twice it's initial size).
>> Probably there are no problem until you keep data in memory but in my
>> case it's very problematic this memory explosion :(
>>
>> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske  wrote:
>>
>>> Hi Flavio,
>>>
>>> No, there's no way around it.
>>> DataSets that are processed by more than one operator cannot be
>>> processed by chained operators.
>>> The records need to be copied to avoid concurrent modifications.
>>> However, the data should not be shipped over the network if all operators
>>> have the same parallelism.
>>> Instead records are serialized and handed over via local byte[]
>>> in-memory channels.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier :
>>>
 Flink 1.3.1 (I'm waiting 1.5 before upgrading..)

 On Fri, May 4, 2018 at 2:50 PM, Amit Jain  wrote:

> Hi Flavio,
>
> Which version of Flink are you using?
>
> --
> Thanks,
> Amit
>
> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
> > Hi all,
> > I've a Flink batch job that reads a parquet dataset and then applies
> 2
> > flatMap to it (see pseudocode below).
> > The problem is that this dataset is quite big and Flink duplicates
> it before
> > sending the data to these 2 operators (I've guessed this from the
> doubling
> > amount of sent bytes) .
> > Is there a way to avoid this behaviour?
> >
> > ---
> > Here's the pseudo code of my job:
> >
> > DataSet X = readParquetDir();
> > X1 = X.flatMap(...);
> > X2 = X.flatMap(...);
> >
> > Best,
> > Flavio
>


>>>
>>


Wrong joda lib

2018-05-04 Thread Flavio Pompermaier
Hi to all,
I'm trying to run a job on a test cluster with Flink 1.3.1 but my job fails
with the following error:

java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V


The job works when I run it from the IDE and in our production
environment...I've looked into all jars within libs and that class is not
present. The class within my shaded jar is indeed correct (I've checked the
decompiled version of the class file)...how can I discover which jar on the
classpath is "obscuring" the class contained in my shaded jar??

Best,
Flavio


Re: Question about datasource replication

2018-05-04 Thread Fabian Hueske
That will happen if you join (or coGroup) the branched DataSets, i.e., you
have a branching and merging pattern in your job.

The problem in that case is that one of the inputs is pipelined (e.g., the
probe side of a hash join) and the other one is blocking.
In order to execute such a plan, we must spill the pipelined data set to
disk to ensure that the other input can be fully consumed (to build the
hash table).

There's not really a solution to this.
You could change the join strategy to sort-merge-join but this will sort
both inputs and also result in spilling both to disk.
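As a concrete illustration of that last point, the join strategy can be hinted on the DataSet API; X1 and X2 are the branched data sets from the pseudocode in this thread, and the key field name is a made-up assumption:

```
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

// Forces a sort-merge join instead of a hybrid hash join. Both inputs get sorted
// (and possibly spilled), but neither side has to be fully materialized while the
// other one builds a hash table.
X1.join(X2, JoinHint.REPARTITION_SORT_MERGE)
  .where("id")     // hypothetical key field
  .equalTo("id");
```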

2018-05-04 17:25 GMT+02:00 Flavio Pompermaier :

> Hi Fabian,
> thanks for the detailed reply.
> The problem I see is that the source dataset is huge and, since it doesn't
> fit in memory, it's spilled twice to disk (I checked the increasing disk
> usage during the job and it was corresponding exactly to the size estimated
> by the Flink UI, that is twice it's initial size).
> Probably there are no problem until you keep data in memory but in my case
> it's very problematic this memory explosion :(
>
> On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> No, there's no way around it.
>> DataSets that are processed by more than one operator cannot be processed
>> by chained operators.
>> The records need to be copied to avoid concurrent modifications. However,
>> the data should not be shipped over the network if all operators have the
>> same parallelism.
>> Instead records are serialized and handed over via local byte[] in-memory
>> channels.
>>
>> Best, Fabian
>>
>>
>> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier :
>>
>>> Flink 1.3.1 (I'm waiting 1.5 before upgrading..)
>>>
>>> On Fri, May 4, 2018 at 2:50 PM, Amit Jain  wrote:
>>>
 Hi Flavio,

 Which version of Flink are you using?

 --
 Thanks,
 Amit

 On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:
 > Hi all,
 > I've a Flink batch job that reads a parquet dataset and then applies 2
 > flatMap to it (see pseudocode below).
 > The problem is that this dataset is quite big and Flink duplicates it
 before
 > sending the data to these 2 operators (I've guessed this from the
 doubling
 > amount of sent bytes) .
 > Is there a way to avoid this behaviour?
 >
 > ---
 > Here's the pseudo code of my job:
 >
 > DataSet X = readParquetDir();
 > X1 = X.flatMap(...);
 > X2 = X.flatMap(...);
 >
 > Best,
 > Flavio

>>>
>>>
>>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


Re: This server is not the leader for that topic-partition

2018-05-04 Thread Alexander Smirnov
Thanks for the quick turnaround, Stefan, Piotr.

This issue is rare and hard to reproduce, and I will keep an eye on it.

Searching on Stack Overflow, I found
https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash

They say that the problem is fixed in version 0.10.2.1 of the Kafka producer, so I
wonder which version the FlinkKafkaProducer integration uses. For earlier
versions it is proposed to use this configuration:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2);
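Note that the MAX_POLL_INTERVAL / SESSION_TIMEOUT entries above are consumer settings. On the Flink side, a hedged sketch of passing the producer retry setting through to FlinkKafkaProducer011 (topic name and broker address are placeholders, and this is only an illustration of where such properties would go):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker1:9092");  // placeholder
producerProps.setProperty("retries", "10");  // allow retries on transient leader changes

FlinkKafkaProducer011<String> producer =
        new FlinkKafkaProducer011<>("output-topic", new SimpleStringSchema(), producerProps);
```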




On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski 
wrote:

> Hi,
>
> I think Stefan is right. Quick google search points to this:
> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
>
> Please let us know if changing your configuration will solve the problem!
>
> Piotrek
>
> On 4 May 2018, at 15:53, Stefan Richter 
> wrote:
>
> Hi,
>
> I think in general this means that your producer client does not connect
> to the correct Broker (the leader) but to a broker that is just a follower
> and the follower can not execute that request. However, I am not sure what
> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC)
> has an idea?
>
> Best,
> Stefan
>
> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov <
> alexander.smirn...@gmail.com>:
>
> Hi,
>
> what could cause the following exception?
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed
> to send data to Kafka: This server is not the leader for that
> topic-partition.
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>
>
> Thank you,
> Alex
>
>
>
>


Re: Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Hi Fabian,
thanks for the detailed reply.
The problem I see is that the source dataset is huge and, since it doesn't
fit in memory, it's spilled twice to disk (I checked the increasing disk
usage during the job and it corresponded exactly to the size estimated
by the Flink UI, that is, twice its initial size).
There is probably no problem as long as you keep the data in memory, but in my case
this memory explosion is very problematic :(

On Fri, May 4, 2018 at 5:14 PM, Fabian Hueske  wrote:

> Hi Flavio,
>
> No, there's no way around it.
> DataSets that are processed by more than one operator cannot be processed
> by chained operators.
> The records need to be copied to avoid concurrent modifications. However,
> the data should not be shipped over the network if all operators have the
> same parallelism.
> Instead records are serialized and handed over via local byte[] in-memory
> channels.
>
> Best, Fabian
>
>
> 2018-05-04 14:55 GMT+02:00 Flavio Pompermaier :
>
>> Flink 1.3.1 (I'm waiting 1.5 before upgrading..)
>>
>> On Fri, May 4, 2018 at 2:50 PM, Amit Jain  wrote:
>>
>>> Hi Flavio,
>>>
>>> Which version of Flink are you using?
>>>
>>> --
>>> Thanks,
>>> Amit
>>>
>>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier 
>>> wrote:
>>> > Hi all,
>>> > I've a Flink batch job that reads a parquet dataset and then applies 2
>>> > flatMap to it (see pseudocode below).
>>> > The problem is that this dataset is quite big and Flink duplicates it
>>> before
>>> > sending the data to these 2 operators (I've guessed this from the
>>> doubling
>>> > amount of sent bytes) .
>>> > Is there a way to avoid this behaviour?
>>> >
>>> > ---
>>> > Here's the pseudo code of my job:
>>> >
>>> > DataSet X = readParquetDir();
>>> > X1 = X.flatMap(...);
>>> > X2 = X.flatMap(...);
>>> >
>>> > Best,
>>> > Flavio
>>>
>>
>>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Question about datasource replication

2018-05-04 Thread Fabian Hueske
Hi Flavio,

No, there's no way around it.
DataSets that are processed by more than one operator cannot be processed
by chained operators.
The records need to be copied to avoid concurrent modifications. However,
the data should not be shipped over the network if all operators have the
same parallelism.
Instead records are serialized and handed over via local byte[] in-memory
channels.

Best, Fabian


2018-05-04 14:55 GMT+02:00 Flavio Pompermaier :

> Flink 1.3.1 (I'm waiting 1.5 before upgrading..)
>
> On Fri, May 4, 2018 at 2:50 PM, Amit Jain  wrote:
>
>> Hi Flavio,
>>
>> Which version of Flink are you using?
>>
>> --
>> Thanks,
>> Amit
>>
>> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier 
>> wrote:
>> > Hi all,
>> > I've a Flink batch job that reads a parquet dataset and then applies 2
>> > flatMap to it (see pseudocode below).
>> > The problem is that this dataset is quite big and Flink duplicates it
>> before
>> > sending the data to these 2 operators (I've guessed this from the
>> doubling
>> > amount of sent bytes) .
>> > Is there a way to avoid this behaviour?
>> >
>> > ---
>> > Here's the pseudo code of my job:
>> >
>> > DataSet X = readParquetDir();
>> > X1 = X.flatMap(...);
>> > X2 = X.flatMap(...);
>> >
>> > Best,
>> > Flavio
>>
>
>


Re: Stashing key with AggregateFunction

2018-05-04 Thread Fabian Hueske
Hi Ken,

You can also use an additional ProcessWindowFunction [1] that is applied on
the result of the AggregateFunction to set the key.
Since the PWF is only applied on the final result, there is no overhead
(actually, it might even be slightly cheaper because the AggregateFunction
can be simpler).

If you don't want to use a PWF, your approach is the right one.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
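A minimal sketch of that combination, reusing the names from Ken's snippet (stream, MyKeySelector, MovingAverageAggregator) and assuming a String key and a Double average purely for illustration; MovingAverageAggregator would then only emit the Double, and the ProcessWindowFunction attaches the key:

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

stream
    .keyBy(new MyKeySelector())
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(1))
    .aggregate(
        new MovingAverageAggregator(10),  // now only produces the Double average
        new ProcessWindowFunction<Double, Tuple2<String, Double>, String, GlobalWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<Double> averages,
                                Collector<Tuple2<String, Double>> out) {
                // "averages" contains exactly one element: the pre-aggregated result.
                out.collect(Tuple2.of(key, averages.iterator().next()));
            }
        });
```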

2018-05-03 19:53 GMT+02:00 Ken Krugler :

> Hi list,
>
> I was trying different ways to implement a moving average (count based,
> not time based).
>
> The blunt instrument approach is to create a custom FlatMapFunction that
> keeps track of the last N values.
>
> It seemed like using an AggregateFunction would be most consistent with
> the Flink API, along the lines of...
>
> .keyBy(new MyKeySelector())
> .window(GlobalWindows.create())
> .trigger(CountTrigger.of(1))
> .aggregate(new MovingAverageAggregator(10))
>
> This works, but the API for the AggregateFunction
> (MovingAverageAggregator) feels a bit odd.
>
> Specifically, I want to emit a  result from
> getResult(), but the key isn’t passed to the createAccumulator() method,
> nor is it passed to the getResult() method. So in the add() method I check
> if the accumulator I’ve created has a key set, and if not then I extract
> the key from the record and set it on the accumulator, so I can use it in
> the getResult() call.
>
> Is this expected, or am I miss-using the functionality?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: Stashing key with AggregateFunction

2018-05-04 Thread Stefan Richter
Hi,

I have two possible options to achieve this. The first option is that you could 
obviously always derive the key again from the input of the aggregate function. 
The second option is combining an AggregateFunction with a 
ProcessWindowFunction. With the second solution you get incremental aggregation 
and the ProcessWindowFunction is only called once in the end with the result.

Best,
Stefan

> Am 03.05.2018 um 19:53 schrieb Ken Krugler :
> 
> Hi list,
> 
> I was trying different ways to implement a moving average (count based, not 
> time based).
> 
> The blunt instrument approach is to create a custom FlatMapFunction that 
> keeps track of the last N values.
> 
> It seemed like using an AggregateFunction would be most consistent with the 
> Flink API, along the lines of...
> 
> .keyBy(new MyKeySelector())
> .window(GlobalWindows.create())
> .trigger(CountTrigger.of(1))
> .aggregate(new MovingAverageAggregator(10))
> 
> This works, but the API for the AggregateFunction (MovingAverageAggregator) 
> feels a bit odd.
> 
> Specifically, I want to emit a  result from getResult(), 
> but the key isn’t passed to the createAccumulator() method, nor is it passed 
> to the getResult() method. So in the add() method I check if the accumulator 
> I’ve created has a key set, and if not then I extract the key from the record 
> and set it on the accumulator, so I can use it in the getResult() call.
> 
> Is this expected, or am I miss-using the functionality?
> 
> Thanks,
> 
> — Ken
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 



Re: This server is not the leader for that topic-partition

2018-05-04 Thread Stefan Richter
Hi,

I think in general this means that your producer client does not connect to the
correct broker (the leader) but to a broker that is just a follower, and the
follower cannot execute that request. However, I am not sure what causes this
in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) has an idea?

Best,
Stefan

> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov 
> :
> 
> Hi,
> 
> what could cause the following exception?
> 
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to 
> send data to Kafka: This server is not the leader for that topic-partition.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
> 
> 
> Thank you,
> Alex



This server is not the leader for that topic-partition

2018-05-04 Thread Alexander Smirnov
Hi,

what could cause the following exception?

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed
to send data to Kafka: This server is not the leader for that
topic-partition.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)


Thank you,
Alex


Re: PartitionNotFoundException after deployment

2018-05-04 Thread Gyula Fóra
It looks pretty clear that one operator takes too long to start (even the
UI shows it in the CREATED state for far too long). Any idea what might
cause this delay? It actually often crashes with an Akka ask timeout while
scheduling the node.

Gyula

Piotr Nowojski  ezt írta (időpont: 2018. máj. 4.,
P, 15:33):

> Ufuk: I don’t know why.
>
> +1 for your other suggestions.
>
> Piotrek
>
> > On 4 May 2018, at 14:52, Ufuk Celebi  wrote:
> >
> > Hey Gyula!
> >
> > I'm including Piotr and Nico (cc'd) who have worked on the network
> > stack in the last releases.
> >
> > Registering the network structures including the intermediate results
> > actually happens **before** any state is restored. I'm not sure why
> > this reproducibly happens when you restore state. @Nico, Piotr: any
> > ideas here?
> >
> > In general I think what happens here is the following:
> > - a task requests the result of a local upstream producer, but that
> > one has not registered its intermediate result yet
> > - this should result in a retry of the request with some backoff
> > (controlled via the config params you mention
> > taskmanager.network.request-backoff.max,
> > taskmanager.network.request-backoff.initial)
> >
> > As a first step I would set logging to DEBUG and check the TM logs for
> > messages like "Retriggering partition request {}:{}."
> >
> > You can also check the SingleInputGate code which has the logic for
> > retriggering requests.
> >
> > – Ufuk
> >
> >
> > On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra 
> wrote:
> >> Hi Ufuk,
> >>
> >> Do you have any quick idea what could cause this problems in flink
> 1.4.2?
> >> Seems like one operator takes too long to deploy and downstream tasks
> error
> >> out on partition not found. This only seems to happen when the job is
> >> restored from state and in fact that operator has some keyed and
> operator
> >> state as well.
> >>
> >> Deploying the same job from empty state works well. We tried increasing
> the
> >> taskmanager.network.request-backoff.max that didnt help.
> >>
> >> It would be great if you have some pointers where to look further, I
> havent
> >> seen this happening before.
> >>
> >> Thank you!
> >> Gyula
> >>
> >> The errror:
> >> org.apache.flink.runtime.io.network.partition.: Partition
> >> 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not
> found.
> >>at
> >> org.apache.flink.runtime.io
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
> >>at
> >> org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
> >>at
> >> org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
> >>at java.util.TimerThread.mainLoop(Timer.java:555)
> >>at java.util.TimerThread.run(Timer.java:505)
> >
> >
> >
> > --
> > Data Artisans GmbH | Stresemannstr. 121a | 10963 Berlin
> >
> > i...@data-artisans.com
> > +49-30-43208879 <+49%2030%2043208879>
> >
> > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>


Re: PartitionNotFoundException after deployment

2018-05-04 Thread Piotr Nowojski
Ufuk: I don’t know why.

+1 for your other suggestions.

Piotrek

> On 4 May 2018, at 14:52, Ufuk Celebi  wrote:
> 
> Hey Gyula!
> 
> I'm including Piotr and Nico (cc'd) who have worked on the network
> stack in the last releases.
> 
> Registering the network structures including the intermediate results
> actually happens **before** any state is restored. I'm not sure why
> this reproducibly happens when you restore state. @Nico, Piotr: any
> ideas here?
> 
> In general I think what happens here is the following:
> - a task requests the result of a local upstream producer, but that
> one has not registered its intermediate result yet
> - this should result in a retry of the request with some backoff
> (controlled via the config params you mention
> taskmanager.network.request-backoff.max,
> taskmanager.network.request-backoff.initial)
> 
> As a first step I would set logging to DEBUG and check the TM logs for
> messages like "Retriggering partition request {}:{}."
> 
> You can also check the SingleInputGate code which has the logic for
> retriggering requests.
> 
> – Ufuk
> 
> 
> On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra  wrote:
>> Hi Ufuk,
>> 
>> Do you have any quick idea what could cause this problems in flink 1.4.2?
>> Seems like one operator takes too long to deploy and downstream tasks error
>> out on partition not found. This only seems to happen when the job is
>> restored from state and in fact that operator has some keyed and operator
>> state as well.
>> 
>> Deploying the same job from empty state works well. We tried increasing the
>> taskmanager.network.request-backoff.max that didnt help.
>> 
>> It would be great if you have some pointers where to look further, I havent
>> seen this happening before.
>> 
>> Thank you!
>> Gyula
>> 
>> The errror:
>> org.apache.flink.runtime.io.network.partition.: Partition
>> 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not found.
>>at
>> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
>>at
>> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
>>at
>> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
>>at java.util.TimerThread.mainLoop(Timer.java:555)
>>at java.util.TimerThread.run(Timer.java:505)
> 
> 
> 
> -- 
> Data Artisans GmbH | Stresemannstr. 121a | 10963 Berlin
> 
> i...@data-artisans.com
> +49-30-43208879
> 
> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



Re: Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Flink 1.3.1 (I'm waiting 1.5 before upgrading..)

On Fri, May 4, 2018 at 2:50 PM, Amit Jain  wrote:

> Hi Flavio,
>
> Which version of Flink are you using?
>
> --
> Thanks,
> Amit
>
> On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier 
> wrote:
> > Hi all,
> > I've a Flink batch job that reads a parquet dataset and then applies 2
> > flatMap to it (see pseudocode below).
> > The problem is that this dataset is quite big and Flink duplicates it
> before
> > sending the data to these 2 operators (I've guessed this from the
> doubling
> > amount of sent bytes) .
> > Is there a way to avoid this behaviour?
> >
> > ---
> > Here's the pseudo code of my job:
> >
> > DataSet X = readParquetDir();
> > X1 = X.flatMap(...);
> > X2 = X.flatMap(...);
> >
> > Best,
> > Flavio
>


Re: PartitionNotFoundException after deployment

2018-05-04 Thread Ufuk Celebi
Hey Gyula!

I'm including Piotr and Nico (cc'd) who have worked on the network
stack in the last releases.

Registering the network structures including the intermediate results
actually happens **before** any state is restored. I'm not sure why
this reproducibly happens when you restore state. @Nico, Piotr: any
ideas here?

In general I think what happens here is the following:
- a task requests the result of a local upstream producer, but that
one has not registered its intermediate result yet
- this should result in a retry of the request with some backoff
(controlled via the config params you mention
taskmanager.network.request-backoff.max,
taskmanager.network.request-backoff.initial)

As a first step I would set logging to DEBUG and check the TM logs for
messages like "Retriggering partition request {}:{}."

You can also check the SingleInputGate code which has the logic for
retriggering requests.

– Ufuk


On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra  wrote:
> Hi Ufuk,
>
> Do you have any quick idea what could cause this problem in Flink 1.4.2?
> Seems like one operator takes too long to deploy and downstream tasks error
> out on partition not found. This only seems to happen when the job is
> restored from state and in fact that operator has some keyed and operator
> state as well.
>
> Deploying the same job from an empty state works well. We tried increasing
> taskmanager.network.request-backoff.max but that didn't help.
>
> It would be great if you have some pointers on where to look further; I haven't
> seen this happening before.
>
> Thank you!
> Gyula
>
> The error:
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition
> 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not found.
> at
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
> at java.util.TimerThread.mainLoop(Timer.java:555)
> at java.util.TimerThread.run(Timer.java:505)



-- 
Data Artisans GmbH | Stresemannstr. 121a | 10963 Berlin

i...@data-artisans.com
+49-30-43208879

Registered at Amtsgericht Charlottenburg - HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Question about datasource replication

2018-05-04 Thread Amit Jain
Hi Flavio,

Which version of Flink are you using?

--
Thanks,
Amit

On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier  wrote:
> Hi all,
> I've a Flink batch job that reads a parquet dataset and then applies 2
> flatMap to it (see pseudocode below).
> The problem is that this dataset is quite big and Flink duplicates it before
> sending the data to these 2 operators (I've guessed this from the doubling
> amount of sent bytes) .
> Is there a way to avoid this behaviour?
>
> ---
> Here's the pseudo code of my job:
>
> DataSet X = readParquetDir();
> X1 = X.flatMap(...);
> X2 = X.flatMap(...);
>
> Best,
> Flavio


Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Hi all,
I have a Flink batch job that reads a parquet dataset and then applies 2
flatMaps to it (see the pseudocode below).
The problem is that this dataset is quite big and Flink duplicates it before
sending the data to these 2 operators (I guessed this from the doubled
amount of sent bytes).
Is there a way to avoid this behaviour?

---
Here's the pseudo code of my job:

DataSet X = readParquetDir();
X1 = X.flatMap(...);
X2 = X.flatMap(...);

Best,
Flavio


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-05-04 Thread Tony Wei
Hi Stefan, Sihua,

We finally found out the root cause. Just as you said, the reason the performance
had degraded was EBS.
My team and I weren't familiar with EBS before. We didn't think its performance
was as weak as the monitor showed us.
But we visited this page [1] and found that we had a big misunderstanding about EBS.

All in all, our checkpoint procedure used burst IO performance for longer than the
maximum burst duration, which degraded the IO performance.
We have switched to local SSDs and enabled the incremental checkpoint mechanism
as well. Our job has run healthily for more than two weeks.
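For reference, a minimal sketch of the incremental-checkpoint setup mentioned above (the checkpoint URI and interval are placeholders):

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The second constructor argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
        env.enableCheckpointing(60_000);  // e.g. checkpoint every 60 seconds
        // ... build the actual pipeline and call env.execute(...)
    }
}
```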

Thank you all for helping me to investigate and solve this issue.

Best Regards,
Tony Wei

[1] EBS: I/O Credits and Burst Performance


2018-03-09 17:47 GMT+08:00 Tony Wei :

> Hi Stefan,
>
> We prepared to run it on local SSDs yesterday. However, as I said, the
> problem just disappeared. Of course we will switch to local SSDs, but
> I'm afraid that I might not be able to reproduce the same situation in both
> environments to compare the difference.
>
> Best Regards,
> Tony Wei.
>
> 2018-03-09 16:59 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> if processing and checkpointing are stuck in RocksDB, this could indeed
>> hint to a problem with your IO system. The comment about using EBS could be
>> important, as it might be a bad idea from a performance point of view to
>> run RocksDB on EBS; did you ever compare against running it on local SSDs?
>>
>> Best,
>> Stefan
>>
>> Am 09.03.2018 um 05:08 schrieb Tony Wei :
>>
>> Hi Stefan, Sihua,
>>
>> TL;DR: after the experiment, I found that this problem might not be about the s3
>> filesystem or network IO with s3. It might be caused by RocksDB and IO
>> performance, but I still can't tell what caused this problem.
>>
>> For more specific details, I have to introduce my flink application's
>> detail and what I found in the experiment. The disks I used for EC2 are
>> SSD, but they are EBS.
>>
>> For the application detail, there is only one keyed ProcessFunction with
>> ValueState with scala collection data type, which represents the counting
>> by event and date
>> This operator receives two types of messages: one is an event message and
>> the other is an overwrite-state message.
>> When the operator receives an event message, it updates the
>> corresponding value by event and client time and emits the event to the next
>> operator with the "whole" collection; that's why I used ValueState and not
>> MapState or ListState.
>> When the operator receives an overwrite-state message, it overwrites the
>> corresponding value in the state. This is the design we use to replay
>> the state constructed by new rules.
>> Moreover, my key is something like a user cookie, and I have a timer
>> callback to remove out-dated state; for example, I only care about
>> the state from the last 6 months.
>>
>> For the experiment, I tried to catch the first failure to find some
>> clues, so I extended the checkpoint interval to a long time and used
>> savepoints. I know a savepoint is not exactly the same as a checkpoint, but I guess
>> the parts that store state and upload it to the remote filesystem are similar.
>> After some savepoints were triggered, I found that the asynchronous part was stuck
>> in the full snapshot operation, and the time triggers on that machine were also
>> stuck and blocked the operator from processing elements.
>> I recalled that I had replayed lots of states for 60 days in 4 ~ 5 hours,
>> and the first problem happened during the replay procedure. It is just a
>> coincidence that the callback from those keys that I replayed happened when
>> I ran the experiment.
>> However, when I tried to disable all checkpoints and savepoints to
>> observe whether the massive key accesses to RocksDB got stuck, I found the
>> problem had disappeared. Moreover, I rolled back to the original setting where the
>> checkpoint got stuck. The problem hasn't happened again yet.
>>
>> In summary, I still can't tell where the problem happened, since the IO
>> performance didn't hit its limit and the problem couldn't be reproduced
>> from the same initial states.
>> I decided to try out incremental checkpoints to reduce this risk. I will
>> reopen this thread with more information when the problem
>> happens again. If you have any suggestions about my implementation that might
>> lead to problems, or about this issue, please advise me.
>> Thank you very much for taking the time to pay attention to this issue!!
>> = )
>>
>> P.S. The attachment is about the experiment I mentioned above. I didn't
>> record the stack trace because the only difference was that the Time Trigger thread's
>> state was runnable and the operator was blocked.
>>
>> Best Regards,
>> Tony Wei
>>
>>
>> 2018-03-06 17:00 

Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

2018-05-04 Thread Edward Rojas
Hello all,

We have a kafka consumer listening to a topic pattern "topic-*" with a
partition discovery interval.
We eventually add new topics and this works perfectly: the consumer
discovers the new topics (and partitions) and listens to them.
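For reference, a minimal sketch of that setup, using the 0.11 connector as an example (broker address, group id, discovery interval and the exact pattern are placeholders):

```
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder
props.setProperty("group.id", "my-group");               // placeholder
// Periodic topic/partition discovery; disabled unless this property is set.
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>(Pattern.compile("topic-.*"), new SimpleStringSchema(), props);
```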

But we also remove topics eventually, and in this case the consumer is not
updated. The consumer continues listening to the removed partitions *forever*
and we get logs like:


2018-05-04 11:32:11,462 WARN  org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1154 : {topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:11,965 WARN  org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1156 : {topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:12,468 WARN  org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1158 : {topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:12,970 WARN  org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1160 : {topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:13,473 WARN  org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1162 : {topic-123=UNKNOWN_TOPIC_OR_PARTITION}
...

These requests continue *forever* and the logs are shown several times per
second, hiding other possible problems and using resources that could be
freed for other processing.

I think the partition discovery mechanism should be modified to take into
account not only new partitions but also the removal of partitions that are no
longer available.

What do you think ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: intentional back-pressure (or a poor man's side-input)

2018-05-04 Thread Piotr Nowojski
> running a batch or "bounded stream" job first to generate a "cache state", 
> and then launching the main streaming job, which loads this initial state 
> load in open()... not sure how to work out the keying.
> 

This is the recommended workaround for this issue: first start a job to precompute
some values for an initial state and then pass those values to the main job as
(for example) a startup argument. I think for now it’s the cleanest and
easiest-to-maintain solution. If the initial state is too large, you could imagine
saving it on a DFS and loading it in the initialization phase of the main job.
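A minimal sketch of that workaround, under the assumption that the precomputed values are small enough to load into every subtask; the path, the tab-separated format and the enrichment logic are all made up for illustration:

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

/** Loads a "cache" written by a previous batch job before processing starts. */
public class EnrichWithPrecomputedState extends RichFlatMapFunction<String, String> {

    private final String cachePath;              // e.g. "hdfs:///jobs/initial-state/part-0" (hypothetical)
    private transient Map<String, String> cache;

    public EnrichWithPrecomputedState(String cachePath) {
        this.cachePath = cachePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        cache = new HashMap<>();
        Path path = new Path(cachePath);
        FileSystem fs = path.getFileSystem();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] kv = line.split("\t", 2);   // assumed key<TAB>value format
                if (kv.length == 2) {
                    cache.put(kv[0], kv[1]);
                }
            }
        }
    }

    @Override
    public void flatMap(String key, Collector<String> out) {
        String cached = cache.get(key);
        out.collect(cached != null ? cached : key);
    }
}
```

Note that this gives a read-only per-subtask cache rather than Flink keyed state; if the values must live in keyed state, replaying the precomputed records through a regular source into the job is the heavier alternative discussed in this thread.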

Piotrek

> On 3 May 2018, at 19:03, Derek VerLee  wrote:
> 
> Thanks for the thoughts Piotr.
> 
> Seems I have a talent for asking (nearly) the same question as someone else 
> at the same time, and the check-pointing was raised in that thread as well.
> 
> I guess one way to conceptualize it is that you have a stream job that has 
> "phases" and transitions between those phases.  Maybe there would be a new 
> type of barrier to indicate a change between phases?  But now I'm way outside 
> the bounds of hoping to have a "quick and dirty" version of a proper side 
> input implementation.
> 
> I'm chewing on two new ideas now:  using a "union" stream instead of two 
> streams, with a custom source backed by two different sources under the hood, so 
> the "state machine" logic transitioning from initialization to normal 
> operation all happens in the same operator.  Or, running a batch or "bounded 
> stream" job first to generate a "cache state", and then launching the main 
> streaming job, which loads this initial state in open()... not sure how 
> to work out the keying.
> 
> I'll post back if I get anywhere with these ideas.
> 
> On 5/3/18 10:49 AM, Piotr Nowojski wrote:
>> Maybe it could work with Flink 1.5's credit-based flow control. But you would 
>> need a way to express the state “block one input side of the CoProcessFunction”, 
>> pass this information up to the input gate and handle it, probably similarly to 
>> how `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks 
>> inputs in case of a checkpoint barrier. You cannot just block inside the 
>> `processElement1` method.
>> 
>> However, I haven’t thought it through and maybe there could be some issues 
>> regarding checkpointing (what should happen to checkpoint barriers if you 
>> are blocking one side of the input? Should this block the checkpoint barrier as 
>> well? Should you cancel the checkpoint?).
>> 
>> Piotrek
>> 
>>> On 2 May 2018, at 16:31, Derek VerLee wrote:
>>> 
>>> 
>>> I was just thinking about letting a CoProcessFunction "block" or 
>>> cause back pressure on one of its streams?
>>> Has this been discussed as an option?
>>> Does anyone know a way to effectively accomplish this?
>>> 
>>> I think I could get a lot of mileage out of something like that without 
>>> needing a full implementation of FLIP-17 (which I would eagerly await 
>>> still). 
>>> 
>>> As mentioned on another thread, one could use a ListState to buffer the 
>>> main input until the "side input" was sufficiently processed.  However, the 
>>> downside of this is that I have no way to control the size of those 
>>> buffers, whereas with backpressure, the system will naturally take care of 
>>> it.
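
For reference, the ListState-buffering approach mentioned in the quoted message
could look roughly like this (a sketch only; the event types are illustrative
and the function is assumed to run on a connected, keyed stream, e.g.
main.connect(side).keyBy(_.key, _.key).process(...)):

```scala
import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Illustrative types; not part of any Flink API.
case class MainEvent(key: String, payload: String)
case class SideEvent(key: String, config: String)
case class Output(main: MainEvent, config: String)

// Parks main-stream events in keyed ListState until a side event for the
// same key has been seen; the buffer is unbounded, which is exactly the
// drawback discussed above.
class BufferUntilSideInputSeen
    extends CoProcessFunction[MainEvent, SideEvent, Output] {

  private lazy val buffered = getRuntimeContext.getListState(
    new ListStateDescriptor[MainEvent]("buffered-main", classOf[MainEvent]))
  private lazy val side = getRuntimeContext.getState(
    new ValueStateDescriptor[SideEvent]("side", classOf[SideEvent]))

  override def processElement1(
      main: MainEvent,
      ctx: CoProcessFunction[MainEvent, SideEvent, Output]#Context,
      out: Collector[Output]): Unit = {
    val s = side.value()
    if (s == null) buffered.add(main)              // side input not ready yet
    else out.collect(Output(main, s.config))
  }

  override def processElement2(
      sideEvent: SideEvent,
      ctx: CoProcessFunction[MainEvent, SideEvent, Output]#Context,
      out: Collector[Output]): Unit = {
    side.update(sideEvent)
    // Flush everything buffered so far for this key.
    buffered.get().asScala.foreach(m => out.collect(Output(m, sideEvent.config)))
    buffered.clear()
  }
}
```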
> 



Re: Flink + Marathon (Mesos) Memory Issues

2018-05-04 Thread Stefan Richter
Hi,

besides your configured heap size, there is also some off-heap memory used in 
the JVM process, in particular by RocksDB. Each keyed operator instance on a TM 
has its own RocksDB instance, so the question is how many are running in one 
container and what their configuration is. In RocksDB, for example, 
write_buffer_size (32 MB default), write_buffer_count (3 by default) and 
block_cache_size (16 MB default) contribute per instance. For more details, 
please have a look here: 
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB 
You might need to adjust your RocksDB configuration and/or plan your container 
memory limits accordingly to be on the safe side.
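
To make that concrete, a minimal sketch of tightening those per-instance
settings through an OptionsFactory on the RocksDB backend (this assumes the
Flink 1.4/1.5 RocksDBStateBackend API; the checkpoint URI and sizes are
illustrative only):

```scala
import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

// In the job setup; checkpoint URI and all sizes below are illustrative only.
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
backend.setOptions(new OptionsFactory {
  override def createDBOptions(current: DBOptions): DBOptions = current

  override def createColumnOptions(current: ColumnFamilyOptions): ColumnFamilyOptions =
    current
      .setWriteBufferSize(16 * 1024 * 1024)  // 16 MB write buffer per column family
      .setMaxWriteBufferNumber(2)            // keep at most 2 write buffers in memory
      .setTableFormatConfig(
        new BlockBasedTableConfig().setBlockCacheSize(8 * 1024 * 1024)) // 8 MB block cache
})

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(backend)
```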

Best,
Stefan

> On 03.05.2018 at 21:59, ani.desh1512 wrote:
> 
> *Background*: We have a setup of Flink 1.4.0. We run this Flink 
> cluster via the /flink-jobmanager.sh foreground/ and /flink-taskmanager.sh 
> foreground/ commands via Marathon (which launches them as Mesos jobs). So,
> basically, the jobmanager and taskmanagers run as Mesos tasks.
> 
> 
> Now, say, we run the Flink taskmanagers with taskmanager.heap.mb set to 7G
> in flink-conf.yaml and the Marathon memory is set to 18G. Even after this, we
> frequently see the taskmanager containers getting killed because of OOM. The
> Flink streaming job that we run is a basic job without any windowing or
> other stateful operations. It's just a job that reads from a stream, applies
> a bunch of transformations and writes the data back via BucketingSink. It uses
> RocksDB as the state backend.
> 
> So what I am trying to understand is: how is Flink allocating taskmanager
> memory in containers? What would be a safe value for us to set as the Marathon
> memory so that our taskmanagers don't keep getting killed? Are we seeing this
> behaviour because we start the Flink taskmanagers in foreground mode as Mesos
> tasks?
> 
> Thanks
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



RE: use of values of previously accepted event

2018-05-04 Thread Esa Heikkinen
That would be enough, but I would appreciate full Scala source code examples 
of using IterativeConditions.

How do I find the correct imports for getEventsForPattern?

Best, Esa

From: Dawid Wysakowicz 
Sent: Thursday, May 3, 2018 2:53 PM
To: Esa Heikkinen 
Subject: Re: use of values of previously accepted event

Unfortunately you cannot set any parameters in a pattern that would be 
accessible in later patterns. With IterativeConditions you can only access 
the events accepted so far.
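
For illustration, a minimal Scala sketch of such an iterative condition (the
event type, field names and matching logic are hypothetical; this assumes the
context-aware where(...) overload of the Scala CEP API):

```scala
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type: `name` identifies the event, `nextName` carries
// the name of the event expected to follow it.
case class TestData(name: String, nextName: String)

val eventPattern = Pattern
  .begin[TestData]("start")
  .where(_.name == "A")
  .followedBy("end")
  .where { (event, ctx) =>
    // getEventsForPattern("start") returns the events accepted so far under
    // the pattern named "start"; for a non-looping pattern there is at most one.
    val it = ctx.getEventsForPattern("start").iterator
    it.hasNext && it.next().nextName == event.name
  }
```

The ctx parameter is supplied by that where(...) overload itself (flink-cep's
IterativeCondition.Context), so beyond the pattern classes above no extra
import should be needed for getEventsForPattern.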

On Thu, 3 May 2018 at 12:38, Esa Heikkinen wrote:
Hi

Thanks for the reply.

I have tried to understand IterativeCondition, but I do not yet fully 
understand it. How does it apply to my case?

If I have more than one variable to set (and read) in a pattern, is that 
possible with IterativeCondition?

Are there more examples of how to use it, or documentation?

Or should I use some method other than CEP, because my case is more batch 
processing than stream processing?

Best, Esa

From: Dawid Wysakowicz
Sent: Thursday, May 3, 2018 12:54 PM
To: Esa Heikkinen
Subject: Re: use of values of previously accepted event

Hi Esa,
You cannot simply assign a value to some variable and read it in another 
pattern.
It is possible, though, to access an event accepted in one pattern from later 
patterns in the sequence via IterativeCondition [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html#conditions-on-properties

On Mon, 30 Apr 2018 at 12:48, Esa Heikkinen wrote:
Hi

I am still interested in getting an answer, if anyone can help.

If I have a pattern sequence like this:

val eventPattern = Pattern
   .begin[TestData]("start").where( // Sets variable X here //)
   .followedBy("end").where( // Reads value of variable X here //)

How can I set variable X in "start" and how can I read the value of variable X 
in "end"?

Should (or can) I use global variables?

Should the variables be declared in TestData? And how?

Best, Esa

From: Esa Heikkinen
Sent: Thursday, April 26, 2018 3:18 PM
To: user@flink.apache.org
Subject: RE: use of values of previously accepted event

Hi

Or is it possible to use global or local variables inside in pattern sequence ?
And how (by Scala) ?

Best, Esa

From: Esa Heikkinen
Sent: Wednesday, April 25, 2018 4:16 PM
To: user@flink.apache.org
Subject: CEP: use of values of previously accepted event

Hi

I have tried to read [1] and understand how to get the values of a previously 
accepted event for use in the current event (or pattern).

Iterative conditions (with context.getEventsForPattern) do something like 
that, but they return all previously accepted events.
How do I get only the last one (in Scala)? Are there examples of that?

For example, if I have consecutive patterns:

1.   Search for event A and read its value: B (which can also be different, like 
C)

2.   Search for the next event based on the value of A, which is B, so event B is 
searched

This is a little bit like following the links of a list. It is also possible 
that an event includes many next events, which means the structure is like a 
tree- or DAG-shaped pattern.

The pattern structure (list, tree or DAG) is not known before processing, only 
during (and after) the processing. Is that a problem?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html

Best, Esa


PartitionNotFoundException after deployment

2018-05-04 Thread Gyula Fóra
Hi Ufuk,

Do you have any quick idea what could cause this problem in Flink 1.4.2?
It seems like one operator takes too long to deploy and downstream tasks error
out with "partition not found". This only seems to happen when the job is
restored from state, and in fact that operator has some keyed and operator
state as well.

Deploying the same job from an empty state works well. We tried increasing
taskmanager.network.request-backoff.max, but that didn't help.

It would be great if you have some pointers on where to look further; I haven't
seen this happening before.

Thank you!
Gyula

The error:
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition
4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not found.
at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
at
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
at
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)