Lower parallelism yields better latency

2018-01-02 Thread Netzer, Liron
Hi group,

We have a standalone Flink cluster running on a UNIX host with 40 CPUs 
and 256 GB RAM.
There is one task manager with 24 slots defined.
When we decrease the parallelism of the stream graph operators (each operator 
has the same parallelism),
we see a consistent improvement in latency:
Test run | Parallelism | 99th percentile | 95th percentile | 75th percentile | Mean
#1       | 8           | 4.15 ms         | 2.02 ms         | 0.22 ms         | 0.42 ms
#2       | 7           | 3.6 ms          | 1.68 ms         | 0.14 ms         | 0.34 ms
#3       | 6           | 3 ms            | 1.4 ms          | 0.13 ms         | 0.3 ms
#4       | 5           | 2.1 ms          | 0.95 ms         | 0.1 ms          | 0.22 ms
#5       | 4           | 1.5 ms          | 0.64 ms         | 0.09 ms         | 0.16 ms


This was a surprise for us, as we expected higher parallelism to yield better 
latency.
Could you help us understand this behavior?
I know that with more threads involved there is probably more 
serialization/deserialization, but that can't be the only reason for this 
behavior.

We have two Kafka sources, and the rest of the operators are fixed windows, 
flatmaps, coMappers and several KeyBys.
Except for the Kafka sources and some internal logging, there is no other I/O 
(i.e. we do not connect to any external DB/queue).
We use Flink 1.3.
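
To make the setup concrete, here is a stripped-down sketch of how each test run
is parameterized (illustrative only; the socket source and the latency-tracking
interval stand in for our real Kafka topology and measurement):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class LatencyVsParallelism {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            int parallelism = Integer.parseInt(args[0]);        // 8, 7, 6, 5 or 4 per test run
            env.setParallelism(parallelism);                    // same parallelism for every operator
            env.getConfig().setLatencyTrackingInterval(1000L);  // emit latency markers once per second

            env.socketTextStream("localhost", 9999)             // stand-in for our Kafka sources
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) {
                        return Tuple2.of(line, 1);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.minutes(1))
                .sum(1)
                .print();

            env.execute("latency-vs-parallelism-" + parallelism);
        }
    }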


Thanks,
Liron



JobManager not receiving resource offers from Mesos

2018-01-02 Thread 김동원
Hi,

I am trying to launch a Flink cluster on top of dc/os, but the TaskManagers are 
not launched at all.

What I do to launch the Flink cluster is as follows:
- Click "flink" in the "Catalog" on the left panel of the dc/os GUI.
- Click "Run service" without modifying the configuration, for testing purposes 
(Figure 1).

Up to this point, everything seems okay, as shown in Figure 2.
However, Figure 3 shows that no TaskManager has ever been launched.

So I took a look at the JobManager log (see the attached "log.txt" for the full log).
The LaunchCoordinator keeps emitting the same log messages while staying in the 
"GatheringOffers" state, as follows:
INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- 
Processing 1 task(s) against 0 new offer(s) plus outstanding off$
DEBUG com.netflix.fenzo.TaskScheduler   - Found 0 
VMs with non-zero offers to assign from
INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- Resources 
considered: (note: expired offers not deducted from be$
DEBUG org.apache.flink.mesos.scheduler.LaunchCoordinator- 
SchedulingResult{resultMap={}, failures={}, leasesAdded=0, lease$
INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- Waiting 
for more offers; 1 task(s) are not yet launched.
(FYI, ConnectionMonitor is in its "ConnectedState" as you can see in the full 
log file.)

Can anyone point out what's going wrong with my dc/os installation?
Thank you for your attention. I'm really looking forward to running Flink clusters 
on dc/os :-)

P.S. I tested whether dc/os itself is working correctly by using the following 
app definition, and it works.
{
 "id": "simple-gpu-test",
 "acceptedResourceRoles":["slave_public", "*"],
 "cmd": "while [ true ] ; do nvidia-smi; sleep 5; done",
 "cpus": 1,
 "mem": 128,
 "disk": 0,
 "gpus": 1,
 "instances": 8
}




Re: BackPressure handling

2018-01-02 Thread Vishal Santoshi
Also note that if I were to start 2 pipelines

1. one working off the head of the topic, and thus not prone to the pathological
case described above
2. one doing a replay, and thus prone to the pathological case described above

then the 2nd pipeline will stall the 1st one. This seems to point to the
following part of the JIRA issue:

   - All channels multiplexed into the same TCP connection stall together,
   as soon as one channel has backpressure.

This has to be a priority IMHO; in a shared VM, jobs should have at least some
isolation.

On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi 
wrote:

> Thank you.
>
> On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber 
> wrote:
>
>> Hi Vishal,
>> let me already point you towards the JIRA issue for the credit-based
>> flow control: https://issues.apache.org/jira/browse/FLINK-7282
>>
>> I'll have a look at the rest of this email thread tomorrow...
>>
>>
>> Regards,
>> Nico
>>
>> On 02/01/18 17:52, Vishal Santoshi wrote:
>> > Could you please point me to any documentation on the  "credit-based
>> > flow control" approach
>> >
>> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther > > > wrote:
>> >
>> > Hi Vishal,
>> >
>> > your assumptions sound reasonable to me. The community is currently
>> > working on a more fine-grained back pressuring with credit-based
>> > flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
>> > Nico that might tell you more about the details. Until then I guess
>> > you have to implement a custom source/adapt an existing source to
>> > let the data flow in more realistic.
>> >
>> > Regards,
>> > Timo
>> >
>> > [1]
>> > http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5
>> -timeline.html
>> > > and-1.5-timeline.html>
>> > [2] https://www.youtube.com/watch?v=scStdhz9FHc
>> > 
>> >
>> >
>> > Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
>> >
>> > I did a simulation on session windows ( in 2 modes ) and let it
>> > rip for about 12 hours
>> >
>> > 1. Replay where a kafka topic with retention of 7 days was the
>> > source ( earliest )
>> > 2. Start the pipe with kafka source ( latest )
>> >
>> > I saw results that differed dramatically.
>> >
>> > On replay the pipeline stalled after  good ramp up while in the
>> > second case the pipeline hummed on without issues. For the same
>> > time period the data consumed is significantly more in the
>> > second case with the WM progression stalled in the first case
>> > with no hint of resolution ( the incoming data on source topic
>> > far outstrips the WM progression )  I think I know the reasons
>> > and this is my hypothesis.
>> >
>> > In replay mode the number of windows open do not have an upper
>> > bound. While buffer exhaustion ( and data in flight with
>> > watermark )  is the reason for throttle, it does not really
>> > limit the open windows and in fact creates windows that reflect
>> > futuristic data ( future is relative to the current WM ) . So if
>> > partition x has data for watermark time t(x) and partition y for
>> > watermark time t(y) and t(x) << t(y) where the overall watermark
>> > is t(x) nothing significantly throttles consumption from the y
>> > partition ( in fact for x too ) , the bounded buffer based
>> > approach does not give minute control AFAIK as one would hope
>> > and that implies there are far more open windows than the system
>> > can handle and that leads to the pathological case where the
>> > buffers fill up  ( I believe that happens way late ) and
>> > throttling occurs but the WM does not proceed and windows that
>> > could ease the glut the throttling cannot proceed. In the
>> > replay mode the amount of data implies that the Fetchers keep
>> > pulling data at the maximum consumption allowed by the open
>> > ended buffer approach.
>> >
>> > My question thus is, is there any way to have a finer control of
>> > back pressure, where in the consumption from a source is
>> > throttled preemptively ( by for example decreasing the buffers
>> > associated for a pipe or the size allocated ) or sleeps in the
>> > Fetcher code that can help aligning the performance to have real
>> > time consumption  characteristics
>> >
>> > Regards,
>> >
>> > Vishal.
>> >
>> >
>> >
>> >
>> >
>> >
>>
>>
>


Re: BackPressure handling

2018-01-02 Thread Vishal Santoshi
Thank you.

On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber  wrote:

> Hi Vishal,
> let me already point you towards the JIRA issue for the credit-based
> flow control: https://issues.apache.org/jira/browse/FLINK-7282
>
> I'll have a look at the rest of this email thread tomorrow...
>
>
> Regards,
> Nico
>
> On 02/01/18 17:52, Vishal Santoshi wrote:
> > Could you please point me to any documentation on the  "credit-based
> > flow control" approach
> >
> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther  > > wrote:
> >
> > Hi Vishal,
> >
> > your assumptions sound reasonable to me. The community is currently
> > working on a more fine-grained back pressuring with credit-based
> > flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
> > Nico that might tell you more about the details. Until then I guess
> > you have to implement a custom source/adapt an existing source to
> > let the data flow in more realistic.
> >
> > Regards,
> > Timo
> >
> > [1]
> > http://flink.apache.org/news/2017/11/22/release-1.4-and-1.
> 5-timeline.html
> >  5-timeline.html>
> > [2] https://www.youtube.com/watch?v=scStdhz9FHc
> > 
> >
> >
> > Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
> >
> > I did a simulation on session windows ( in 2 modes ) and let it
> > rip for about 12 hours
> >
> > 1. Replay where a kafka topic with retention of 7 days was the
> > source ( earliest )
> > 2. Start the pipe with kafka source ( latest )
> >
> > I saw results that differed dramatically.
> >
> > On replay the pipeline stalled after  good ramp up while in the
> > second case the pipeline hummed on without issues. For the same
> > time period the data consumed is significantly more in the
> > second case with the WM progression stalled in the first case
> > with no hint of resolution ( the incoming data on source topic
> > far outstrips the WM progression )  I think I know the reasons
> > and this is my hypothesis.
> >
> > In replay mode the number of windows open do not have an upper
> > bound. While buffer exhaustion ( and data in flight with
> > watermark )  is the reason for throttle, it does not really
> > limit the open windows and in fact creates windows that reflect
> > futuristic data ( future is relative to the current WM ) . So if
> > partition x has data for watermark time t(x) and partition y for
> > watermark time t(y) and t(x) << t(y) where the overall watermark
> > is t(x) nothing significantly throttles consumption from the y
> > partition ( in fact for x too ) , the bounded buffer based
> > approach does not give minute control AFAIK as one would hope
> > and that implies there are far more open windows than the system
> > can handle and that leads to the pathological case where the
> > buffers fill up  ( I believe that happens way late ) and
> > throttling occurs but the WM does not proceed and windows that
> > could ease the glut the throttling cannot proceed. In the
> > replay mode the amount of data implies that the Fetchers keep
> > pulling data at the maximum consumption allowed by the open
> > ended buffer approach.
> >
> > My question thus is, is there any way to have a finer control of
> > back pressure, where in the consumption from a source is
> > throttled preemptively ( by for example decreasing the buffers
> > associated for a pipe or the size allocated ) or sleeps in the
> > Fetcher code that can help aligning the performance to have real
> > time consumption  characteristics
> >
> > Regards,
> >
> > Vishal.
> >
> >
> >
> >
> >
> >
>
>


Re: BackPressure handling

2018-01-02 Thread Nico Kruber
Hi Vishal,
let me already point you towards the JIRA issue for the credit-based
flow control: https://issues.apache.org/jira/browse/FLINK-7282

I'll have a look at the rest of this email thread tomorrow...


Regards,
Nico

On 02/01/18 17:52, Vishal Santoshi wrote:
> Could you please point me to any documentation on the  "credit-based
> flow control" approach
> 
> On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther  > wrote:
> 
> Hi Vishal,
> 
> your assumptions sound reasonable to me. The community is currently
> working on a more fine-grained back pressuring with credit-based
> flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
> Nico that might tell you more about the details. Until then I guess
> you have to implement a custom source/adapt an existing source to
> let the data flow in more realistic.
> 
> Regards,
> Timo
> 
> [1]
> http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
> 
> 
> [2] https://www.youtube.com/watch?v=scStdhz9FHc
> 
> 
> 
> Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
> 
> I did a simulation on session windows ( in 2 modes ) and let it
> rip for about 12 hours
> 
> 1. Replay where a kafka topic with retention of 7 days was the
> source ( earliest )
> 2. Start the pipe with kafka source ( latest )
> 
> I saw results that differed dramatically.
> 
> On replay the pipeline stalled after  good ramp up while in the
> second case the pipeline hummed on without issues. For the same
> time period the data consumed is significantly more in the
> second case with the WM progression stalled in the first case
> with no hint of resolution ( the incoming data on source topic
> far outstrips the WM progression )  I think I know the reasons
> and this is my hypothesis.
> 
> In replay mode the number of windows open do not have an upper
> bound. While buffer exhaustion ( and data in flight with
> watermark )  is the reason for throttle, it does not really
> limit the open windows and in fact creates windows that reflect
> futuristic data ( future is relative to the current WM ) . So if
> partition x has data for watermark time t(x) and partition y for
> watermark time t(y) and t(x) << t(y) where the overall watermark
> is t(x) nothing significantly throttles consumption from the y
> partition ( in fact for x too ) , the bounded buffer based
> approach does not give minute control AFAIK as one would hope
> and that implies there are far more open windows than the system
> can handle and that leads to the pathological case where the
> buffers fill up  ( I believe that happens way late ) and
> throttling occurs but the WM does not proceed and windows that
> could ease the glut the throttling cannot proceed. In the
> replay mode the amount of data implies that the Fetchers keep
> pulling data at the maximum consumption allowed by the open
> ended buffer approach.
> 
> My question thus is, is there any way to have a finer control of
> back pressure, where in the consumption from a source is
> throttled preemptively ( by for example decreasing the buffers
> associated for a pipe or the size allocated ) or sleeps in the
> Fetcher code that can help aligning the performance to have real
> time consumption  characteristics
> 
> Regards,
> 
> Vishal.
> 
> 
> 
> 
> 
> 





Re: BackPressure handling

2018-01-02 Thread Vishal Santoshi
Could you please point me to any documentation on the  "credit-based flow
control" approach

On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther  wrote:

> Hi Vishal,
>
> your assumptions sound reasonable to me. The community is currently
> working on a more fine-grained back pressuring with credit-based flow
> control. It is on the roamap for 1.5 [1]/[2]. I will loop in Nico that
> might tell you more about the details. Until then I guess you have to
> implement a custom source/adapt an existing source to let the data flow in
> more realistic.
>
> Regards,
> Timo
>
> [1] http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-
> timeline.html
> [2] https://www.youtube.com/watch?v=scStdhz9FHc
>
>
> Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
>
> I did a simulation on session windows ( in 2 modes ) and let it rip for
>> about 12 hours
>>
>> 1. Replay where a kafka topic with retention of 7 days was the source (
>> earliest )
>> 2. Start the pipe with kafka source ( latest )
>>
>> I saw results that differed dramatically.
>>
>> On replay the pipeline stalled after  good ramp up while in the second
>> case the pipeline hummed on without issues. For the same time period the
>> data consumed is significantly more in the second case with the WM
>> progression stalled in the first case with no hint of resolution ( the
>> incoming data on source topic far outstrips the WM progression )  I think I
>> know the reasons and this is my hypothesis.
>>
>> In replay mode the number of windows open do not have an upper bound.
>> While buffer exhaustion ( and data in flight with watermark )  is the
>> reason for throttle, it does not really limit the open windows and in fact
>> creates windows that reflect futuristic data ( future is relative to the
>> current WM ) . So if partition x has data for watermark time t(x) and
>> partition y for watermark time t(y) and t(x) << t(y) where the overall
>> watermark is t(x) nothing significantly throttles consumption from the y
>> partition ( in fact for x too ) , the bounded buffer based approach does
>> not give minute control AFAIK as one would hope and that implies there are
>> far more open windows than the system can handle and that leads to the
>> pathological case where the buffers fill up  ( I believe that happens way
>> late ) and throttling occurs but the WM does not proceed and windows that
>> could ease the glut the throttling cannot proceed. In the replay mode
>> the amount of data implies that the Fetchers keep pulling data at the
>> maximum consumption allowed by the open ended buffer approach.
>>
>> My question thus is, is there any way to have a finer control of back
>> pressure, where in the consumption from a source is throttled preemptively
>> ( by for example decreasing the buffers associated for a pipe or the size
>> allocated ) or sleeps in the Fetcher code that can help aligning the
>> performance to have real time consumption  characteristics
>>
>> Regards,
>>
>> Vishal.
>>
>>
>>
>>
>>
>


Re: keyby() issue

2018-01-02 Thread Timo Walther

Hi Jinhua,

did you check the key group assignments? What is the distribution of 
"MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your 
data? This also depends on the hashCode() of the output of your KeySelector.
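
Something like the following sketch gives you that distribution (illustrative; 
the sample keys and the max parallelism of 128 are assumptions to replace with 
your own values):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupHistogram {
        public static void main(String[] args) {
            int maxParallelism = 128;                      // Flink's default max parallelism
            String[] sampleKeys = {"1.2.3.4", "5.6.7.8"};  // replace with a sample of your real keys
            Map<Integer, Integer> histogram = new HashMap<>();
            for (String key : sampleKeys) {
                // same assignment the runtime uses: murmurHash(key.hashCode()) % maxParallelism
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                histogram.merge(keyGroup, 1, Integer::sum);
            }
            // a heavily skewed histogram means a few subtasks receive most of the load
            histogram.forEach((group, count) -> System.out.println(group + " -> " + count));
        }
    }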


keyBy should handle high traffic well, but it is designed for key spaces 
with thousands or millions of values. If this is not the case, you need 
to introduce some more artificial key to spread the load more evenly.


Regarding your OutOfMemoryError: I think you are producing elements much 
faster than the operators after the keyBy can process/discard them. Can you 
explain your job to us in more detail? Are you using event time? How do you 
aggregate the elements of the windows?


Regards,
Timo



Am 1/1/18 um 6:00 AM schrieb Jinhua Luo:

I checked the logs, but nothing in them indicates what happens.

In fact, in the same app there is another stream whose Kafka source is
low traffic; I aggregate some field of that source too, and Flink gives
correct results continuously.
So I suspect that keyBy() does not handle high traffic well (which
affects the number of keys in the key partitions).

2018-01-01 2:04 GMT+08:00 Steven Wu :

  but soon later, no results produced, and flink seems busy doing something
forever.

Jinhua, I don't know if you have checked these things; if not, they may be worth
a look.

Have you tried taking a thread dump?
How are the GC pauses?
Do you see Flink restarting? Check the exceptions tab in the Flink web UI for
your job.



On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo  wrote:

I took some time to read the source code for keyed stream windowing, and here
is my understanding:

a) the keyed stream is split and dispatched to downstream tasks by hash, and
the hash base is the max parallelism of the downstream operator:

See
org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int,
int):
MathUtils.murmurHash(keyHash) % maxParallelism;

That's what the docs call "hash partitioning".

So the compiled execution graph already determines which operator instance
receives which key groups.

b) with windowing, the key is used to index the window state, so the window
function receives the deserialized value from the window state corresponding
to the key.

b.1) The element would be added into the state first:

See
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord):
windowState.add(element.getValue());

b.2) when the trigger fires the window, the value would be
deserialized from the keyed state:

ACC contents = windowState.get();
emitWindowContents(actualWindow, contents);

With the RocksDB backend, each input element is therefore written to and read
back from disk during processing.

Flink's keyed stream has the same functionality as Storm's fields grouping,
but is more complex.

Am I correct?


But I still cannot understand why keyBy() stops Flink from returning the
expected results.

Let me explain my case in more detail:
I use a Kafka data source, which collects log lines from log files on tens of
machines.
Each log line is in JSON format and contains an "ip" field, the IP address of
the user, so it can take millions of distinct values across the Internet.
The stream processing is expected to aggregate by IP in a {1 hour, 1 min}
sliding window.

If I use keyBy("ip"), then at first minutes, the flink could give me
correct aggregation results, but soon later, no results produced, and
flink seems busy doing something forever.

I doubt if keyby() could handle huge keys like this case, and when I
remove keyby().window().fold() and use windowAll().fold() instead (the
latter fold operator uses hashmap to aggregate ip by itself), flink
works. But as known, the windowAll() is not scale-able.

Could the Flink developers help me with this topic? I prefer Flink and I
believe it is one of the best stream processing frameworks, but I am really
frustrated that it does not seem to fulfill its features as the docs describe.

Thank you all.


2017-12-29 17:42 GMT+08:00 Jinhua Luo :

I misused the key selector. I checked the doc and found that it must return a
deterministic key, so using a random key is wrong, but I still cannot
understand why it would cause an OOM.
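
For contrast, a deterministic selector would look roughly like this (sketch;
getIp() is an assumed accessor for the "ip" field mentioned earlier in the
thread):

    .keyBy(new KeySelector<MyEvent, String>() {
        @Override
        public String getKey(MyEvent ev) {
            return ev.getIp();   // stable field of the element -> same element, same key
        }
    })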



2017-12-28 21:57 GMT+08:00 Jinhua Luo :

It's very strange: when I change the key selector to use a random key, the
JVM reports an OOM.

.keyBy(new KeySelector<MyEvent, Integer>() {
    @Override
    public Integer getKey(MyEvent ev) {
        return ThreadLocalRandom.current().nextInt(1, 100);
    }
})

Caused by: java.lang.OutOfMemoryError: Java heap space
 at
com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
 at
com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
 at
com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
 at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
 at 

Re: How to stop FlinkKafkaConsumer and make job finished?

2018-01-02 Thread Timo Walther

Hi Arnaud,

thanks for letting us know your workaround. I agree that this is a 
frequently asked topic and important in certain use cases. I'm sure that 
it will be solved in the near future depending on the priorities.


My 2 cents: Flink is an open source project, so maybe somebody is willing to 
work on a solution for one or more Flink sources :)


Regards,
Timo


Am 1/2/18 um 4:50 PM schrieb LINZ, Arnaud:

Hi,

My 2 cents: not being able to programmatically nicely stop a Flink stream is what 
lacks most to the framework IMHO. It's a very common use case: each time you want 
to update the application or change its configuration you need to nicely stop  
& restart it, without triggering alerts, data loss, or anything else.
That's why I never use the provided Flink Sources "out of the box". I've made a framework 
that encapsulate them, adding a monitoring thread that periodically check for a special "hdfs 
stop file" and try to nicely cancel() the source if the user requested a stop by this mean 
(I've found that the hdfs file trick is most easy way to reach from an external application all 
task managers running on unknown hosts).

I could not use the "special message" trick because in most real production 
environment you cannot, as a client, post a message in a queue just for your client's 
need: you don't have proper access rights to do so ; and you don't know how other 
clients, connected to the same data, may react to fake messages...

Unfortunately most Flink sources cannot be "cancelled" nicely without changing 
part of their code. It's the case for the Kafka source.

- If a kafa consumer source instance is not connected to any partition (because 
it's parallelism level exceeds the kafka consumer group partition number for 
instance), we end up in an infinite wait in FlinkKafkaConsumerBase.run() until 
thread is interrupted :

 // wait until this is canceled
 final Object waitLock = new Object();
 while (running) {
 try {
 //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
 synchronized (waitLock) {
 waitLock.wait();
 }
 }
 catch (InterruptedException e) {
 if (!running) {
 // restore the interrupted 
state, and fall through the loop
 
Thread.currentThread().interrupt();
 }
 }
 }

So either you change the code, or in your monitoring thread you interrupt the 
source thread -- but that will trigger the HA mechanism, the source instance 
will be relaunched n times before failing.

- BTW it's also the case with RMQSource, as the "nextDelivery" in 
RMQSource.run() is called without timeout :
 @Override
 public void run(SourceContext ctx) throws Exception {
 while (running) {
 QueueingConsumer.Delivery delivery = 
consumer.nextDelivery();

So if no message arrives, the while running check is not done and the source 
cannot be cancelled without hard interruption.

Best regards,
Arnaud


-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Friday, December 29, 2017 10:30
To: Eron Wright 
Cc: Ufuk Celebi ; Jaxon Hu ; user 
; Aljoscha Krettek 
Subject: Re: How to stop FlinkKafkaConsumer and make job finished?

Yes, that sounds like what Jaxon is looking for. :-) Thanks for the pointer 
Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright  wrote:

I believe you can extend the `KeyedDeserializationSchema` that you
pass to the consumer to check for end-of-stream markers.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/o
rg/apache/flink/streaming/util/serialization/KeyedDeserializationSchem
a.html#isEndOfStream-T-

Eron
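
For illustration, such an end-of-stream schema could look roughly like the
sketch below; the String payload and the "END" marker record are assumptions,
not something prescribed by the linked Javadoc:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    public class StopOnMarkerSchema implements KeyedDeserializationSchema<String> {

        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic,
                int partition, long offset) throws IOException {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // once the consumer sees this marker record, the source finishes
            return "END".equals(nextElement);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }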

On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi  wrote:

Hey Jaxon,

I don't think it's possible to control this via the life-cycle
methods of your functions.

Note that Flink currently does not support graceful stop in a
meaningful manner and you can only cancel running jobs. What comes to
my mind to cancel on EOF:

1) Extend Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not feasible and some work to get familiar with the code.
Just putting in out there.

2) Throw a "SuccessException" that fails the job. Easy, but not nice.

3) Use an Http client and cancel your job via the Http endpoint


RE: How to stop FlinkKafkaConsumer and make job finished?

2018-01-02 Thread LINZ, Arnaud
Hi,

My 2 cents: the ability to programmatically stop a Flink stream gracefully is what 
the framework lacks most, IMHO. It's a very common use case: each time you want to 
update the application or change its configuration, you need to stop and restart it 
cleanly, without triggering alerts, data loss, or anything else.
That's why I never use the provided Flink sources "out of the box". I've built a 
framework that encapsulates them, adding a monitoring thread that periodically 
checks for a special "HDFS stop file" and tries to cancel() the source nicely if 
the user requested a stop this way (I've found the HDFS file trick is the easiest 
way for an external application to reach all task managers running 
on unknown hosts).
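
A very stripped-down sketch of that idea (our real framework handles rich
functions, checkpointing and error cases; the stop-file path and polling
interval below are illustrative):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StopFileSourceWrapper<T> implements SourceFunction<T> {

        private final SourceFunction<T> delegate;
        private final String stopFilePath;   // e.g. "hdfs:///flink/stop/my-job"

        public StopFileSourceWrapper(SourceFunction<T> delegate, String stopFilePath) {
            this.delegate = delegate;
            this.stopFilePath = stopFilePath;
        }

        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            Thread monitor = new Thread(() -> {
                try {
                    Path stopFile = new Path(stopFilePath);
                    FileSystem fs = stopFile.getFileSystem(new Configuration());
                    while (!fs.exists(stopFile)) {
                        Thread.sleep(10_000L);           // poll every 10 seconds
                    }
                    delegate.cancel();                   // ask the wrapped source to stop nicely
                } catch (Exception ignored) {
                    // monitoring is best effort in this sketch
                }
            });
            monitor.setDaemon(true);
            monitor.start();
            delegate.run(ctx);                           // blocks until the delegate stops
        }

        @Override
        public void cancel() {
            delegate.cancel();
        }
    }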

I could not use the "special message" trick because in most real production 
environments you cannot, as a client, post a message to a queue just for your own 
client's needs: you don't have the access rights to do so, and you don't know how 
other clients connected to the same data may react to fake messages...

Unfortunately, most Flink sources cannot be cancelled nicely without changing 
part of their code. That is the case for the Kafka source.

- If a Kafka consumer source instance is not connected to any partition (for 
instance because its parallelism exceeds the number of partitions available to 
the consumer group), we end up in an infinite wait in FlinkKafkaConsumerBase.run() 
until the thread is interrupted:

    // wait until this is canceled
    final Object waitLock = new Object();
    while (running) {
        try {
            //noinspection SynchronizationOnLocalVariableOrMethodParameter
            synchronized (waitLock) {
                waitLock.wait();
            }
        }
        catch (InterruptedException e) {
            if (!running) {
                // restore the interrupted state, and fall through the loop
                Thread.currentThread().interrupt();
            }
        }
    }

So either you change the code, or in your monitoring thread you interrupt the 
source thread -- but that will trigger the HA mechanism, the source instance 
will be relaunched n times before failing.

- BTW it's also the case with RMQSource, as nextDelivery() in RMQSource.run() is 
called without a timeout:

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        while (running) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

So if no message arrives, the while (running) check is never reached again and the 
source cannot be cancelled without a hard interruption.
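
A sketch of the kind of change that would fix it (not actual Flink code; the
100 ms timeout is arbitrary):

    while (running) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery(100);  // returns null on timeout
        if (delivery == null) {
            continue;   // no message yet: loop back and re-check the running flag
        }
        // ... process the delivery as before ...
    }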

Best regards,
Arnaud


-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Friday, December 29, 2017 10:30
To: Eron Wright 
Cc: Ufuk Celebi ; Jaxon Hu ; user 
; Aljoscha Krettek 
Subject: Re: How to stop FlinkKafkaConsumer and make job finished?

Yes, that sounds like what Jaxon is looking for. :-) Thanks for the pointer 
Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright  wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you
> pass to the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/o
> rg/apache/flink/streaming/util/serialization/KeyedDeserializationSchem
> a.html#isEndOfStream-T-
>
> Eron
>
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi  wrote:
>>
>> Hey Jaxon,
>>
>> I don't think it's possible to control this via the life-cycle
>> methods of your functions.
>>
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>>
>> 1) Extend Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is
>> probably not feasible and some work to get familiar with the code.
>> Just putting in out there.
>>
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>>
>> 3) Use an Http client and cancel your job via the Http endpoint
>>
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after EOF record until cancellation, etc.).
>>
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>>
>> – Ufuk
>>
>>
>> On Mon, Dec 25, 2017 at 8:59 

Re: Flink Kafka Consumer stops fetching records

2018-01-02 Thread Timo Walther

Hi Teena,

could you tell us a bit more about your job? Are you using event-time 
semantics?


Regards,
Timo

Am 1/2/18 um 6:14 AM schrieb Teena K:

Hi,

I am using Flink 1.4 along with Kafka 0.11. My stream job has 4 Kafka 
consumers, each subscribing to a different topic. The stream from each 
consumer gets processed in 3 to 4 different ways, thereby writing to a 
total of 12 sinks (Cassandra tables). When the job runs, up to 8 or 10 
records get processed correctly, and after that no further records are 
picked up by the consumers. I have tried this with "Flink 1.3.2 and Kafka 
0.10" and "Flink 1.4 and Kafka 0.10", all of which gave the same results.





Re: BackPressure handling

2018-01-02 Thread Timo Walther

Hi Vishal,

your assumptions sound reasonable to me. The community is currently 
working on more fine-grained back pressure handling with credit-based flow 
control. It is on the roadmap for 1.5 [1]/[2]. I will loop in Nico, who 
might be able to tell you more about the details. Until then, I guess you 
have to implement a custom source, or adapt an existing one, so that the 
data flows in more realistically.


Regards,
Timo

[1] 
http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html

[2] https://www.youtube.com/watch?v=scStdhz9FHc


Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
I did a simulation on session windows ( in 2 modes ) and let it rip 
for about 12 hours


1. Replay where a kafka topic with retention of 7 days was the source 
( earliest )

2. Start the pipe with kafka source ( latest )

I saw results that differed dramatically.

On replay the pipeline stalled after  good ramp up while in the second 
case the pipeline hummed on without issues. For the same time period 
the data consumed is significantly more in the second case with the WM 
progression stalled in the first case with no hint of resolution ( the 
incoming data on source topic far outstrips the WM progression )  I 
think I know the reasons and this is my hypothesis.


In replay mode the number of windows open do not have an upper bound. 
While buffer exhaustion ( and data in flight with watermark )  is the 
reason for throttle, it does not really limit the open windows and in 
fact creates windows that reflect futuristic data ( future is relative 
to the current WM ) . So if partition x has data for watermark time 
t(x) and partition y for watermark time t(y) and t(x) << t(y) where 
the overall watermark is t(x) nothing significantly throttles 
consumption from the y partition ( in fact for x too ) , the bounded 
buffer based approach does not give minute control AFAIK as one would 
hope and that implies there are far more open windows than the system 
can handle and that leads to the pathological case where the buffers 
fill up  ( I believe that happens way late ) and throttling occurs but 
the WM does not proceed and windows that could ease the glut the 
throttling cannot proceed. In the replay mode the amount of data 
implies that the Fetchers keep pulling data at the maximum consumption 
allowed by the open ended buffer approach.


My question thus is, is there any way to have a finer control of back 
pressure, where in the consumption from a source is throttled 
preemptively ( by for example decreasing the buffers associated for a 
pipe or the size allocated ) or sleeps in the Fetcher code that can 
help aligning the performance to have real time consumption  
characteristics


Regards,

Vishal.








BackPressure handling

2018-01-02 Thread Vishal Santoshi
I did a simulation on session windows (in 2 modes) and let it rip for
about 12 hours:

1. Replay, where a Kafka topic with a retention of 7 days was the source
(earliest)
2. Start the pipe with the Kafka source at the head of the topic (latest)

I saw results that differed dramatically.

On replay the pipeline stalled after a good ramp-up, while in the second case
the pipeline hummed along without issues. For the same time period, the data
consumed is significantly more in the second case, with the WM progression
stalled in the first case with no hint of resolution (the incoming data on the
source topic far outstrips the WM progression). I think I know the reasons,
and this is my hypothesis.

In replay mode the number of open windows has no upper bound. While buffer
exhaustion (data in flight relative to the watermark) is what eventually
throttles the pipeline, it does not really limit the number of open windows and
in fact creates windows that reflect futuristic data (future relative to the
current WM). So if partition x has data at watermark time t(x) and partition y
at watermark time t(y), with t(x) << t(y) and the overall watermark at t(x),
nothing significantly throttles consumption from the y partition (in fact from
x too). The bounded-buffer approach does not give the fine-grained control one
would hope for, AFAIK, which implies there are far more open windows than the
system can handle. That leads to the pathological case where the buffers fill
up (I believe that happens way too late) and throttling occurs, but the WM does
not advance, and the windows that could ease the glut cannot proceed because of
that throttling. In replay mode the amount of data means the Fetchers keep
pulling data at the maximum rate the open-ended buffer approach allows.

My question thus is: is there any way to have finer control over back
pressure, where consumption from a source is throttled preemptively (for
example by decreasing the number or size of the buffers associated with a
pipe), or sleeps in the Fetcher code, so that replay performance matches
real-time consumption characteristics?
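
For illustration only, one crude way to approximate this from user code, short
of changing the Fetcher, is to rate-limit right after the source during replay;
a minimal sketch, with an arbitrary per-subtask cap:

    import org.apache.flink.api.common.functions.RichMapFunction;

    public class RateLimitingMap<T> extends RichMapFunction<T, T> {

        private final long maxRecordsPerSecond;   // e.g. 1000, an assumed per-subtask cap
        private transient long windowStart;
        private transient long count;

        public RateLimitingMap(long maxRecordsPerSecond) {
            this.maxRecordsPerSecond = maxRecordsPerSecond;
        }

        @Override
        public T map(T value) throws Exception {
            long now = System.currentTimeMillis();
            if (now - windowStart >= 1000L) {      // start a new one-second window
                windowStart = now;
                count = 0;
            }
            if (++count > maxRecordsPerSecond) {   // cap reached: sleep out the rest of the second
                Thread.sleep(1000L - (now - windowStart));
                windowStart = System.currentTimeMillis();
                count = 0;
            }
            return value;
        }
    }

    // usage: kafkaSource.map(new RateLimitingMap<>(1000)) right after the source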

Regards,

Vishal.


Re: S3 Access in eu-central-1

2018-01-02 Thread Nico Kruber
Sorry for the late response,
but I finally got around to adding this workaround to our "common issues"
section with PR https://github.com/apache/flink/pull/5231

Nico
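
For readers finding this thread: as far as I recall, the workaround boils down
to forcing V4 request signing in the AWS SDK (the linked PR has the
authoritative wording), e.g. via a JVM property in flink-conf.yaml, in addition
to the eu-central-1 endpoint shown further down in this thread:

    env.java.opts: -Dcom.amazonaws.services.s3.enableV4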

On 29/11/17 09:31, Ufuk Celebi wrote:
> Hey Dominik,
> 
> yes, we should definitely add this to the docs.
> 
> @Nico: You recently updated the Flink S3 setup docs. Would you mind
> adding these hints for eu-central-1 from Steve? I think that would be
> super helpful!
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 28, 2017 at 10:00 PM, Dominik Bruhn  wrote:
>> Hey Stephan, Hey Steve,
>> that was the right hint, adding that option to the Java options fixed the
>> problem. Maybe we should add this somehow to our Flink Wiki?
>>
>> Thanks!
>>
>> Dominik
>>
>> On 28/11/17 11:55, Stephan Ewen wrote:
>>>
>>> Got a pointer from Steve that this is answered on Stack Overflow here:
>>> https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version
>>> 
>>>
>>> Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller
>>> footprint, compatible across Hadoop versions, and based on a later s3a and
>>> AWS SDK. With that connector, it should work out of the box because it uses a
>>> later AWS SDK. You can also use it with earlier Hadoop versions because
>>> dependencies are relocated, so it should not clash/conflict.
>>>
>>>
>>>
>>>
>>> On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen >> > wrote:
>>>
>>> Hi!
>>>
>>> The endpoint config entry looks correct.
>>> I was looking at this issue to see if there are pointers to anything
>>> else, but it looks like the explicit endpoint entry is the most
>>> important thing: https://issues.apache.org/jira/browse/HADOOP-13324
>>> 
>>>
>>> I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for
>>> pulling you in again - listening and learning still about the subtle
>>> bits and pieces of S3).
>>> @Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or
>>> only in Hadoop 2.8?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn >> > wrote:
>>>
>>> Hey,
>>> can anyone give a hint? Does anyone have flink running with an
>>> S3 Bucket in Frankfurt/eu-central-1 and can share his config and
>>> setup?
>>>
>>> Thanks,
>>> Dominik
>>>
>>> On 22. Nov 2017, at 17:52, domi...@dbruhn.de
>>>  wrote:
>>>
 Hey everyone,
 I've been trying for hours to get Flink 1.3.2 (downloaded for
 hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is
 hosted in the eu-central-1 region. Everything works fine for
 other regions. I'm running my job on a JobManager in local
 mode. I googled and found several hints, most of them saying
 that setting `fs.s3a.endpoint` should solve it. It doesn't.
 I'm also sure that the core-site.xml (see below) is picked up:
 if I put garbage into the endpoint, I get a hostname-not-found
 error.

 The exception I'm getting is:
 com.amazonaws.services.s3.model.AmazonS3Exception: Status
 Code: 400, AWS Service: Amazon S3, AWS Request ID:
 432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad
 Request, S3 Extended Request ID:

 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=

 I read the AWS FAQ but I don't think that

 https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request

 
 applies to me as I'm not running the NativeFileSystem.

 I suspect this is related to the v4 signing protocol which is
 required for S3 in Frankfurt. Could it be that the aws-sdk
 version is just too old? I tried to play around with it but
 the hadoop adapter is incompatible with newer versions.

 I have the following core-site.xml:

 
 

 <configuration>
   <property>
     <name>fs.s3.impl</name>
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
   </property>
   <property>
     <name>fs.s3a.buffer.dir</name>
     <value>/tmp</value>
   </property>
   <property>
     <name>fs.s3a.access.key</name>
     <value>something</value>
   </property>
   <property>
     <name>fs.s3a.secret.key</name>
     <value>wont-tell</value>
   </property>
   <property>
     <name>fs.s3a.endpoint</name>
     <value>s3.eu-central-1.amazonaws.com</value>
   </property>
 </configuration>
 >>>
 Here is my lib folder with the versions of the aws-sdk and the
 hadoop-aws integration:
 -rw---1 root root   11.4M Mar 20  2014
 aws-java-sdk-1.7.4.jar