Re: Flink/Kafka POC performance issue

2018-04-16 Thread Niclas Hedhman
Have you checked memory usage? It could be as simple as either having
memory leaks, or aggregating more than you think (sometimes not obvious how
much is kept around in memory for longer than one first thinks). If
possible, connect FlightRecorder or similar tool and keep an eye on memory.
Additionally, I don't have AWS experience to speak of, but if AWS swaps RAM
to disk like regular Linux, then swapping might be triggered if your JVM heap
is bigger than what fits in the available RAM.
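
As a lightweight complement to a profiler, heap usage can also be sampled from inside the JVM. The following is a minimal sketch using only the standard java.lang.management API; the class name HeapWatch and its methods are hypothetical helpers, not part of Flink, and it only reports on the JVM it runs in (so it would need to be called from within the job to say anything about a task manager).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

/** Hypothetical helper (not part of Flink): reports heap usage of the current JVM. */
final class HeapWatch {

    /** Returns used heap as a fraction of the maximum heap, or -1 if the maximum is undefined. */
    static double usedHeapFraction() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        long max = heap.getMax(); // -1 when the JVM reports no defined maximum
        return max <= 0 ? -1.0 : (double) heap.getUsed() / max;
    }

    /** Formats a one-line summary suitable for periodic logging. */
    static String summary() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("heap used=%d MiB committed=%d MiB max=%d MiB",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }

    public static void main(String[] args) {
        // Prints a single sample; in practice this would be logged at a fixed interval.
        System.out.println(summary());
    }
}
```

If the "used" figure keeps climbing across samples even after garbage collection, that points towards a leak or towards state that grows with the input, which is the situation described above.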

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage  wrote:

> I am doing a short Proof of Concept for using Flink and Kafka in our
> product.  On my laptop I can process 10M inputs in about 90 min.  On 2
> different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and
> ssd storage) I see the process hit a wall around 50min into the test and
> short of 7M events processed.  This is running zookeeper, kafka broker,
> flink all on the same server in all cases.  My goal is to measure single
> node vs. multi-node and test horizontal scalability, but I would like to
> figure out why it hits a wall first.  I have the task manager configured
> with 6 slots and the job has a parallelism of 5.  The laptop has 8 threads, and
> the EC2 instances have 4 threads. On smaller data sets and in the beginning
> of each test the EC2 instances outpace the laptop.  I will try again with
> an m5.2xlarge which has 8 threads and 32GB ram to see if that works better
> for this workload.  Any pointers or ways to get metrics that would help
> diagnose this would be appreciated.
>
> Michael
>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Flink & Kafka multi-node config

2018-04-16 Thread TechnoMage
If I use defaults for the most part but configure flink to have parallelism 5 
and kafka to have 5 brokers (one of each on 5 nodes) will the connector and 
kafka be smart enough to use the kafka partition on the same node as the flink 
task manager for the 5 partitions?  Do I need to explicitly assign partitions 
to get them local to each other?

Michael



Flink/Kafka POC performance issue

2018-04-16 Thread TechnoMage
I am doing a short Proof of Concept for using Flink and Kafka in our product.  
On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 
instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see 
the process hit a wall around 50min into the test and short of 7M events 
processed.  This is running zookeeper, kafka broker, flink all on the same 
server in all cases.  My goal is to measure single node vs. multi-node and test 
horizontal scalability, but I would like to figure out why it hits a wall 
first.  I have the task manager configured with 6 slots and the job has a 
parallelism of 5.  The laptop has 8 threads, and the EC2 instances have 4 threads. 
On smaller data sets and in the beginning of each test the EC2 instances outpace 
the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB 
ram to see if that works better for this workload.  Any pointers or ways to get 
metrics that would help diagnose this would be appreciated.

Michael



Re: Scaling down Graphite metrics

2018-04-16 Thread ashish pok
Thanks for that tip about override, will give that a shot at some point. We are 
already using interval.

-- Ashish 
 
  On Sun, Apr 15, 2018 at 6:18 PM, Chesnay Schepler wrote:  
  Hello,
 
 you can configure the rate at which metrics are reported by setting 
"metrics.reporter.<name>.interval" as described in the reporter documentation.
 
 At this time there is no way to disable specific metrics.
 You can however extend the reporter that you are using and override 
"MetricReporter#notifyOfAddedMetric(Metric, String, MetricGroup)" to ignore 
metrics that you aren't interested in.
 
 On 13.04.2018 18:52, ashish pok wrote:
  
All, 
  We love Flink's OOTB metrics, but boy, there are a ton of them :) Is there any way to scale them 
down (both the reporting frequency and the set of metrics)? 
  Flink apps are becoming a huge source of data right now. 
  Thanks,
 
 -- Ashish  
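
The filtering suggested in the quoted reply comes down to a yes/no decision per metric name. Below is a minimal sketch of that decision in plain Java. MetricNameFilter is a hypothetical helper, not a Flink class; the assumption is that an extended reporter would consult something like it inside its overridden notifyOfAddedMetric and simply skip registration when it returns false. The metric names in main are only examples.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Flink class): decides which metric names a reporter should keep. */
final class MetricNameFilter {

    private final List<String> keptFragments;

    MetricNameFilter(String... keptFragments) {
        this.keptFragments = Arrays.asList(keptFragments);
    }

    /** Returns true when the metric name contains at least one of the configured fragments. */
    boolean keep(String metricName) {
        for (String fragment : keptFragments) {
            if (metricName.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Example names only; adjust the fragments to the metrics you actually care about.
        MetricNameFilter filter = new MetricNameFilter("numRecordsIn", "numRecordsOut");
        System.out.println(filter.keep("numRecordsIn"));
        System.out.println(filter.keep("currentLowWatermark"));
    }
}
```

An allow-list is used here rather than a deny-list so that metrics added in later versions stay off by default; either choice would work.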
 

 
   


Re: Question about parallelism

2018-04-16 Thread TechnoMage
The client was not using a config file, it is a stand-alone java app using the 
flink-client jar file.  Thanks for the clarification.

Michael

> On Apr 16, 2018, at 2:11 PM, Fabian Hueske  wrote:
> 
> The parallelism.default property that is configured in the flink-conf.yaml 
> file is only considered if the config file belongs to the submitting client.
> If you configured the property in the config file of your cluster setup and 
> submitted from a client that used a different configuration file, the 
> property from the client's configuration file is used.
> 
> I tested the behavior on Flink 1.4.2 and setting the parallelism in the 
> flink-conf.yaml of the client was working correctly in a simple local setup.
> 
> If this doesn't solve your problem, we'd need a bit more information about 
> the job submission and setup.
> 
> Best, Fabian
> 
> 
> 2018-04-16 18:37 GMT+02:00 TechnoMage:
> 1.4.2.  I have since set the parallelism explicitly after creating the env 
> and that is working.  I also made the stream object serializable which may 
> also be involved in this.  I will retest without the explicit parallelism 
> when I get a chance.
> 
> Michael
> 
> 
>> On Apr 16, 2018, at 2:05 AM, Fabian Hueske wrote:
>> 
>> (re-adding user mailing list)
>> 
>> A non-serializable function object should cause the job to fail, but not to 
>> ignore a parallelism setting.
>> 
>> This might be a bug. Most users specify the parallelism directly in the 
>> application code (via StreamExecutionEnvironment) or when submitting the 
>> application.
>> Which version are you using?
>> 
>> Best, Fabian
>> 
>> 2018-04-14 15:07 GMT+02:00 Michael Latta:
>> Parallelism in config. I think the issue is that some objects used in the 
>> stream are not serializable (which I just discovered). I am surprised it 
>> supports that. 
>> 
>> 
>> Michael
>> 
>> On Apr 14, 2018, at 6:12 AM, Fabian Hueske wrote:
>> 
>>> Hi,
>>> 
>>> The number of TaskManagers is irrelevant for the parallelism of a job or 
>>> operator. The scheduler only cares about the number of slots. 
>>> 
>>> How did you set the default parallelism? In the config or in the program / 
>>> StreamExecutionEnvironment? 
>>> 
>>> Best, Fabian
>>> 
>>> 
>>> TechnoMage wrote on Fri, 13 Apr 2018, 04:30:
>>> I am pretty new to Flink.  I have a Flink job that has 10 transforms 
>>> (mostly CoFlatMap, with some simple filters and key extractors as well).  I 
>>> have the config set for 6 slots and default parallelism of 6, but all my 
>>> stages show parallelism of 1.  Is that because there is only one task 
>>> manager?  Some of what I have read suggested separate slots were needed to 
>>> use multiple threads on a single box?  I have read the section on the docs 
>>> several times and still not totally sure about the execution model.
>>> 
>>> Michael
>> 
> 
> 



FlinkML

2018-04-16 Thread Szymon Szczypiński

Hi,

I wonder whether it is possible to build a FlinkML streaming job rather than a 
batch job. The examples at 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/ml/ 
are all batch examples.


Is there any possibility?


Best regards.



Re: Question about parallelism

2018-04-16 Thread Fabian Hueske
The parallelism.default property that is configured in the flink-conf.yaml
file is only considered if the config file belongs to the submitting client.
If you configured the property in the config file of your cluster setup and
submitted from a client that used a different configuration file, the
property from the client's configuration file is used.

I tested the behavior on Flink 1.4.2 and setting the parallelism in the
flink-conf.yaml of the client was working correctly in a simple local setup.

If this doesn't solve your problem, we'd need a bit more information about
the job submission and setup.

Best, Fabian
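
The behaviour discussed in this thread can be read as a precedence chain. The sketch below models it in plain Java under two assumptions taken from the Flink documentation rather than from this thread: a more specific setting (operator, then execution environment, then client) wins over parallelism.default, and the built-in default is 1 when the submitting client sees no config file. The class and method names are hypothetical and this is not Flink code.

```java
/** Illustrative only: models the assumed precedence of parallelism settings; not Flink code. */
final class ParallelismPrecedence {

    /** Assumed fallback when the submitting client has no flink-conf.yaml at all. */
    static final int BUILT_IN_DEFAULT = 1;

    /**
     * Picks the first setting that is present, from most to least specific.
     * A null argument means "not set at that level".
     */
    static int effective(Integer operatorLevel, Integer environmentLevel,
                         Integer clientLevel, Integer clientConfigDefault) {
        if (operatorLevel != null) return operatorLevel;
        if (environmentLevel != null) return environmentLevel;
        if (clientLevel != null) return clientLevel;
        if (clientConfigDefault != null) return clientConfigDefault;
        return BUILT_IN_DEFAULT;
    }

    public static void main(String[] args) {
        // Stand-alone client without a config file and nothing set in code.
        System.out.println(effective(null, null, null, null));
        // Same client after setting the parallelism on the execution environment.
        System.out.println(effective(null, 6, null, null));
    }
}
```

Note that the cluster's own flink-conf.yaml does not appear as an input at all, which is the point made above: only the submitting client's configuration is considered.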


2018-04-16 18:37 GMT+02:00 TechnoMage :

> 1.4.2.  I have since set the parallelism explicitly after creating the env
> and that is working.  I also made the stream object serializable which may
> also be involved in this.  I will retest without the explicit parallelism
> when I get a chance.
>
> Michael
>
>
> On Apr 16, 2018, at 2:05 AM, Fabian Hueske  wrote:
>
> (re-adding user mailing list)
>
> A non-serializable function object should cause the job to fail, but not
> to ignore a parallelism setting.
>
> This might be a bug. Most users specify the parallelism directly in the
> application code (via StreamExecutionEnvironment) or when submitting the
> application.
> Which version are you using?
>
> Best, Fabian
>
> 2018-04-14 15:07 GMT+02:00 Michael Latta :
>
>> Parallelism in config. I think the issue is that some objects used in
>> the stream are not serializable (which I just discovered). I am surprised
>> it supports that.
>>
>>
>> Michael
>>
>> On Apr 14, 2018, at 6:12 AM, Fabian Hueske  wrote:
>>
>> Hi,
>>
>> The number of TaskManagers is irrelevant for the parallelism of a job or
>> operator. The scheduler only cares about the number of slots.
>>
>> How did you set the default parallelism? In the config or in the program
>> / StreamExecutionEnvironment?
>>
>> Best, Fabian
>>
>>
>> TechnoMage wrote on Fri, 13 Apr 2018, 04:30:
>>
>>> I am pretty new to Flink.  I have a Flink job that has 10 transforms
>>> (mostly CoFlatMap, with some simple filters and key extractors as well).  I
>>> have the config set for 6 slots and default parallelism of 6, but all my
>>> stages show parallelism of 1.  Is that because there is only one task
>>> manager?  Some of what I have read suggested separate slots were needed to
>>> use multiple threads on a single box?  I have read the section on the docs
>>> several times and still not totally sure about the execution model.
>>>
>>> Michael
>>
>>
>
>


Re: data enrichment with SQL use case

2018-04-16 Thread Ken Krugler
Hi Miki,

I haven’t tried mixing AsyncFunctions with SQL queries.

Normally I’d create a regular DataStream workflow that first reads from Kafka, 
then has an AsyncFunction to read from the SQL database.

If there are often duplicate keys in the Kafka-based stream, you could 
keyBy(key) before the AsyncFunction, and then cache the result of the SQL query.

— Ken
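
The caching idea above can be sketched without any Flink types. In the sketch below, CachingLookup is a hypothetical class and the query function is a stand-in for the asynchronous SQL call; the assumption is that one such cache lives in each parallel instance of the async operator, which is why keying the stream first helps (all records for a key reach the same instance).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch (plain Java, no Flink types): caches asynchronous lookups per key. */
final class CachingLookup<K, V> {

    private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> query; // stands in for the SQL query

    CachingLookup(Function<K, CompletableFuture<V>> query) {
        this.query = query;
    }

    /** Returns the cached future for the key, issuing the query only on the first request. */
    CompletableFuture<V> lookup(K key) {
        return cache.computeIfAbsent(key, query);
    }

    /** Drops a cached entry so that the next lookup re-reads it, e.g. when a row may be stale. */
    void invalidate(K key) {
        cache.remove(key);
    }

    public static void main(String[] args) {
        AtomicInteger queries = new AtomicInteger();
        CachingLookup<String, String> lookups = new CachingLookup<>(key ->
                CompletableFuture.supplyAsync(() -> {
                    queries.incrementAndGet(); // counts simulated database round trips
                    return "metadata-for-" + key;
                }));

        System.out.println(lookups.lookup("device-1").join());
        System.out.println(lookups.lookup("device-1").join()); // served from the cache
        System.out.println("queries issued: " + queries.get());
    }
}
```

This sketch never expires entries and also caches failed futures; a real job would likely want a size or time bound and would call invalidate (or skip caching) on errors.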

> On Apr 16, 2018, at 11:19 AM, miki haiat  wrote:
> 
> Hi, thanks for the reply. I will try to break your reply down into the flow 
> execution order.
> 
> The first data stream will use AsyncIO and select the table.
> The second stream will be Kafka, and then I can join the streams and map the result?
> 
> If that is the case, then I will select the table only once, on load?
> How can I make sure that my stream table is "fresh"?
> 
> I'm wondering: is there a way to use the Flink state backend (RocksDB) and 
> create a read/write-through mechanism?
> 
> Thanks 
> 
> miki
> 
> 
> 
> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler wrote:
> If the SQL data is all (or mostly all) needed to join against the data from 
> Kafka, then I might try a regular join.
> 
> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc 
> queries (in parallel) against your SQL DB.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>  
> 
> 
> — Ken
> 
> 
>> On Apr 15, 2018, at 12:15 PM, miki haiat wrote:
>> 
>> Hi,
>> 
>> I have a case of metadata enrichment and I'm wondering if my approach is the 
>> correct way.
>> 1. Input stream from Kafka.
>> 2. MD in MSSQL.
>> 3. Map to a new POJO.
>> I need to extract a key from the Kafka stream and use it to select some 
>> values from the SQL table.
>> 
>> So I thought to use the Table/SQL API to select the MD table, 
>> then convert the Kafka stream to a table and join the data by the stream key.
>> 
>> At the end I need to map the joined data to a new POJO and send it to 
>> Elasticsearch.
>> 
>> Any suggestions or different ways to solve this use case ?
>> 
>> thanks,
>> Miki  
>> 
>> 
>> 
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 
> 


http://about.me/kkrugler
+1 530-210-6378



Re: data enrichment with SQL use case

2018-04-16 Thread miki haiat
Hi, thanks for the reply. I will try to break your reply down into the flow
execution order.

The first data stream will use AsyncIO and select the table.
The second stream will be Kafka, and then I can join the streams and map the result?

If that is the case, then I will select the table only once, on load?
How can I make sure that my stream table is "fresh"?

I'm wondering: is there a way to use the Flink state backend (RocksDB) and
create a read/write-through mechanism?

Thanks

miki



On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler 
wrote:

> If the SQL data is all (or mostly all) needed to join against the data
> from Kafka, then I might try a regular join.
>
> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc
> queries (in parallel) against your SQL DB.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/
> operators/asyncio.html
>
> — Ken
>
>
> On Apr 15, 2018, at 12:15 PM, miki haiat  wrote:
>
> Hi,
>
> I have a case of metadata enrichment and I'm wondering if my approach is
> the correct way.
>
> 1. Input stream from Kafka.
> 2. MD in MSSQL.
> 3. Map to a new POJO.
>
> I need to extract a key from the Kafka stream and use it to select some
> values from the SQL table.
>
> So I thought to use the Table/SQL API to select the MD table,
> then convert the Kafka stream to a table and join the data by the stream
> key.
>
> At the end I need to map the joined data to a new POJO and send it to
> Elasticsearch.
>
> Any suggestions or different ways to solve this use case ?
>
> thanks,
> Miki
>
>
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Re: Trying to understand KafkaConsumer_records_lag_max

2018-04-16 Thread Julio Biason
Hi Gordon (and list),

Yes, that's probably what's going on. I got another message from 徐骁 which
told me almost the same thing -- something I completely forgot (he also
mentioned auto.offset.reset, which could be forcing Flink to keep reading
from the top of Kafka instead of trying to go back and read older entries).

Now I need to figure out how to make my pipeline consume entries faster than (or
at least on par with) the speed at which they arrive in Kafka -- but that's a
discussion for another email. ;)

On Mon, Apr 16, 2018 at 1:29 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Julio,
>
> I'm not really sure, but do you think it is possible that there could be
> some hard data retention setting for your Kafka topics in the staging
> environment?
> As in, at some point in time and maybe periodically, all data in the Kafka
> topics are dropped and therefore the consumers effectively jump directly
> back to the head again.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>



-- 
*Julio Biason*, Sofware Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: Question about parallelism

2018-04-16 Thread TechnoMage
1.4.2.  I have since set the parallelism explicitly after creating the env and 
that is working.  I also made the stream object serializable which may also be 
involved in this.  I will retest without the explicit parallelism when I get a 
chance.

Michael

> On Apr 16, 2018, at 2:05 AM, Fabian Hueske  wrote:
> 
> (re-adding user mailing list)
> 
> A non-serializable function object should cause the job to fail, but not to 
> ignore a parallelism setting.
> 
> This might be a bug. Most users specify the parallelism directly in the 
> application code (via StreamExecutionEnvironment) or when submitting the 
> application.
> Which version are you using?
> 
> Best, Fabian
> 
> 2018-04-14 15:07 GMT+02:00 Michael Latta:
> Parallelism in config. I think the issue is that some objects used in the 
> stream are not serializable (which I just discovered). I am surprised it 
> supports that. 
> 
> 
> Michael
> 
> On Apr 14, 2018, at 6:12 AM, Fabian Hueske wrote:
> 
>> Hi,
>> 
>> The number of TaskManagers is irrelevant for the parallelism of a job or 
>> operator. The scheduler only cares about the number of slots. 
>> 
>> How did you set the default parallelism? In the config or in the program / 
>> StreamExecutionEnvironment? 
>> 
>> Best, Fabian
>> 
>> 
>> TechnoMage wrote on Fri, 13 Apr 2018, 04:30:
>> I am pretty new to Flink.  I have a Flink job that has 10 transforms (mostly 
>> CoFlatMap, with some simple filters and key extractors as well).  I have the 
>> config set for 6 slots and default parallelism of 6, but all my stages show 
>> parallelism of 1.  Is that because there is only one task manager?  Some of 
>> what I have read suggested separate slots were needed to use multiple 
>> threads on a single box?  I have read the section on the docs several times 
>> and still not totally sure about the execution model.
>> 
>> Michael
> 



Re: Tiemrs and restore

2018-04-16 Thread Aljoscha Krettek
Gordon is correct: there was a bug on a very old version of Flink that caused 
processing-timers not to be invoked after restore but that was fixed.

Aljoscha

> On 16. Apr 2018, at 06:20, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Alberto,
> 
> Looking at the code, I think the current behavior is that all timers (both 
> processing time and event time) are re-registered on restore, and therefore 
> should be triggered automatically.
> So, for processing time timers, on restore all timers that were supposed to 
> be fired while the job was down should fire automatically; for event time 
> timers, they will be triggered once the watermark passes their timestamps.
> 
> Also looped in Aljoscha on this, in case I misunderstood anything.
> 
> Cheers,
> Gordon
> 
> On 16 April 2018 at 1:20:00 AM, Alberto Mancini (ab.manc...@gmail.com) wrote:
> 
>> Hello,
>> according to this stackoverflow response 
>> https://stackoverflow.com/questions/36306136/will-apache-flink-restore-trigger-timers-after-failure
>>  
>> 
>> IIUC we should expect that after a restore the timers will not be executed 
>> until a new timer is scheduled. 
>> I wonder if this is still true and if there is any chance of forcing the 
>> restart of the timer task.
>> 
>> Thank you. 
>> 
>> Regards,
>>Alberto. 



Re: Annotation in UDF dropped

2018-04-16 Thread Fabian Hueske
Hi Viktor,

Flink does not modify user code.
It distributes the job JAR file to the cluster and serializes the function
objects using Java serialization to ship them to the worker nodes where
they are deserialized.

What type of annotation gets dropped?
Can you show us a small example of the code?

Thank you,
Fabian
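
To make the shipping step concrete, here is a minimal sketch of a function object going through plain Java serialization and back, using only the JDK. FunctionShipping and AddOffset are hypothetical names and no Flink classes are involved; the in-memory round trip merely stands in for sending the object to a worker node.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

/** Illustrative only: plain Java serialization of a function object, without any Flink classes. */
final class FunctionShipping {

    /** A user function with one field; the field value is serialized, the class itself is not. */
    static final class AddOffset implements Function<Integer, Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        private final int offset;

        AddOffset(int offset) {
            this.offset = offset;
        }

        @Override
        public Integer apply(Integer value) {
            return value + offset;
        }
    }

    /** Serializes the object to bytes and reads it back, as a stand-in for shipping it to a worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T object) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        AddOffset shipped = roundTrip(new AddOffset(5));
        // The deserialized copy behaves like the original.
        System.out.println(shipped.apply(37));
    }
}
```

Only the object's state travels in the byte stream; the method bodies are loaded from the class file on the receiving side, which in the setup described above would come from the job JAR.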

2018-04-16 17:32 GMT+02:00 Rosenfeld, Viktor 
:

> Hi,
>
> I have a UDF that uses an annotation on the loop variable inside a for
> loop. I noticed that this annotation gets dropped at some point when the
> UDF is shipped to the TaskManager. I was told that this happens in the
> Optimizer but I would like to know where this happens exactly and if there
> is an easy way to influence this behavior. The annotation is needed by
> another tool that interoperates with Flink.
>
> Any pointer to the source code would be appreciated.
>
> Best,
> Viktor
>


Annotation in UDF dropped

2018-04-16 Thread Rosenfeld, Viktor
Hi,

I have a UDF that uses an annotation on the loop variable inside a for loop. I 
noticed that this annotation gets dropped at some point when the UDF is shipped 
to the TaskManager. I was told that this happens in the Optimizer but I would 
like to know where this happens exactly and if there is an easy way to 
influence this behavior. The annotation is needed by another tool that 
interoperates with Flink.

Any pointer to the source code would be appreciated.

Best,
Viktor




Re: Kafka consumer to sync topics by event time?

2018-04-16 Thread Fabian Hueske
Awesome!

I've given you contributor permissions and assigned FLINK-9183 to you. With
the permissions you can also do that yourself in the future.
Here's a guide for contributions to the documentation [1].

Best, Fabian

[1] http://flink.apache.org/contribute-documentation.html

2018-04-16 15:38 GMT+02:00 Juho Autio :

> Great. I'd be happy to contribute. I added 2 sub-tasks in
> https://issues.apache.org/jira/browse/FLINK-5479.
>
> Someone with the privileges could assign this sub-task to me:
> https://issues.apache.org/jira/browse/FLINK-9183?
>
> On Mon, Apr 16, 2018 at 3:14 PM, Fabian Hueske  wrote:
>
>> Fully agree Juho!
>>
>> Do you want to contribute the docs fix?
>> If yes, we should update FLINK-5479 to make sure that the warning is
>> removed once the bug is fixed.
>>
>> Thanks, Fabian
>>
>> 2018-04-12 9:32 GMT+02:00 Juho Autio :
>>
>>> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479 is
>>> entirely preventing this feature from being used if there are any idle
>>> partitions. It would be nice to mention in the documentation that currently
>>> this requires all subscribed partitions to have a constant stream of data
>>> with growing timestamps. When the watermark gets stalled on an idle partition
>>> it blocks everything.
>>>
>>> Link to current documentation:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/c
>>> onnectors/kafka.html#kafka-consumers-and-timestamp-extractio
>>> nwatermark-emission
>>>
>>> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske  wrote:
>>>
 You are right, offsets cannot be used for tracking processing progress.
 I think setting Kafka offsets with respect to some progress notion other
 than "has been consumed" would be highly application specific and hard to
 generalize.
 As you said, there might be a window (such as a session window) that is
 open much longer than all other windows and which would hold back the
 offset. Other applications might not use the built-in windows at all but
 custom ProcessFunctions.

 Have you considered tracking progress using watermarks?

 2017-12-04 14:42 GMT+01:00 Juho Autio :

> Thank you Fabian. Really clear explanation. That matches with my
> observation indeed (data is not dropped from either small or big topic, 
> but
> the offsets are advancing in kafka side already before those offsets have
> been triggered from a window operator).
>
> This means that it's a bit harder to meaningfully monitor the job's
> progress solely based on kafka consumer offsets. Is there a reason why
> Flink couldn't instead commit the offsets after they have been triggered
> from downstream windows? I could imagine that this might pose a problem if
> there are any windows that remain open for a very long time, but in 
> general
> it would be useful IMHO. Or Flink could even commit both (read vs.
> triggered) offsets to kafka for monitoring purposes.
>
> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske 
> wrote:
>
>> Hi Juho,
>>
>> the partitions of both topics are independently consumed, i.e., at
>> their own speed without coordination. With the configuration that Gordon
>> linked, watermarks are generated per partition.
>> Each source task maintains the latest (and highest) watermark per
>> partition and propagates the smallest watermark. The same mechanism is
>> applied for watermarks across tasks (this is what Kien referred to).
>>
>> In the case that you are describing, the partitions of the smaller
>> topic are consumed faster (hence their offsets advance faster) but
>> watermarks are emitted "at the speed" of the bigger topic.
>> Therefore, the timestamps of records from the smaller topic can be
>> much ahead of the watermark.
>> In principle, that does not pose a problem. Stateful operators (such
>> as windows) remember the "early" records and process them when they
>> receive a watermark that passes the timestamps of the early records.
>>
>> Regarding your question "Are they committed to Kafka before their
>> watermark has passed on Flink's side?":
>> The offsets of the smaller topic might be checkpointed when all
>> partitions have been read to the "end" and the bigger topic is still
>> catching up.
>> The watermarks are moving at the speed of the bigger topic, but all
>> "early" events of the smaller topic are stored in stateful operators and
>> are checkpointed as well.
>>
>> So, you lose neither early nor late data.
>>
>> Best, Fabian
>>
>>
>>
>> 2017-12-01 13:43 GMT+01:00 Juho Autio :
>>
>>> Thanks for the answers, I still don't understand why I can see the
>>> offsets being quickly committed to Kafka for the 

Re: Kafka consumer to sync topics by event time?

2018-04-16 Thread Juho Autio
Great. I'd be happy to contribute. I added 2 sub-tasks in
https://issues.apache.org/jira/browse/FLINK-5479.

Someone with the privileges could assign this sub-task to me:
https://issues.apache.org/jira/browse/FLINK-9183?

On Mon, Apr 16, 2018 at 3:14 PM, Fabian Hueske  wrote:

> Fully agree Juho!
>
> Do you want to contribute the docs fix?
> If yes, we should update FLINK-5479 to make sure that the warning is
> removed once the bug is fixed.
>
> Thanks, Fabian
>
> 2018-04-12 9:32 GMT+02:00 Juho Autio :
>
>> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479 is
>> entirely preventing this feature from being used if there are any idle
>> partitions. It would be nice to mention in the documentation that currently
>> this requires all subscribed partitions to have a constant stream of data
>> with growing timestamps. When the watermark gets stalled on an idle partition
>> it blocks everything.
>>
>> Link to current documentation:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/
>> connectors/kafka.html#kafka-consumers-and-timestamp-
>> extractionwatermark-emission
>>
>> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske  wrote:
>>
>>> You are right, offsets cannot be used for tracking processing progress.
>>> I think setting Kafka offsets with respect to some progress notion other
>>> than "has been consumed" would be highly application specific and hard to
>>> generalize.
>>> As you said, there might be a window (such as a session window) that is
>>> open much longer than all other windows and which would hold back the
>>> offset. Other applications might not use the built-in windows at all but
>>> custom ProcessFunctions.
>>>
>>> Have you considered tracking progress using watermarks?
>>>
>>> 2017-12-04 14:42 GMT+01:00 Juho Autio :
>>>
 Thank you Fabian. Really clear explanation. That matches with my
 observation indeed (data is not dropped from either small or big topic, but
 the offsets are advancing in kafka side already before those offsets have
 been triggered from a window operator).

 This means that it's a bit harder to meaningfully monitor the job's
 progress solely based on kafka consumer offsets. Is there a reason why
 Flink couldn't instead commit the offsets after they have been triggered
 from downstream windows? I could imagine that this might pose a problem if
 there are any windows that remain open for a very long time, but in general
 it would be useful IMHO. Or Flink could even commit both (read vs.
 triggered) offsets to kafka for monitoring purposes.

 On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske 
 wrote:

> Hi Juho,
>
> the partitions of both topics are independently consumed, i.e., at
> their own speed without coordination. With the configuration that Gordon
> linked, watermarks are generated per partition.
> Each source task maintains the latest (and highest) watermark per
> partition and propagates the smallest watermark. The same mechanism is
> applied for watermarks across tasks (this is what Kien referred to).
>
> In the case that you are describing, the partitions of the smaller
> topic are consumed faster (hence their offsets advance faster) but
> watermarks are emitted "at the speed" of the bigger topic.
> Therefore, the timestamps of records from the smaller topic can be
> much ahead of the watermark.
> In principle, that does not pose a problem. Stateful operators (such
> as windows) remember the "early" records and process them when they
> receive a watermark that passes the timestamps of the early records.
>
> Regarding your question "Are they committed to Kafka before their
> watermark has passed on Flink's side?":
> The offsets of the smaller topic might be checkpointed when all
> partitions have been read to the "end" and the bigger topic is still
> catching up.
> The watermarks are moving at the speed of the bigger topic, but all
> "early" events of the smaller topic are stored in stateful operators and
> are checkpointed as well.
>
> So, you lose neither early nor late data.
>
> Best, Fabian
>
>
>
> 2017-12-01 13:43 GMT+01:00 Juho Autio :
>
>> Thanks for the answers, I still don't understand why I can see the
>> offsets being quickly committed to Kafka for the "small topic"? Are they
>> committed to Kafka before their watermark has passed on Flink's side? 
>> That
>> would be quite confusing.. Indeed when Flink handles the state/offsets
>> internally, the consumer offsets are committed to Kafka just for 
>> reference.
>>
>> Otherwise, what you're saying sounds very good to me. The
>> documentation just doesn't explicitly say anything about how it works
>> across topics.
>>
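
The watermark rule quoted in this thread (keep the highest watermark per partition, propagate the smallest of those) can be modelled in a few lines. The sketch below is plain Java with hypothetical names, not Flink's implementation; it assumes a partition that has not yet produced any timestamp is tracked at Long.MIN_VALUE, which is how it shows an idle partition holding everything back.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model (not Flink code) of per-partition watermark tracking in one source task. */
final class PartitionWatermarks {

    private final Map<Integer, Long> highestPerPartition = new HashMap<>();

    /** Registers a partition; until it reports, its watermark is assumed to be Long.MIN_VALUE. */
    void addPartition(int partition) {
        highestPerPartition.putIfAbsent(partition, Long.MIN_VALUE);
    }

    /** Records a watermark for a partition, keeping only the highest value seen so far. */
    void onWatermark(int partition, long watermark) {
        highestPerPartition.merge(partition, watermark, Long::max);
    }

    /** The task-level watermark is the smallest of the per-partition maxima. */
    long emitted() {
        if (highestPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long watermark : highestPerPartition.values()) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarks task = new PartitionWatermarks();
        task.addPartition(0);
        task.addPartition(1);

        task.onWatermark(0, 2_000L); // partition 0 is busy, partition 1 is idle
        System.out.println("stalled: " + (task.emitted() == Long.MIN_VALUE));

        task.onWatermark(1, 1_500L); // once partition 1 reports, the minimum can advance
        System.out.println("emitted: " + task.emitted());
    }
}
```

The same minimum rule, applied again across tasks, is why a single idle partition can block downstream windows as described in the FLINK-5479 discussion above.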

Detecting Patterns in CEP

2018-04-16 Thread Main Frame
Hi flink users!

Is there any way to force the use of a custom comparator for a pattern stream?
In my use case I have no event time, but I do have a sequence number field (not a 
timestamp).
Events can arrive at the platform with a random short delay and can be out of 
sequence. I hope I can use a constant watermark and a custom comparator to 
check the order of messages using the sequence field. Is this possible in Flink?

———

Timofeev Dmitry
VoIP Engineer
Open source telephony solutions
Skype: itsroot
Linkedin: https://www.linkedin.com/in/itsroot



Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-16 Thread Chesnay Schepler
Ah yes, currently when you use that method the UI is started on a random 
port. I'm currently fixing that in a PR that will be merged today. 
For now you can enable logging and search for something along the lines 
of "http://<host>:<port> was granted leadership"


Sorry for the inconvenience.

On 16.04.2018 15:04, Miguel Coimbra wrote:

Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency 
flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml 
would make it so the web front-end is running when I call the 
following line?


LocalEnvironment lenv = (LocalEnvironment) 
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);


I added flink-runtime-web in my pom.xml, recompiled and launched the 
program but I simply got "Unable to connect" in my browser (Firefox) 
on localhost:8081.

Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26-- http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection 
refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: 
Connection refused.


It seems something was bound to localhost:8081 but the connection is 
not working for some reason.

I probably am skipping some important detail.
These are some of my dependencies:


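<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>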

Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 

On 16 April 2018 at 05:12, Chesnay Schepler wrote:


The thing with createLocalEnvironmentWithWebUI is that it requires
flink-runtime-web to be on the classpath, which is rarely the
case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to
your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never
chained to one another.
Lambda functions are also not the issue, if they were the job
would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no
input hence produces no output, which now also blocks T3.

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or /really/ slow
* the preceding operators are actually finished, but aren't
shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said
you were dumping the input DataSets into files, but were they
actually complete?

A deadlock in the network stack should appear as all existing
operator threads being blocked.
We can probably rule out a problem with the join logic by removing
the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:

Hello,

It would seem that the function which is supposed to launch local
mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if
I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment)
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were
pointing to a specific set of operations over DataSet instances
that were passed into my function.
Below I show the code segment and put the lines where threads are
waiting in *bold*:

public static  DataSet> selectEdges(final
Graph originalGraph, final DataSet>
vertices) {
return vertices
.joinWithHuge(originalGraph.getEdges())
.where(0).equalTo(0)
*.with((source, edge) -> edge)* *// Thread 1 is
blocked here*
.returns(originalGraph.getEdges().getType())
.join(vertices)
.where(1).equalTo(0)
*.with((e, v) -> e) // Thread 3 is blocked here*

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-16 Thread Miguel Coimbra
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency
flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would
make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment)
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web  in my pom.xml, recompiled and launched the
program but I simply got "Unable to connect" in my browser (Firefox) on
localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection
refused.

It seems something was bound to localhost:8081 but the connection is not
working for some reason.
I probably am skipping some important detail.
These are some of my dependencies:


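<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
*<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>*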

Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 

On 16 April 2018 at 05:12, Chesnay Schepler  wrote:

> The thing with createLocalEnvironmentWithWebUI is that it requires
> flink-runtime-web to be on the classpath, which is rarely the case when
> running things in the IDE.
> It should work fine in the IDE if you add it as a dependency to your
> project. This should've been logged as a warning.
>
> Chaining is unrelated to this issue as join operators are never chained to
> one another.
> Lambda functions are also not the issue, if they were the job would fail
> much earlier.
>
> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input
> hence produces no output, which now also blocks T3.
>
> There are multiple possible explanations I can come up with:
> * the preceding operators are blocked on something or *really* slow
> * the preceding operators are actually finished, but aren't shutting down
> due to an implementation error
> * a deadlock in Flink's join logic
> * a deadlock in Flink's network stack
>
> For the first 2 we will have to consult the UI or logs. You said you were
> dumping the input DataSets into files, but were they actually complete?
>
> A deadlock in the network stack should appear as all existing operator
> threads being blocked.
> We can probably rule out a problem with the join logic by removing the
> second join and trying again.
>
>
>
> On 16.04.2018 03:10, Miguel Coimbra wrote:
>
> Hello,
>
> It would seem that the function which is supposed to launch local mode
> with the web front-end doesn't launch the front-end at all...
> This function seems not to be doing what it is supposed to do, if I'm not
> mistaken:
>
> LocalEnvironment lenv = (LocalEnvironment)
> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
> Regarding the preceding operators, the thread dumps I got were pointing to
> a specific set of operations over DataSet instances that were passed into
> my function.
> Below I show the code segment and put the lines where threads are waiting
> in *bold*:
>
> public static  DataSet> selectEdges(final Graph VV, EV> originalGraph, final DataSet> vertices) {
> return vertices
> .joinWithHuge(originalGraph.getEdges())
> .where(0).equalTo(0)
> *.with((source, edge) -> edge)* *// Thread 1 is blocked here*
> .returns(originalGraph.getEdges().getType())
> .join(vertices)
> .where(1).equalTo(0)
> *.with((e, v) -> e) // Thread 3 is blocked here*
> .returns(originalGraph.getEdges().getType())
> .distinct(0, 1);
> }
>
> Note: the edges inside the graph originalGraph edge DataSet are much
> greater in number than the elements of the vertices DataSet, so I believe
> that function is being used correctly.
>
> I will try testing with remote (cluster) mode to have access to the web
> front-end, but I have some questions for now:
>
> - The fact that they are blocked in different JoinOperator instances
> that are chained, is this a result of Flink's default pipeline 

Re: User-defined aggregation function and parallelism

2018-04-16 Thread Fabian Hueske
Hi Bill,

Flink's built-in aggregation functions are implemented against the same
interface as UDAGGs and are applied in parallel.
The performance depends of course on the implementation of the UDAGG. For
example, you should try to keep the size of the accumulator as small as
possible because it will be stored in the state backend.
If you are using the RocksDBStateBackend, this means that the accumulator
is de/serialized for every record.
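
A minimal sketch of the point about accumulator size, in plain Java with no Flink dependency. The class names AvgAccumulator and CompactAvg are hypothetical; the three methods only mirror the createAccumulator / accumulate / getValue shape of a user-defined aggregate and are not the Flink API itself. The accumulator holds two longs no matter how many records arrive, so de/serializing it for every record should stay cheap.

```java
// Hypothetical accumulator: constant size (two longs), independent of input volume.
class AvgAccumulator {
    long sum;
    long count;
}

// Mirrors the method shape of a user-defined aggregate; not the Flink API itself.
class CompactAvg {
    AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // Called once per input record; only updates two fields.
    void accumulate(AvgAccumulator acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    // Returns null for an empty group instead of dividing by zero.
    Double getValue(AvgAccumulator acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}
```

An accumulator that instead collected every input value in a list would grow with the window and would be de/serialized in full on each update, which is the pattern to avoid.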

Best, Fabian

2018-04-16 5:21 GMT+02:00 杨力 :

> I am running flink SQL in streaming mode and implemented a UDAGG, which is
> used in keyed HOP windows. But I found that the throughput decreases
> dramatically when the function is used. Does UDAGG run in parallel? Or
> does it run only in one thread?
>
> Regards,
> Bill
>


Re: How to rebalance a table without converting to dataset

2018-04-16 Thread Fabian Hueske
Hi Darshan,

You are right, there's currently no rebalancing operation in the Table API.
I see that this might be a good feature, not sure though how easy it would
be to integrate because we need to pass it through the Calcite optimizer
and rebalancing is not a relational operation.

For now, converting to DataSet and back to Table is the only option.

Best, Fabian

2018-04-13 14:33 GMT+02:00 Darshan Singh :

> Hi
>
> I have a table and I want to rebalance the data so that each partition is
> equal. I can convert to a DataSet, rebalance, and then convert back to a table.
>
> I couldn't find any rebalance on the Table API. Does anyone know a better
> way to rebalance table data?
>
> Thanks
>


Re: Unable to launch job with 100 SQL queries in yarn cluster

2018-04-16 Thread Fabian Hueske
Hi Adrian,

Thanks for reaching out to the community. I don't think that this is an issue
with Flink's SQL support. SQL queries are translated into regular streaming
(or batch) jobs.

The JM might just be overloaded by too many jobs. Since you are running in
a YARN environment, it might make sense to try to start more Flink clusters
and distribute the queries to more JMs.
In the upcoming Flink 1.5 release, the scheduling and integration with
resource managers will be completely reworked, which should make a
one-cluster-per-job deployment easier to maintain.

I've added some details to FLINK-9166.

Best,
Fabian

2018-04-13 8:59 GMT+02:00 Adrian Hains :

> Hi,
> We are having trouble scaling up Flink to execute a collection of SQL
> queries on a yarn cluster. Has anyone run this kind of workload on a
> cluster? Any tips on how to get past this issue?
>
> With a high number of Flink SQL queries (100 instances of the query at the
> bottom of this message), the Flink command line client fails with a
> “JobManager did not respond within 60 ms” error on a YARN cluster. The JobManager
> log has nothing after the last TaskManager started, indicating it is hung
> (creating the ExecutionGraph?). This configuration of queries works as a
> standalone program locally. I can also successfully launch and process 2
> instances of the query in cluster mode.
>
> When attempting 10 query instances in cluster mode, we are able to submit
> but the job errors out with “Insufficient number of network buffers:
> required 725, but only 135 available. The total number of network buffers
> is currently set to 61035 of 32768 bytes each. ”. Though surprisingly with
> a query count of 1, 15000 is all the network buffers that are needed. So it
> seems like the network buffer count quickly scales with the number of
> queries.
>
> Note: Each Row in structStream contains 515 columns (very sparse table,
> >500 are null for each row) including a column that has the raw message.
>
> In the YARN cluster we specify 18GB for TaskManager, 18GB for the
> JobManager, 5 slots and parallelism of 725 (the number of partitions in our
> Kafka source).
>
> The query is a simple filter and aggregation:
>
> select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp,
>   EventTimestamp, RawMsg, Source
> from structStream
> where Environment='MyEnvironment' and Rule='MyRule' and
>   LogType='MyLogType' and Outcome='Success'
> group by tumble(proctime, INTERVAL '1' SECOND), Environment,
>   CollectedTimestamp, EventTimestamp, RawMsg, Source
>
>
> The code is included in https://issues.apache.org/jira/browse/FLINK-9166
>
> thanks!
> -a


Re: Kafka consumer to sync topics by event time?

2018-04-16 Thread Fabian Hueske
Fully agree Juho!

Do you want to contribute the docs fix?
If yes, we should update FLINK-5479 to make sure that the warning is
removed once the bug is fixed.

Thanks, Fabian

2018-04-12 9:32 GMT+02:00 Juho Autio :

> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479 is
> entirely preventing this feature from being used if there are any idle
> partitions. It would be nice to mention in the documentation that currently
> this requires all subscribed partitions to have a constant stream of data
> with growing timestamps. When the watermark gets stalled on an idle partition
> it blocks everything.
>
> Link to current documentation:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske  wrote:
>
>> You are right, offsets cannot be used for tracking processing progress. I
>> think setting Kafka offsets with respect to some progress notion other than
>> "has been consumed" would be highly application specific and hard to
>> generalize.
>> As you said, there might be a window (such as a session window) that is
>> open much longer than all other windows and which would hold back the
>> offset. Other applications might not use the built-in windows at all but
>> custom ProcessFunctions.
>>
>> Have you considered tracking progress using watermarks?
>>
>> 2017-12-04 14:42 GMT+01:00 Juho Autio :
>>
>>> Thank you Fabian. Really clear explanation. That matches with my
>>> observation indeed (data is not dropped from either small or big topic, but
>>> the offsets are advancing in kafka side already before those offsets have
>>> been triggered from a window operator).
>>>
>>> This means that it's a bit harder to meaningfully monitor the job's
>>> progress solely based on kafka consumer offsets. Is there a reason why
>>> Flink couldn't instead commit the offsets after they have been triggered
>>> from downstream windows? I could imagine that this might pose a problem if
>>> there are any windows that remain open for a very long time, but in general
>>> it would be useful IMHO. Or Flink could even commit both (read vs.
>>> triggered) offsets to kafka for monitoring purposes.
>>>
>>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske  wrote:
>>>
 Hi Juho,

 the partitions of both topics are independently consumed, i.e., at
 their own speed without coordination. With the configuration that Gordon
 linked, watermarks are generated per partition.
 Each source task maintains the latest (and highest) watermark per
 partition and propagates the smallest watermark. The same mechanism is
 applied for watermarks across tasks (this is what Kien referred to).
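
 The mechanism described above (keep the highest watermark per partition, propagate the smallest across partitions) can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the Kafka connector's code: the class PartitionWatermarks and its method names are hypothetical, and partitions are identified by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker; not the Kafka connector's internal class.
class PartitionWatermarks {
    private final Map<String, Long> maxPerPartition = new HashMap<>();

    // Register a partition before any record has been seen on it.
    void addPartition(String partition) {
        maxPerPartition.putIfAbsent(partition, Long.MIN_VALUE);
    }

    // Keep the latest (highest) watermark seen for this partition.
    void onWatermark(String partition, long watermark) {
        maxPerPartition.merge(partition, watermark, Long::max);
    }

    // The source task propagates the smallest of the per-partition watermarks.
    long currentWatermark() {
        if (maxPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long wm : maxPerPartition.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

 In this sketch a partition that never reports a watermark keeps the minimum at Long.MIN_VALUE, which is the stalling effect of an idle partition that FLINK-5479 describes.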

 In the case that you are describing, the partitions of the smaller
 topic are consumed faster (hence their offsets advance faster) but
 watermarks are emitted "at the speed" of the bigger topic.
 Therefore, the timestamps of records from the smaller topic can be much
 ahead of the watermark.
 In principle, that does not pose a problem. Stateful operators (such as
 windows) remember the "early" records and process them when they receive a
 watermark that passes the timestamps of the early records.

 Regarding your question "Are they committed to Kafka before their
 watermark has passed on Flink's side?":
 The offsets of the smaller topic might be checkpointed when all
 partitions have been read to the "end" and the bigger topic is still
 catching up.
 The watermarks are moving at the speed of the bigger topic, but all
 "early" events of the smaller topic are stored in stateful operators and
 are checkpointed as well.

 So, you lose neither early nor late data.

 Best, Fabian



 2017-12-01 13:43 GMT+01:00 Juho Autio :

> Thanks for the answers, I still don't understand why I can see the
> offsets being quickly committed to Kafka for the "small topic"? Are they
> committed to Kafka before their watermark has passed on Flink's side? That
> would be quite confusing.. Indeed when Flink handles the state/offsets
> internally, the consumer offsets are committed to Kafka just for 
> reference.
>
> Otherwise, what you're saying sounds very good to me. The
> documentation just doesn't explicitly say anything about how it works
> across topics.
>
> On Kien's answer: "When you join multiple stream with different
> watermarks", note that I'm not joining any topics myself, I get them as a
> single stream from the Flink kafka consumer based on the list of topics
> that I asked for.
>
> Thanks,
> Juho
>
> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org> wrote:
>
>> Hi!
>>
>> The 

Re: Tracking deserialization errors

2018-04-16 Thread Fabian Hueske
Thanks for starting the discussion Elias.

I see two ways to address this issue.

1) Add an interface that a deserialization schema can implement to register
metrics. Each source would need to check for the interface and call it to
set up metrics.
2) Check for null returns in the source functions and increment a
respective counter.
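
A rough sketch of option 2 in plain Java, without any Flink dependency. The names CountingDeserializer and IntParser are hypothetical, and the AtomicLong stands in for whatever metric counter a real source function would register; the sketch only shows the "null means failure, count it" contract.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical wrapper illustrating option 2; a real source would use a metric counter.
class CountingDeserializer<T> {
    private final Function<byte[], T> delegate;
    private final AtomicLong failures = new AtomicLong();

    CountingDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    // Passes the null through so the job keeps running, but counts it first.
    T deserialize(byte[] message) {
        T value = delegate.apply(message);
        if (value == null) {
            failures.incrementAndGet();
        }
        return value;
    }

    long failureCount() {
        return failures.get();
    }
}

class IntParser {
    // Example schema: parses a decimal integer, returning null on malformed input.
    static Integer parse(byte[] bytes) {
        try {
            return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

A counter like this makes the errors visible but says nothing about which topic, partition, or offset was affected.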

In both cases, we need to touch the source connectors.

I see that information such as topic name, partition, and offset is
important for debugging. However, I don't think that metrics would be a
good way to capture it.
In that case, log files might be a better approach.

I'm not sure to what extent the source functions (Kafka, Kinesis) support
such error tracking.
Adding Gordon to the thread who knows the internals of the connectors.

Best, Fabian

2018-04-08 17:53 GMT+02:00 Alexander Smirnov :

> I have the same question. In case of kafka source, it would be good to
> know topic name and offset of the corrupted message for further
> investigation.
> Looks like the only option is to write messages into a log file
>
> On Fri, Apr 6, 2018 at 9:12 PM Elias Levy 
> wrote:
>
>> I was wondering how are folks tracking deserialization errors.  The 
>> AbstractDeserializationSchema
>> interface provides no mechanism for the deserializer to instantiate a
>> metric counter, and "deserialize" must return a null instead of raising an
>> exception in case of error if you want your job to continue functioning
>> during a deserialization error.  But that means such errors are invisible.
>>
>> Thoughts?
>>
>


Re: Re: Slow flink checkpoint

2018-04-16 Thread Fabian Hueske
Thanks MaKeyang!

I've given you contributor permissions and assigned the issue to you.

Best, Fabian

2018-04-16 13:19 GMT+02:00 ma ky :

> Fabian:
> thanks for your reply.
> I have created a JIRA issue:
> https://issues.apache.org/jira/browse/FLINK-9182?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Improvement%20AND%20created%20%3E%3D%20-10m
>
> I'll pull the code ASAP.
>
> MaKeyang
> TIG.JD.COM
> JD.com Infrastructure
> tig.jd.com
> TIG official website
>
> --
> *From:* Fabian Hueske
> *Sent:* 16 April 2018 16:21
> *To:* makeyang
> *Cc:* user; Aljoscha Krettek
> *Subject:* Re: Slow flink checkpoint
>
> Hi everybody,
>
> Thanks so much for looking into this issue and posting the detailed
> description of your approach.
> As said before, improving the checkpointing performance for timers is a
> very important improvement for Flink.
>
> I'm not familiar with the internals of the timer service checkpointing,
> but adding an add and delete version field and perform async checkpoints
> based on these fields seems like a good approach to me.
> IIRC, Aljoscha (in CC) implemented the timer service and its
> checkpointing. He might have more comments.
>
> I'd suggest to create a JIRA (everybody can do that) and repost the
> description of your approach there.
> If you have the code ready, you can also open a PR and reference the JIRA.
>
> Best, Fabian
>
> 2018-04-16 9:03 GMT+02:00 makeyang :
>
> Since Flink Forward SF is over,
> could you take a few minutes to look at this issue and share some
> thoughts on it? Could you help review and comment on my design, or give us
> a design so that I can help implement it?
>
> thanks a lot.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Slow flink checkpoint

2018-04-16 Thread ma ky
Fabian:
thanks for your reply.
I have created a JIRA issue:
https://issues.apache.org/jira/browse/FLINK-9182?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Improvement%20AND%20created%20%3E%3D%20-10m

I'll pull the code ASAP.

MaKeyang
TIG.JD.COM
JD.com Infrastructure
tig.jd.com
TIG official website


From: Fabian Hueske
Sent: 16 April 2018 16:21
To: makeyang
Cc: user; Aljoscha Krettek
Subject: Re: Slow flink checkpoint

Hi everybody,

Thanks so much for looking into this issue and posting the detailed description 
of your approach.
As said before, improving the checkpointing performance for timers is a very 
important improvement for Flink.

I'm not familiar with the internals of the timer service checkpointing, but 
adding an add and delete version field and perform async checkpoints based on 
these fields seems like a good approach to me.
IIRC, Aljoscha (in CC) implemented the timer service and its checkpointing. He 
might have more comments.

I'd suggest to create a JIRA (everybody can do that) and repost the description 
of your approach there.
If you have the code ready, you can also open a PR and reference the JIRA.

Best, Fabian

2018-04-16 9:03 GMT+02:00 makeyang :
Since Flink Forward SF is over,
could you take a few minutes to look at this issue and share some
thoughts on it? Could you help review and comment on my design, or give us
a design so that I can help implement it?

thanks a lot.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Question about parallelism

2018-04-16 Thread Fabian Hueske
(re-adding user mailing list)

A non-serializable function object should cause the job to fail, but not to
ignore a parallelism setting.

This might be a bug. Most users specify the parallelism directly in the
application code (via StreamExecutionEnvironment) or when submitting the
application.
Which version are you using?

Best, Fabian

2018-04-14 15:07 GMT+02:00 Michael Latta :

> Parallelism in config. I think the issue is that some objects used in the
> stream are not serializable (which I just discovered). I am surprised it
> supports that.
>
>
> Michael
>
> On Apr 14, 2018, at 6:12 AM, Fabian Hueske  wrote:
>
> Hi,
>
> The number of TaskManagers is irrelevant for the parallelism of a job or
> operator. The scheduler only cares about the number of slots.
>
> How did you set the default parallelism? In the config or in the program /
> StreamExecutionEnvironment?
>
> Best, Fabian
>
>
> TechnoMage  wrote on Fri, 13 Apr 2018, 04:30:
>
>> I am pretty new to Flink.  I have a Flink job that has 10 transforms
>> (mostly CoFlatMap with some simple filters and key extractors as well).  I
>> have the config set for 6 slots and default parallelism of 6, but all my
>> stages show parallelism of 1.  Is that because there is only one task
>> manager?  Some of what I have read suggested separate slots were needed to
>> use multiple threads on a single box?  I have read the section in the docs
>> several times and am still not totally sure about the execution model.
>>
>> Michael
>
>


Re: assign time attribute after first window group when using Flink SQL

2018-04-16 Thread Fabian Hueske
Sorry, I forgot to CC the user mailing list in my reply.

2018-04-12 17:27 GMT+02:00 Fabian Hueske :

> Hi,
>
> Assuming you are using event time, the right function to generate a row
> time attribute from a window would be "w1.rowtime" instead of "w1.start".
>
> The reason why Flink is picky about this is that we must ensure that the
> result rows of the windows are aligned with the watermarks of the stream.
>
> Best, Fabian
>
>
> Ivan Wang  wrote on Sun, 8 Apr 2018, 22:26:
>
>> Hi all,
>>
>>
>>
>> I'd like to use 2 window group in a chain in my program as below.
>>
>>
>>
>> Table myTable = cTable
>>     .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
>>     .groupBy("symbol, w1").select("w1.start as start, w1.end as end, symbol, price.max as p_max, price.min as p_min")
>>     .window(Slide.over("150.rows").every("1.rows").on("start").as("w2"))
>>     .groupBy("symbol, w2").select("w2.start, w2.end, symbol, p_max.max, p_min.min");
>>
>>
>>
>>
>>
>> However, it throws error: SlidingGroupWindow('w2, 'start, 150.rows,
>> 1.rows) is invalid: Sliding window expects a time attribute for grouping in
>> a stream environment.
>>
>>  at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:149)
>>  at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:658)
>>  at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1159)
>>  at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1179)
>>  at minno.gundam.ReadPattern.main(ReadPattern.java:156)
>>
>>
>>
>> Is there any way to assign time attribute after the first groupBy (w1)?
>>
>>
>>
>> Thanks
>>
>> Ivan
>>
>>
>>
>>


Re: Volume question

2018-04-16 Thread Fabian Hueske
Sorry, I forgot to CC the user mailing list in my reply.

2018-04-12 17:26 GMT+02:00 Fabian Hueske :

> Hi,
>
> Sorry for the long delay. Many contributors are traveling due to Flink
> Forward.
>
> Your use case should be well supported by Flink. Flink will partition and
> distribute the keys across all parallel instances of an operator and can
> handle very large state (up to several TBs).
>
> Best, Fabian
>
> TechnoMage  wrote on Sat, 7 Apr 2018, 10:57:
>
>> I have a use case that I wonder if Flink handles well:
>>
>> 1) 5M+ keys in a KeyedStream
>> 2) Using RichFlatMap to track data on each key
>>
>> Will Flink spread one operator’s partitions over multiple
>> machines/taskmanager/jobmanager?
>>
>> Michael
>
>


Re: Slow flink checkpoint

2018-04-16 Thread makeyang
Since Flink Forward SF is over,
could you take a few minutes to look at this issue and share some
thoughts on it? Could you help review and comment on my design, or give us
a design so that I can help implement it?

thanks a lot.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Slow flink checkpoint

2018-04-16 Thread 林德强
Hi Stefan, Fabian,
Keyang is an engineer on our team; he has put a lot of effort into making
the timers' snapshot asynchronous. What do you think of his idea?


Best,
Deqiang
TIG.JD.COM 


> On 1 April 2018, at 7:21 PM, makeyang wrote:
> 
> I have put a lot of effort into this issue and tried to resolve it:
> 1. Let me first describe the current timers' snapshot path:
>    a) for each key group, invoke
> InternalTimeServiceManager.snapshotStateForKeyGroup
>    b) InternalTimeServiceManager creates an
> InternalTimerServiceSerializationProxy to write the snapshot
>    c) InternalTimerServiceSerializationProxy iterates over each <service name,
> HeapInternalTimerService> tuple, writes the service name, calls
> snapshotTimersForKeyGroup, then gets an InternalTimersSnapshotWriter to
> writeTimersSnapshot
>    d) in method writeTimersSnapshot of InternalTimersSnapshotWriter, first
> write the key serializer and namespace serializer, then get the eventTimers and
> processingTimers of the InternalTimersSnapshot, each a Set of InternalTimer,
> and serialize them.
> 
> 2. My first try was to shallow copy the <service name, HeapInternalTimerService> tuples
> and then shallow copy the eventTimers and processingTimers, then use another
> thread to snapshot them without blocking the event processing thread. But it
> turned out that shallow copying the eventTimers and processingTimers is too time
> consuming, so this solution failed.
> 
> 3. Then I tried to borrow the idea of the CopyOnWriteStateTable data structure and
> manage timers with it. But after digging more, I found that there
> is an easier way to snapshot timers asynchronously, thanks to one fact:
> InternalTimer is immutable. Based on this fact, the approach is:
>    a) maintain a stateTableVersion and a snapshotVersions set in
> InternalTimeServiceManager, both with exactly the same meaning as in
> CopyOnWriteStateTable. One more thing: a read-write lock, which protects
> snapshotVersions and stateTableVersion.
>    b) for each InternalTimer, add 2 more properties, create version and
> delete version, beside the 3 existing properties: timestamp, key and namespace.
> Each time a timer is registered in the timer service, it is created with
> stateTableVersion as its create version, while its delete version is -1. Each
> time a timer is deleted in the timer service, it is only marked as deleted by
> giving it a delete version equal to stateTableVersion, without physically
> deleting it from the timer service.
>    c) each time a timer snapshot is taken, InternalTimeServiceManager
> increases its stateTableVersion and adds this stateTableVersion to
> snapshotVersions. These 2 operations are protected by the write lock of
> InternalTimeServiceManager. The resulting stateTableVersion is the snapshot
> version of this snapshot.
>    d) shallow copy the <service name, HeapInternalTimerService> tuples
>    e) then use another thread to asynchronously snapshot everything:
> key serializer, namespace serializer and timers. A timer that is not
> deleted (delete version is -1) and whose create version is less than the
> snapshot version is serialized. A timer whose delete version is not -1 and is
> greater than or equal to the snapshot version is serialized as well. Otherwise,
> the timer is not serialized by this snapshot.
>    f) when everything is serialized, remove the snapshot version from
> snapshotVersions. This still happens in the other thread and is
> guarded by the write lock.
>    g) last thing: physical deletion of timers. There are 2 places where timers
> are physically deleted. First, when a timer is deleted in the timer service:
> after it is marked as deleted (see b), check under the read lock whether its
> delete version is less than the min value of snapshotVersions
> (which means there is no active timer snapshot running), and if that
> is true, physically delete it. The other place is in the snapshot's
> timer iteration: when a timer's delete version is less than the min value of
> snapshotVersions, the timer is deleted and no running snapshot
> needs to keep it.
>    h) some more additions: processingTimeTimers and eventTimeTimers for
> each key group used to be a HashSet and are now a ConcurrentHashMap
> with key+namespace+timestamp as the hash key.
> 
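
The versioning rules in steps b), c), e), f) and g) above can be sketched in plain Java as follows. This is a single-threaded illustration under stated assumptions, not the proposed patch: the names VersionedTimer and VersionedTimerStore are hypothetical, the read-write lock and the ConcurrentHashMap from steps a) and h) are omitted, and timers are reduced to a timestamp. The sketch also assumes that a deleted timer must have been created before the snapshot to be included, which step e) does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical timer carrying the two extra version fields from step b).
class VersionedTimer {
    final long timestamp;
    final int createVersion;
    int deleteVersion = -1; // -1 means "not deleted"

    VersionedTimer(long timestamp, int createVersion) {
        this.timestamp = timestamp;
        this.createVersion = createVersion;
    }
}

// Hypothetical single-threaded store; locking is left out on purpose.
class VersionedTimerStore {
    private int stateTableVersion = 0;
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();
    private final List<VersionedTimer> timers = new ArrayList<>();

    VersionedTimer register(long timestamp) {
        VersionedTimer t = new VersionedTimer(timestamp, stateTableVersion);
        timers.add(t);
        return t;
    }

    // Logical delete; physical removal only if no running snapshot can need the timer.
    void delete(VersionedTimer t) {
        t.deleteVersion = stateTableVersion;
        if (snapshotVersions.isEmpty() || t.deleteVersion < snapshotVersions.first()) {
            timers.remove(t);
        }
    }

    // Step c): bump the version and record it as an active snapshot version.
    int beginSnapshot() {
        stateTableVersion++;
        snapshotVersions.add(stateTableVersion);
        return stateTableVersion;
    }

    // Step e): include timers created before the snapshot and not deleted before it.
    List<Long> collect(int snapshotVersion) {
        List<Long> out = new ArrayList<>();
        for (VersionedTimer t : timers) {
            boolean createdBefore = t.createVersion < snapshotVersion;
            boolean aliveAtSnapshot = t.deleteVersion == -1 || t.deleteVersion >= snapshotVersion;
            if (createdBefore && aliveAtSnapshot) {
                out.add(t.timestamp);
            }
        }
        return out;
    }

    // Steps f) and g): retire the snapshot, then purge timers no running snapshot needs.
    void endSnapshot(int snapshotVersion) {
        snapshotVersions.remove(snapshotVersion);
        timers.removeIf(t -> t.deleteVersion != -1
                && (snapshotVersions.isEmpty() || t.deleteVersion < snapshotVersions.first()));
    }

    int size() {
        return timers.size();
    }
}
```

In this sketch a timer deleted while a snapshot is running stays in the store until endSnapshot is called, so the snapshot still sees the state as of its start, while timers registered after beginSnapshot are skipped.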
> The code is done and the test is still running. I post these comments not only
> to hear your opinions, but also to figure out some more questions related
> to the current timer snapshot code path. My questions are below:
> 1. Method onProcessingTime of HeapInternalTimerService is invoked by
> another thread of ProcessingTimeService, and in this thread it removes
> timers from HeapInternalTimerService. In the current timer snapshot path, I
> haven't found any shallow copy of processingTimeTimers and
> eventTimeTimers. How does this not cause a concurrent modification
> exception?
>