How does Flink monitor that a source stream task (Time Trigger) is running?

2017-09-28 Thread yunfan123
In my understanding, Flink just uses the task heartbeat to monitor whether the TaskManager is
running.
If the source stream thread (Time Trigger for XXX) crashes, it seems Flink can't
recover from this state?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: Exception in BucketingSink when cancelling Flink job

2017-09-28 Thread wangsan
Hi,


The 'join' method can be called with a timeout (as it is in TaskCanceler), so it 
won't block forever if the respective thread is in a deadlock state. Maybe 
calling 'interrupt()' after 'join(timeout)' is more reasonable, although it 
still cannot make sure the operations inside the 'close()' method are finished.
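
A minimal sketch of that ordering (hypothetical helper, not the actual Task code): join with a 
bounded timeout first, and only interrupt while the thread is still alive.

public final class CancelSketch {
  static void cancelTask(Thread executer, long interruptIntervalMillis) throws InterruptedException {
    executer.join(interruptIntervalMillis);        // bounded wait, never blocks forever
    while (executer.isAlive()) {
      executer.interrupt();                        // escalate only if close() has not returned yet
      executer.join(interruptIntervalMillis);      // give the thread another bounded chance to exit
    }
  }
}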


Best,
wangsan


On 2017-09-29 at 01:52, "Stephan Ewen" wrote:


Hi!


Calling 'interrupt()' only makes sense before 'join()', because 'join()' blocks 
until the respective thread is finished.
The 'interrupt()' call happens in order to cancel the task out of potentially blocking 
I/O or sleep/wait operations.


The problem is that HDFS does not handle interrupts correctly, it sometimes 
deadlocks in the case of interrupts on unclosed streams :-(


I think it would be important to make sure (in the Bucketing Sink) that the DFS 
streams are closed upon task cancellation.
@aljoscha - adding you to this thread, as you know most about the bucketing 
sink.
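
As an illustration only (not the actual BucketingSink code), the kind of guard meant here would 
close the DFS stream eagerly during cancellation and tolerate interrupt-related I/O exceptions:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;

public final class StreamCloseSketch {
  public static void closeQuietly(FSDataOutputStream stream) {
    if (stream == null) {
      return;
    }
    try {
      stream.close();
    } catch (IOException e) {
      // e.g. InterruptedIOException when the task thread was interrupted;
      // log and continue with the cancellation instead of failing the shutdown
    }
  }
}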


Best,
Stephan




On Wed, Sep 27, 2017 at 10:18 AM, Stefan Richter  
wrote:

Hi,


I would speculate that the reason for this order is that we want to shut down 
the tasks quickly by interrupting blocking calls in the event of failure, so 
that recovery can begin as fast as possible. I am looping in Stephan who might 
give more details about this code.


Best,
Stefan 


On 27.09.2017 at 07:33, wangsan wrote:



After digging into the source code, we found that when a Flink job is canceled, a 
TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically 
interrupts the
task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}
// ...
executer.interrupt();
try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}
// ...

Notice that TaskCanceler first sends the interrupt signal to the task thread and then 
follows with the join method. Since the task thread is at that point trying to close the 
DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown 
in the task thread.

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}

I was confused about why TaskCanceler calls executer.interrupt() before 
executer.join(interruptInterval). Can anyone help?










Hi,


We are currently using BucketingSink to save data into HDFS in Parquet format. 
But when the Flink job is cancelled, we always get an exception in 
BucketingSink's close method. The detailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
...

at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


It seems that the DFSOutputStream hasn't been closed before the task thread is forcibly 
terminated. We found a similar problem in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
 , but setting "akka.ask.timeout" to a larger value does not work for us. So 
how can we make sure the stream is safely closed when cancelling a job?


Best,
wangsan















Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Yunus Olgun
Hi Kostas, Aljoscha,

To answer Kostas’s concern, the algorithm works this way:

Let’s say we have two sources Source-0 and Source-1. Source-0 is slow and 
Source-1 is fast. Sources read from Kafka at different paces. Threshold is 10 
time units.

1st cycle: Source-0 sends records with timestamps 1, 2 and emits watermark 2. 
Throttle-0 has WM 2.
Source-1 sends records with timestamps 1, 2, 3 and emits watermark 
3. Throttle-1 also has WM 2.
.
.
.
10th cycle: Source-0 sends records with timestamps 19, 20 and emits watermark 20. 
Throttle-0 has WM 20.
  Source-1 sends records with timestamps 28, 29, 30 and emits 
watermark 30. Throttle-1 also has WM 20.

11th cycle: Source-0 sends records with timestamps 21, 22 and emits watermark 22. 
Throttle-0 has WM 22.
  Source-1 sends records with timestamps 31, 32, 33 and emits 
watermark 33. Since Throttle-1 has a WM of 20 at the beginning of the cycle, 
it will start sleeping a very short amount of time for each incoming record. 
This eventually causes backpressure on Source-1 and only Source-1. Source-1 
starts to poll less frequently from Kafka.
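
A hedged sketch of the throttling step described above (illustrative, not the actual job code): 
compare each record's timestamp with the current, broadcast watermark and sleep briefly when 
the record is too far ahead.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Throttler<T> extends ProcessFunction<T, T> {
  private final long thresholdMillis;

  public Throttler(long thresholdMillis) {
    this.thresholdMillis = thresholdMillis;
  }

  @Override
  public void processElement(T record, Context ctx, Collector<T> out) throws Exception {
    Long ts = ctx.timestamp();                               // record timestamp
    long watermark = ctx.timerService().currentWatermark();  // progress of the slowest source
    if (ts != null && ts - watermark > thresholdMillis) {
      Thread.sleep(1);  // back-pressures only the fast upstream source
    }
    out.collect(record);
  }
}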

For this algorithm to work, each Throttler should receive records from only one 
source. Otherwise backpressure will be applied to both sources. I achieve that 
using a custom partitioner and index IDs: everything that comes from Source-n 
goes to Throttler-n (see the sketch below). Since it is a custom partitioner, watermarks get 
broadcast to all throttlers.
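
A minimal sketch of such a partitioner (assuming each record carries the producing source's 
subtask index; the names are illustrative):

import org.apache.flink.api.common.functions.Partitioner;

public class SameIndexPartitioner implements Partitioner<Integer> {
  @Override
  public int partition(Integer sourceIndex, int numPartitions) {
    // route everything tagged with Source-n's index to Throttler-n
    return sourceIndex % numPartitions;
  }
}

It could be wired in with something like stream.partitionCustom(new SameIndexPartitioner(), 
record -> record.sourceIndex), assuming each record carries that index field.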

The problem is that I thought Source-0 and Throttler-0 would be colocated in the same 
TaskManager. Unfortunately this is not the case. Source-0 and Throttler-1 can 
end up in TM-0, and Source-1 and Throttler-0 in TM-1. This causes a network 
shuffle and one more data serialization/deserialization. I want to avoid that if 
possible, since the stream is big.

Regards,  
 
> On 28. Sep 2017, at 23:03, Aljoscha Krettek  wrote:
> 
> To quickly make Kostas' intuition concrete: it's currently not possible to 
> have watermarks broadcast but the data be locally forwarded. The reason is 
> that watermarks and data travel in the same channels so if the watermark 
> needs to be broadcast there needs to be an n to m (in this case m == n) 
> connection pattern between the operations (tasks).
> 
> I think your algorithm should work if you take the correct difference, i.e. 
> throttle when timestamp - "global watermark" > threshold. The inverted diff 
> would be "global watermark" - timestamp. I think you're already doing the 
> correct thing, just wanted to clarify for others who might be reading.
> 
> Did you check on which TaskManagers the taskA and taskB operators run? I 
> think they should still be running on the same TM if resources permit.
> 
> Best,
> Aljoscha
>> On 28. Sep 2017, at 10:25, Kostas Kloudas wrote:
>> 
>> Hi Yunus,
>> 
>> I see. Currently I am not sure that you can simply broadcast the watermark 
>> only, without 
>> having a shuffle.
>> 
>> But one thing to notice about your algorithm is that, I am not sure if your 
>> algorithm solves 
>> the problem you encounter.
>> 
>> Your algorithm seems to prioritize the stream with the elements with the 
>> smallest timestamps,
>> rather than throttling fast streams so that slow ones can catch up.
>> 
>> Example: Reading a partition from Kafka that has elements with timestamps 
>> 1,2,3
>> will emit watermark 3 (assuming ascending watermark extractor), while 
>> another task that reads 
>> another partition with elements with timestamps 5,6,7 will emit watermark 7. 
>> With your algorithm, 
>> if I get it right, you will throttle the second partition/task, while allow 
>> the first one to advance, although
>> both read at the same pace (e.g. 3 elements per unit of time).
>> 
>> I will think a bit more on the solution. 
>> 
>> Some sketches that I can find, they all introduce some latency, e.g. 
>> measuring throughput in taskA
>> and sending it to a side output with a taksID, then broadcasting the side 
>> output to a downstream operator
>> which is sth like a coprocess function (taskB) and receives the original 
>> stream and the side output, and 
>> this is the one that checks if “my task" is slow. 
>> 
>> As I said I will think on it a bit more,
>> Kostas
>> 
>>> On Sep 27, 2017, at 6:32 PM, Yunus Olgun wrote:
>>> 
>>> Hi Kostas,
>>> 
>>> Yes, you have summarized well. I want to only forward the data to the next 
>>> local operator, but broadcast the watermark through the cluster.
>>> 
>>> - I can’t set parallelism of taskB to 1. The stream is too big for that. 
>>> Also, the data is ordered at each partition. I don’t want to change that 
>>> order.
>>> 
>>> - I don’t need KeyedStream. Also taskA and taskB will always have the same 
>>> parallelism with each other. But this parallelism can be increased in the 
>>> future.
>>> 
>>> The use case is: The source is Kafka. At our peak hours or when we want to 
>>> run the streaming job with old

starting query server when running flink embedded

2017-09-28 Thread Henri Heiskanen
Hi,

I would like to test queryable state just by running Flink embedded
from my IDE. What is the easiest way to start it properly? If I run the
code below, I cannot see the query server listening at the given port. I found
something about this, but it was about copying some base classes and the post
was from 2016, so maybe things have improved.

Configuration conf = new Configuration();
conf.setBoolean("query.server.enable", true);
conf.setInteger("query.server.port", 16122);

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(2, conf);
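
For reference, a minimal sketch of what the job side might look like, assuming the local mini 
cluster actually picks up these settings (which is exactly what is in question here); the stream 
contents and the state name are made up:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QueryableStateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("query.server.enable", true);
    conf.setInteger("query.server.port", 16122);

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(2, conf);

    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2))
        .keyBy(0)
        .asQueryableState("my-counts");   // state exposed under this name

    env.execute("queryable-state-demo");
  }
}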

Br,
Henkka


Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Aljoscha Krettek
To quickly make Kostas' intuition concrete: it's currently not possible to have 
watermarks broadcast but the data be locally forwarded. The reason is that 
watermarks and data travel in the same channels so if the watermark needs to be 
broadcast there needs to be an n to m (in this case m == n) connection pattern 
between the operations (tasks).

I think your algorithm should work if you take the correct difference, i.e. 
throttle when timestamp - "global watermark" > threshold. The inverted diff 
would be "global watermark" - timestamp. I think you're already doing the 
correct thing, just wanted to clarify for others who might be reading.

Did you check on which TaskManagers the taskA and taskB operators run? I think 
they should still be running on the same TM if resources permit.

Best,
Aljoscha
> On 28. Sep 2017, at 10:25, Kostas Kloudas  wrote:
> 
> Hi Yunus,
> 
> I see. Currently I am not sure that you can simply broadcast the watermark 
> only, without 
> having a shuffle.
> 
> But one thing to notice about your algorithm is that, I am not sure if your 
> algorithm solves 
> the problem you encounter.
> 
> Your algorithm seems to prioritize the stream with the elements with the 
> smallest timestamps,
> rather than throttling fast streams so that slow ones can catch up.
> 
> Example: Reading a partition from Kafka that has elements with timestamps 
> 1,2,3
> will emit watermark 3 (assuming ascending watermark extractor), while another 
> task that reads 
> another partition with elements with timestamps 5,6,7 will emit watermark 7. 
> With your algorithm, 
> if I get it right, you will throttle the second partition/task, while allow 
> the first one to advance, although
> both read at the same pace (e.g. 3 elements per unit of time).
> 
> I will think a bit more on the solution. 
> 
> Some sketches that I can find, they all introduce some latency, e.g. 
> measuring throughput in taskA
> and sending it to a side output with a taksID, then broadcasting the side 
> output to a downstream operator
> which is sth like a coprocess function (taskB) and receives the original 
> stream and the side output, and 
> this is the one that checks if “my task" is slow. 
> 
> As I said I will think on it a bit more,
> Kostas
> 
>> On Sep 27, 2017, at 6:32 PM, Yunus Olgun wrote:
>> 
>> Hi Kostas,
>> 
>> Yes, you have summarized well. I want to only forward the data to the next 
>> local operator, but broadcast the watermark through the cluster.
>> 
>> - I can’t set parallelism of taskB to 1. The stream is too big for that. 
>> Also, the data is ordered at each partition. I don’t want to change that 
>> order.
>> 
>> - I don’t need KeyedStream. Also taskA and taskB will always have the same 
>> parallelism with each other. But this parallelism can be increased in the 
>> future.
>> 
>> The use case is: The source is Kafka. At our peak hours or when we want to 
>> run the streaming job with old data from Kafka, always the same thing 
>> happens. Even at trivial jobs. Some consumers consumes faster than others. 
>> They produce too much data to downstream but watermark advances slowly at 
>> the speed of the slowest consumer. This extra data gets piled up at 
>> downstream operators. When the downstream operator is an aggregation, it is 
>> ok. But when it is a in-Flink join; state size gets too big, checkpoints 
>> take much longer and overall the job becomes slower or fails. Also it 
>> effects other jobs at the cluster.
>> 
>> So, basically I want to implement a throttler. It compares timestamp of a 
>> record and the global watermark. If the difference is larger than a constant 
>> threshold it starts sleeping 1 ms for each incoming record. This way, fast 
>> operators wait for the slowest one.
>> 
>> The only problem is that, this solution came at the cost of one network 
>> shuffle and data serialization/deserialization. Since the stream is large I 
>> want to avoid the network shuffle at the least. 
>> 
>> I thought operator instances within a taskmanager would get the same 
>> indexId, but apparently this is not the case.
>> 
>> Thanks,
>> 
>>> On 27. Sep 2017, at 17:16, Kostas Kloudas wrote:
>>> 
>>> Hi Yunus,
>>> 
>>> I am not sure if I understand correctly the question.
>>> 
>>> Am I correct to assume that you want the following?
>>> 
>>> ———> time
>>> 
>>> Process A                    Process B
>>> 
>>> Task1: W(3) E(1) E(2) E(5)  W(3) W(7) E(1) E(2) E(5)
>>> 
>>> Task2: W(7) E(3) E(10) E(6) W(3) W(7) E(3) E(10) E(6)
>>> 
>>> 
>>> In the above, elements flow from left to right and W() stands for watermark 
>>> and E() stands for element.
>>> In other words, between Process(TaskA) and Process(TaskB) you want to only 
>>> forward the elements, but broadcast the watermarks, right?
>>> 
>>> If this is the case, a trivial solution woul

Re: Job Manager minimum memory hard coded to 768

2017-09-28 Thread Aljoscha Krettek
I believe this could be from a time when there was not yet the setting 
"containerized.heap-cutoff-min" since this part of the code is quite old.

I think we should be able to remove that restriction, but I'm not sure, so I'm 
cc'ing Till, who knows those parts best.

@Till, what do you think?

> On 28. Sep 2017, at 17:47, Dan Circelli  wrote:
> 
> In our usage of Flink, our Yarn Job Manager never goes above ~48 MB of heap 
> utilization. In order to maximize the heap available to the Task Managers I 
> thought we could shrink our Job Manager heap setting down from the 1024MB we 
> were using to something tiny like 128MB. However, doing so results in the 
> runtime error:
>  
> java.lang.IllegalArgumentException: The JobManager memory (64) is below the 
> minimum required memory amount of 768 MB
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.setJobManagerMemory(AbstractYarnClusterDescriptor.java:187)
> …
>  
> Looking into it: this value isn’t controlled by the settings in yarn-site.xml 
> but is actually hardcoded in Flink code base to 768 MB. (see 
> AbstractYarnDescriptor.java where MIN_JM_MEMORY = 768.)
>  
>  
> Why is this hardcoded? 
> Why not let value be set via the Yarn Site Configuration xml?
> Why such a high minimum?
>  
>  
> Thanks,
> Dan



Job Manager minimum memory hard coded to 768

2017-09-28 Thread Dan Circelli
In our usage of Flink, our Yarn Job Manager never goes above ~48 MB of heap 
utilization. In order to maximize the heap available to the Task Managers I 
thought we could shrink our Job Manager heap setting down from the 1024MB we 
were using to something tiny like 128MB. However, doing so results in the 
runtime error:

java.lang.IllegalArgumentException: The JobManager memory (64) is below the 
minimum required memory amount of 768 MB
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.setJobManagerMemory(AbstractYarnClusterDescriptor.java:187)
…

Looking into it: this value isn’t controlled by the settings in yarn-site.xml 
but is actually hardcoded in Flink code base to 768 MB. (see 
AbstractYarnDescriptor.java where MIN_JM_MEMORY = 768.)


Why is this hardcoded?
Why not let value be set via the Yarn Site Configuration xml?
Why such a high minimum?


Thanks,
Dan


Re: Issue with CEP library

2017-09-28 Thread Kostas Kloudas
Hi Ajay,

After reading all the data from your source, could you somehow tell your 
sources to send 
a watermark of Long.MaxValue (or a high value)?

I am asking this just to see if the problem is that the data is simply 
buffered inside Flink because
there is a problem with the timestamps and the watermarks.
You could also see this in the WebUI, by looking at the size of your 
checkpointed state.
If the size increases, it means that something is stored there.

I will also have a deeper look.

Kostas

> On Sep 28, 2017, at 5:17 PM, Ajay Krishna  wrote:
> 
> Hi Kostas,
> 
> Thank you for reaching out and for the suggestions. Here are the results
> 
> 1. Using an env parallelism of 1 performed similarly, with the additional 
> problem that there was significant lag in the kafka topic
> 2. I removed the additional keyBy(0) but that did not change anything
> 3. I also tried to check only for the start pattern and it was exactly 
> the same: I saw one of the homes going through but 3 others just getting 
> dropped. 
> 4. I also tried slowing down the rate from 5000/second into Kafka to about 
> 1000/second but I see similar results. 
> 
> I was wondering if you had any other solutions to the problem. I am especially 
> concerned about 1 and 3. Is this library under active development? Is there 
> a JIRA open on this issue, and could we open one to track it? 
> 
> 
> I was trying to read up on Stackoverflow and found a user who had a very similar 
> issue in Aug'16. So I also contacted him to discuss the issue and learnt 
> that the pattern of failure was exactly the same. 
> 
> https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic 
> 
> 
> 
> Before I found the above post, I created a post for this issue
> https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern
>  
> 
> 
> 
> 
> I would really appreciate your guidance on this. 
> 
> Best regards,
> Ajay
> 
> 
> 
> 
> 
> On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas wrote:
> Hi Ajay,
> 
> I will look a bit more on the issue.
> 
> But in the meantime, could you run your job with parallelism of 1, to see if 
> the results are the expected?
> 
> Also could you change the pattern, for example check only for the start, to 
> see if all keys pass through.
> 
> As for the code, you apply keyBy(0) on the cepMap stream twice, which is 
> redundant and introduces latency. 
> You could remove that to also see the impact.
> 
> Kostas
> 
>> On Sep 28, 2017, at 2:57 AM, Ajay Krishna wrote:
>> 
>> Hi, 
>> 
>> I've only been working with Flink for the past 2 weeks on a project and am 
>> trying to use the CEP library on sensor data. I am using Flink version 1.3.2. 
>> Flink has a kafka source. I am using KafkaSource9. I am running Flink on a 3 
>> node AWS cluster with 8G of RAM running Ubuntu 16.04. From the flink 
>> dashboard, I see that I have 2 Taskmanagers & 4 Task slots
>> 
>> What I observe is the following. The input to Kafka is a json string and 
>> when parsed on the flink side, it looks like this
>> 
>> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
>> event,High,37.75142,-122.39458,12.0,20.0)
>> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
>> time characteristic is set to EventTime and I have an 
>> AscendingTimestampExtractor using the timestamp field. The parallelism 
>> for the execution environment is set to 4. I have a rather simple event that 
>> I am trying to capture
>> 
>> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
>> cepMapByHomeId = cepMap.keyBy(0);
>> 
>> //cepMapByHomeId.print();
>> 
>> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>> Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>> .where(new OverLowThreshold())
>> .followedBy("end")
>> .where(new OverHighThreshold());
>> 
>> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>> CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
>> 
>> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>> patternStream.select(new PackageCapturedEvents());
>> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
>> 16. The output of the pattern is like this
>> 
>> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
>> event,Non-event,37.75837,-122.41467)
>> On the Kafka producer side, I am trying to send simulated data for around 100 
>> homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
>> I have about 10 partitions in kafka. The producer just loops going through a 
>> csv file with a delay of about 100 ms between 2 rows of the csv file. The 
>> data is exactly the same for all 100 of the csv files except for home_id and 
>> the lat &

Re: Issue with CEP library

2017-09-28 Thread Ajay Krishna
Hi Kostas,

Thank you for reaching out and for the suggestions. Here are the results

1. Using an env parallelism of 1 performed similarly, with the additional
problem that there was significant lag in the kafka topic
2. I removed the additional keyBy(0) but that did not change anything
3. I also tried to check only for the start pattern and it was exactly
the same: I saw one of the homes going through but 3 others just
getting dropped.
4. I also tried slowing down the rate from 5000/second into Kafka to about
1000/second but I see similar results.

I was wondering if you had any other solutions to the problem. I am
especially concerned about 1 and 3. Is this library under active development?
Is there a JIRA open on this issue, and could we open one to track it?


I was trying to read up on Stackoverflow and found a user who had a very similar
issue in Aug'16. So I also contacted him to discuss the issue and learnt
that the pattern of failure was exactly the same.

https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic


Before I found the above post, I created a post for this issue
https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern



I would really appreciate your guidance on this.

Best regards,
Ajay





On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas  wrote:

> Hi Ajay,
>
> I will look a bit more on the issue.
>
> But in the meantime, could you run your job with parallelism of 1, to see
> if the results are the expected?
>
> Also could you change the pattern, for example check only for the start,
> to see if all keys pass through.
>
> As for the code, you apply keyBy(0) on the cepMap stream twice, which is
> redundant and introduces latency.
> You could remove that to also see the impact.
>
> Kostas
>
> On Sep 28, 2017, at 2:57 AM, Ajay Krishna  wrote:
>
> Hi,
>
> I've only been working with Flink for the past 2 weeks on a project and am
> trying to use the CEP library on sensor data. I am using Flink version
> 1.3.2. Flink has a kafka source. I am using KafkaSource9. I am running
> Flink on a 3 node AWS cluster with 8G of RAM running Ubuntu 16.04. From the
> flink dashboard, I see that I have 2 Taskmanagers & 4 Task slots
>
> What I observe is the following. The input to Kafka is a json string and
> when parsed on the flink side, it looks like this
>
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
>
> I use a Tuple8 to capture the parsed data. The first field is home_id. The
> time characteristic is set to EventTime and I have an
> AscendingTimestampExtractor using the timestamp field. The parallelism
> for the execution environment is set to 4. I have a rather simple event
> that I am trying to capture
>
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
> cepMapByHomeId = cepMap.keyBy(0);
>
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
> Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
>
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
> CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
>
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
> patternStream.select(new PackageCapturedEvents());
>
> The pattern checks if the 7th field in the tuple8 goes over 12 and then
> over 16. The output of the pattern is like this
>
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
>
> On the Kafka producer side, I am trying to send simulated data for around 100
> homes, so the home_id would go from 0-100 and the input is keyed by
> home_id. I have about 10 partitions in kafka. The producer just loops going
> through a csv file with a delay of about 100 ms between 2 rows of the csv
> file. The data is exactly the same for all 100 of the csv files except for
> home_id and the lat & long information. The timestamp is incremented by a
> step of 1 sec. I start multiple processes to simulate data form different
> homes.
>
> THE PROBLEM:
>
> Flink completely misses capturing events for a large subset of the input
> data. I barely see the events for about 4-5 of the home_id values. I do a
> print before applying the pattern and after and I see all home_ids before
> and only a tiny subset after. Since the data is exactly the same, I expect
> all homeid to be captured and written to my sink which is cassandra in this
> case. I've looked through all available docs and examples but cannot seem
> to get a fix for the problem.
>
> I would really appreciate some guidance on how to understand and fix this.
>
>
> Thank you,
>
> Ajay
>
>
>
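
A hedged guess at what a condition like OverLowThreshold could look like for the Tuple8 layout 
described above (the body and the field index are assumptions, not the actual code):

import java.util.Date;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class OverLowThreshold
    extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
  @Override
  public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
    return event.f6 > 12.0f;  // the 7th field crossing the low threshold
  }
}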


Re: Custom Serializers

2017-09-28 Thread nragon
Got it :)
I've redesigned my object which I use across jobs.
I ended up with 4 serializers.
My object Element holds 2 fields: an array of Parameter and a Metadata.
Metadata holds an array of ParameterInfo and each Parameter holds its
ParameterInfo (kind of a duplicate of Metadata, but needed for legacy). So
Element has the TypeInfo and TypeInfoFactory and also serializers for
Parameter and Metadata. The others are just adaptations of
GenericArraySerializer, StringSerializer, ...
From my tests I've managed to get around 40% improvement compared to serializing
Element, as is, with Kryo.

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: how many 'run -c' commands to start?

2017-09-28 Thread r. r.
Thank you, Chesnay.
To make sure - should the node where the job has been submitted go down, the 
processing will continue, I hope?
Do I need to ensure this by configuration?

Btw, I added the --detached param to the run cmd, but it didn't go into a background 
process as I would've expected. Am I guessing wrong?

Thanks!
Rob






 > Original message 
 > From: Chesnay Schepler ches...@apache.org
 > Subject: Re: how many 'run -c' commands to start?
 > To: user@flink.apache.org
 > Sent on: 28.09.2017 15:05


> Hi!
> 
> Given a Flink cluster, you would only call `flink run ...` to submit a 
> job once; for simplicity I would submit it on the node where you started 
> the cluster. Flink will automatically distribute the job across the cluster, 
> in smaller independent parts known as Tasks.
> 
> Regards,
> Chesnay
> 
> On 28.09.2017 08:31, r. r. wrote:
> > Hello
> >
> > I successfully ran a job with 'flink run -c', but this is for the local
> > setup.
> >
> > How should i proceed with a cluster? Will flink automagically instantiate
> > the job on all servers - i hope i don't have to start 'flink run -c' on all
> > machines.
> >
> > New to flink and bigdata, so sorry for the probably silly question
> >
> > Thanks!
> > Rob
> >

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi Stefan,

That reason makes sense to me. Thanks for pointing that out.

About my job: the database is currently never used (I disabled it for some
reasons), but the output to S3 is implemented with async I/O.

I used a ForkJoinPool with a capacity of 50.
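
A hedged sketch of the kind of bounded async upload meant above (the pool size matches the 50
mentioned; the actual AWS Java API call is only indicated as a comment):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class AsyncUploadSketch {
  // dedicated pool with the capacity mentioned above
  private static final ForkJoinPool UPLOAD_POOL = new ForkJoinPool(50);

  public static CompletableFuture<String> uploadAsync(byte[] batch, String key) {
    return CompletableFuture.supplyAsync(() -> {
      // the AWS SDK putObject(bucket, key, batch) call would go here (assumption)
      return key;
    }, UPLOAD_POOL);
  }
}
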
I have tried to rebalance after the count window to monitor the back pressure
on the upload operator.
The result is always OK status.
I think the reason is that the count window buffers lots of records, so
the input rate into the upload operator was not too high.

But I am not sure whether the capacity I set for the ForkJoinPool would
impact the asynchronous checkpointing, in terms of both the machine's resources and the S3
connections.

BTW, S3 serves both the operator and checkpointing, and I used the AWS Java API to
access S3 in the upload operator in order to control where the files go.

Best Regards,
Tony Wei

Stefan Richter wrote on Thursday, 2017-09-28, at 7:43 PM:

> Hi,
>
> the gap between the sync and the async part does not mean too much. What
> happens per task is that all operators go through their sync part, and then
> one thread executes all the async parts, one after the other. So if an
> async part starts late, this is just because it started only after another
> async part finished.
>
> I have one more question about your job,because it involves communication
> with external systems, like S3 and a database. Are you sure that they
> cannot sometimes become a bottleneck, block, and bring down your job. in
> particular: is the same S3 used to serve the operator and checkpointing and
> what is your sustained read/write rate there and the maximum number of
> connections? You can try to use the backpressure metric and try to identify
> the first operator (counting from the sink) that indicates backpressure.
>
> Best,
> Stefan
>
> On 28.09.2017 at 12:59, Tony Wei wrote:
>
> Hi,
>
> Sorry. This is the correct one.
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 18:55 GMT+08:00 Tony Wei :
>
>> Hi Stefan,
>>
>> Sorry for providing partial information. The attachment is the full logs
>> for checkpoint #1577.
>>
>> Why I would say it seems that asynchronous part was not executed
>> immediately is due to all synchronous parts were all finished at 2017-09-27
>> 13:49.
>> Did that mean the checkpoint barrier event had already arrived at the
>> operator and started as soon as when the JM triggered the checkpoint?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 18:22 GMT+08:00 Stefan Richter :
>>
>>> Hi,
>>>
>>> I agree that the memory consumption looks good. If there is only one TM,
>>> it will run inside one JVM. As for the 7 minutes, you mean the reported
>>> end-to-end time? This time measurement starts when the checkpoint is
>>> triggered on the job manager, the first contributor is then the time that
>>> it takes for the checkpoint barrier event to travel with the stream to the
>>> operators. If there is back pressure and a lot of events are buffered, this
>>> can introduce delay to this first part, because barriers must not overtake
>>> data for correctness. After the barrier arrives at the operator, next comes
>>> the synchronous part of the checkpoint, which is typically short running
>>> and takes a snapshot of the state (think of creating an immutable version,
>>> e.g. through copy on write). In the asynchronous part, this snapshot is
>>> persisted to DFS. After that the timing stops and is reported together with
>>> the acknowledgement to the job manager.
>>>
>>> So, I would assume if reporting took 7 minutes end-to-end, and the async
>>> part took 4 minutes, it is likely that it took around 3 minutes for the
>>> barrier event to travel with the stream. About the debugging, I think it is
>>> hard to figure out what is going on with the DFS if you don’t have metrics
>>> on that. Maybe you could attach a sampler to the TM’s jvm and monitor where
>>> time is spend for the snapshotting?
>>>
>>> I am also looping in Stephan, he might have more suggestions.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 28.09.2017 at 11:25, Tony Wei wrote:
>>>
>>> Hi Stefan,
>>>
>>> These are some telemetry information, but I don't have history
>>> information about gc.
>>>
>>> 
>>> 
>>>
>>> 1) Yes, my state is not large.
>>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
>>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>>> same cluster. However, how can I recognize the problem is this.
>>> 3) It seems memory usage is bounded. I'm not sure if the status showed
>>> above is fine.
>>>
>>> There is only one TM in my cluster for now, so all tasks are running on
>>> that machine. I think that means they are in the same JVM, right?
>>> Besides taking so long on asynchronous part, there is another question
>>> is that the late message showed that this task was delay for almost 7
>>> minutes, but the log showed it only took 4 minutes.
>>> It seems that it was somehow waiting for being executed. Are there some
>>> points to find out what happened?
>>>
>>> For the log information, what I means is it is

Re: how many 'run -c' commands to start?

2017-09-28 Thread Chesnay Schepler

Hi!

Given a Flink cluster, you would only call `flink run ...` to submit a 
job once; for simplicity I would submit it on the node where you started 
the cluster. Flink will automatically distribute the job across the cluster, 
in smaller independent parts known as Tasks.


Regards,
Chesnay

On 28.09.2017 08:31, r. r. wrote:

Hello

I successfully ran a job with 'flink run -c', but this is for the local

setup.

How should i proceed with a cluster? Will flink automagically instantiate

the job on all servers - i hope i don't have to start 'flink run -c' on all

machines.

New to flink and bigdata, so sorry for the probably silly question



Thanks!

Rob






Re: Custom Serializers

2017-09-28 Thread Chesnay Schepler

On 19.09.2017 11:39, nragon wrote:

createInstance(Object[] fields) at TupleSerializerBase does not seem to be part
of the TypeSerializer API.
Will I be losing any functionality? In what cases do you use this instead
of createInstance()?

// We use this in the Aggregate and Distinct Operators to create instances
// of immutable Tuples (i.e. Scala Tuples)

Thanks

Taken from TupleSerializerBase:

// We use this in the Aggregate and Distinct Operators to create instances
// of immutable Tuples (i.e. Scala Tuples)
public abstract T createInstance(Object[] fields);


On 27.09.2017 17:43, nragon wrote:

Should I use TypeSerializerSingleton if it is independent of the object which
it's serializing?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Generally, use TypeSerializerSingleton. There is virtually no reason to 
not use it. Do keep this section of the TypeSerializer javadoc in mind:


* The methods in this class are assumed to be stateless, such that it is effectively thread safe.
* Stateful implementations of the methods may lead to unpredictable side effects and will
* compromise both stability and correctness of the program.




Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Stefan Richter
Hi,

the gap between the sync and the async part does not mean too much. What 
happens per task is that all operators go through their sync part, and then one 
thread executes all the async parts, one after the other. So if an async part 
starts late, this is just because it started only after another async part 
finished.

I have one more question about your job, because it involves communication with 
external systems, like S3 and a database. Are you sure that they cannot 
sometimes become a bottleneck, block, and bring down your job? In particular: 
is the same S3 used to serve the operator and checkpointing and what is your 
sustained read/write rate there and the maximum number of connections? You can 
try to use the backpressure metric and try to identify the first operator 
(counting from the sink) that indicates backpressure.

Best,
Stefan

> On 28.09.2017 at 12:59, Tony Wei wrote:
> 
> Hi,
> 
> Sorry. This is the correct one.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-28 18:55 GMT+08:00 Tony Wei:
> Hi Stefan, 
> 
> Sorry for providing partial information. The attachment is the full logs for 
> checkpoint #1577.
> 
> Why I would say it seems that asynchronous part was not executed immediately 
> is due to all synchronous parts were all finished at 2017-09-27 13:49.
> Did that mean the checkpoint barrier event had already arrived at the 
> operator and started as soon as when the JM triggered the checkpoint?
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-28 18:22 GMT+08:00 Stefan Richter:
> Hi,
> 
> I agree that the memory consumption looks good. If there is only one TM, it 
> will run inside one JVM. As for the 7 minutes, you mean the reported 
> end-to-end time? This time measurement starts when the checkpoint is 
> triggered on the job manager, the first contributor is then the time that it 
> takes for the checkpoint barrier event to travel with the stream to the 
> operators. If there is back pressure and a lot of events are buffered, this 
> can introduce delay to this first part, because barriers must not overtake 
> data for correctness. After the barrier arrives at the operator, next comes 
> the synchronous part of the checkpoint, which is typically short running and 
> takes a snapshot of the state (think of creating an immutable version, e.g. 
> through copy on write). In the asynchronous part, this snapshot is persisted 
> to DFS. After that the timing stops and is reported together with the 
> acknowledgement to the job manager. 
> 
> So, I would assume if reporting took 7 minutes end-to-end, and the async part 
> took 4 minutes, it is likely that it took around 3 minutes for the barrier 
> event to travel with the stream. About the debugging, I think it is hard to 
> figure out what is going on with the DFS if you don’t have metrics on that. 
> Maybe you could attach a sampler to the TM’s jvm and monitor where time is 
> spend for the snapshotting?
> 
> I am also looping in Stephan, he might have more suggestions.
> 
> Best,
> Stefan
> 
>> On 28.09.2017 at 11:25, Tony Wei wrote:
>> 
>> Hi Stefan,
>> 
>> These are some telemetry information, but I don't have history information 
>> about gc.
>> 
>> 
>> 
>> 
>> 1) Yes, my state is not large.
>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since 
>> this is a POC, we might move to AWS in the future or use HDFS in the same 
>> cluster. However, how can I recognize the problem is this.
>> 3) It seems memory usage is bounded. I'm not sure if the status showed above 
>> is fine.
>> 
>> There is only one TM in my cluster for now, so all tasks are running on that 
>> machine. I think that means they are in the same JVM, right?
>> Besides taking so long on asynchronous part, there is another question is 
>> that the late message showed that this task was delay for almost 7 minutes, 
>> but the log showed it only took 4 minutes.
>> It seems that it was somehow waiting for being executed. Are there some 
>> points to find out what happened?
>> 
>> For the log information, what I means is it is hard to recognize which 
>> checkpoint id that asynchronous parts belong to if the checkpoint takes more 
>> time and there are more concurrent checkpoints taking place.
>> Also, it seems that asynchronous part might be executed right away if there 
>> is no resource from thread pool. It is better to measure the time between 
>> creation time and processing time, and log it and checkpoint id with the 
>> original log that showed what time the asynchronous part took.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-28 16:25 GMT+08:00 Stefan Richter:
>> Hi,
>> 
>> when the async part takes that long I would have 3 things to look at:
>> 
>> 1) Is your state so large? I don’t think this applies in your case, right?
>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>> 3) Are w

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi,

Sorry. This is the correct one.

Best Regards,
Tony Wei

2017-09-28 18:55 GMT+08:00 Tony Wei :

> Hi Stefan,
>
> Sorry for providing partial information. The attachment is the full logs
> for checkpoint #1577.
>
> Why I would say it seems that asynchronous part was not executed
> immediately is due to all synchronous parts were all finished at 2017-09-27
> 13:49.
> Did that mean the checkpoint barrier event had already arrived at the
> operator and started as soon as when the JM triggered the checkpoint?
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 18:22 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> I agree that the memory consumption looks good. If there is only one TM,
>> it will run inside one JVM. As for the 7 minutes, you mean the reported
>> end-to-end time? This time measurement starts when the checkpoint is
>> triggered on the job manager, the first contributor is then the time that
>> it takes for the checkpoint barrier event to travel with the stream to the
>> operators. If there is back pressure and a lot of events are buffered, this
>> can introduce delay to this first part, because barriers must not overtake
>> data for correctness. After the barrier arrives at the operator, next comes
>> the synchronous part of the checkpoint, which is typically short running
>> and takes a snapshot of the state (think of creating an immutable version,
>> e.g. through copy on write). In the asynchronous part, this snapshot is
>> persisted to DFS. After that the timing stops and is reported together with
>> the acknowledgement to the job manager.
>>
>> So, I would assume if reporting took 7 minutes end-to-end, and the async
>> part took 4 minutes, it is likely that it took around 3 minutes for the
>> barrier event to travel with the stream. About the debugging, I think it is
>> hard to figure out what is going on with the DFS if you don’t have metrics
>> on that. Maybe you could attach a sampler to the TM’s jvm and monitor where
>> time is spend for the snapshotting?
>>
>> I am also looping in Stephan, he might have more suggestions.
>>
>> Best,
>> Stefan
>>
>> On 28.09.2017 at 11:25, Tony Wei wrote:
>>
>> Hi Stefan,
>>
>> These are some telemetry information, but I don't have history
>> information about gc.
>>
>> 
>> 
>>
>> 1) Yes, my state is not large.
>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>> same cluster. However, how can I recognize the problem is this.
>> 3) It seems memory usage is bounded. I'm not sure if the status showed
>> above is fine.
>>
>> There is only one TM in my cluster for now, so all tasks are running on
>> that machine. I think that means they are in the same JVM, right?
>> Besides taking so long on asynchronous part, there is another question is
>> that the late message showed that this task was delay for almost 7 minutes,
>> but the log showed it only took 4 minutes.
>> It seems that it was somehow waiting for being executed. Are there some
>> points to find out what happened?
>>
>> For the log information, what I means is it is hard to recognize which
>> checkpoint id that asynchronous parts belong to if the checkpoint takes
>> more time and there are more concurrent checkpoints taking place.
>> Also, it seems that asynchronous part might be executed right away if
>> there is no resource from thread pool. It is better to measure the time
>> between creation time and processing time, and log it and checkpoint id
>> with the original log that showed what time the asynchronous part took.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 16:25 GMT+08:00 Stefan Richter :
>>
>>> Hi,
>>>
>>> when the async part takes that long I would have 3 things to look at:
>>>
>>> 1) Is your state so large? I don’t think this applies in your case,
>>> right?
>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>> 3) Are we running low on memory on that task manager?
>>>
>>> Do you have telemetry information about used heap and gc pressure on the
>>> problematic task? However, what speaks against the memory problem
>>> hypothesis is that future checkpoints seem to go through again. What I find
>>> very strange is that within the reported 4 minutes of the async part the
>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>> state and write serialized state data to dfs stream, then close the stream.
>>> No locks or waits in that section, so I would assume that for one of the
>>> three reasons I gave, writing the state is terribly slow.
>>>
>>> Those snapshots should be able to run concurrently, for example so that
>>> users can also take savepoints  even when a checkpoint was triggered and is
>>> still running, so there is no way to guarantee that the previous parts have
>>> finished, this is expected behaviour. Which waiting times are you missing
>>> in the log? I think the information about when a checkpoint is triggered,
>>> received by the TM, perf

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi Stefan,

Sorry for providing partial information. The attachment is the full logs
for checkpoint #1577.

The reason I would say the asynchronous part seems not to have been executed
immediately is that all synchronous parts were already finished at 2017-09-27
13:49.
Did that mean the checkpoint barrier event had already arrived at the
operator and started as soon as when the JM triggered the checkpoint?

Best Regards,
Tony Wei

2017-09-28 18:22 GMT+08:00 Stefan Richter :

> Hi,
>
> I agree that the memory consumption looks good. If there is only one TM,
> it will run inside one JVM. As for the 7 minutes, you mean the reported
> end-to-end time? This time measurement starts when the checkpoint is
> triggered on the job manager, the first contributor is then the time that
> it takes for the checkpoint barrier event to travel with the stream to the
> operators. If there is back pressure and a lot of events are buffered, this
> can introduce delay to this first part, because barriers must not overtake
> data for correctness. After the barrier arrives at the operator, next comes
> the synchronous part of the checkpoint, which is typically short running
> and takes a snapshot of the state (think of creating an immutable version,
> e.g. through copy on write). In the asynchronous part, this snapshot is
> persisted to DFS. After that the timing stops and is reported together with
> the acknowledgement to the job manager.
>
> So, I would assume if reporting took 7 minutes end-to-end, and the async
> part took 4 minutes, it is likely that it took around 3 minutes for the
> barrier event to travel with the stream. About the debugging, I think it is
> hard to figure out what is going on with the DFS if you don’t have metrics
> on that. Maybe you could attach a sampler to the TM’s jvm and monitor where
> time is spend for the snapshotting?
>
> I am also looping in Stephan, he might have more suggestions.
>
> Best,
> Stefan
>
> On 28.09.2017 at 11:25, Tony Wei wrote:
>
> Hi Stefan,
>
> These are some telemetry information, but I don't have history information
> about gc.
>
> 
> 
>
> 1) Yes, my state is not large.
> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
> Since this is a POC, we might move to AWS in the future or use HDFS in the
> same cluster. However, how can I recognize the problem is this.
> 3) It seems memory usage is bounded. I'm not sure if the status showed
> above is fine.
>
> There is only one TM in my cluster for now, so all tasks are running on
> that machine. I think that means they are in the same JVM, right?
> Besides taking so long on asynchronous part, there is another question is
> that the late message showed that this task was delay for almost 7 minutes,
> but the log showed it only took 4 minutes.
> It seems that it was somehow waiting for being executed. Are there some
> points to find out what happened?
>
> For the log information, what I means is it is hard to recognize which
> checkpoint id that asynchronous parts belong to if the checkpoint takes
> more time and there are more concurrent checkpoints taking place.
> Also, it seems that asynchronous part might be executed right away if
> there is no resource from thread pool. It is better to measure the time
> between creation time and processing time, and log it and checkpoint id
> with the original log that showed what time the asynchronous part took.
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 16:25 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> when the async part takes that long I would have 3 things to look at:
>>
>> 1) Is your state so large? I don’t think this applies in your case, right?
>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>> 3) Are we running low on memory on that task manager?
>>
>> Do you have telemetry information about used heap and gc pressure on the
>> problematic task? However, what speaks against the memory problem
>> hypothesis is that future checkpoints seem to go through again. What I find
>> very strange is that within the reported 4 minutes of the async part the
>> only thing that happens is: open dfs output stream, iterate the in-memory
>> state and write serialized state data to dfs stream, then close the stream.
>> No locks or waits in that section, so I would assume that for one of the
>> three reasons I gave, writing the state is terribly slow.
>>
>> Those snapshots should be able to run concurrently, for example so that
>> users can also take savepoints  even when a checkpoint was triggered and is
>> still running, so there is no way to guarantee that the previous parts have
>> finished, this is expected behaviour. Which waiting times are you missing
>> in the log? I think the information about when a checkpoint is triggered,
>> received by the TM, performing the sync and async part and acknowledgement
>> time should all be there?.
>>
>> Best,
>> Stefan
>>
>>
>>
>> On 28.09.2017 at 08:18, Tony Wei wrote:
>>
>> Hi Stefan,
>>
>> The checkpoint on my job has been subsume

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Stefan Richter
Hi,

I agree that the memory consumption looks good. If there is only one TM, it 
will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end 
time? This time measurement starts when the checkpoint is triggered on the job 
manager, the first contributor is then the time that it takes for the 
checkpoint barrier event to travel with the stream to the operators. If there 
is back pressure and a lot of events are buffered, this can introduce delay to 
this first part, because barriers must not overtake data for correctness. After 
the barrier arrives at the operator, next comes the synchronous part of the 
checkpoint, which is typically short running and takes a snapshot of the state 
(think of creating an immutable version, e.g. through copy on write). In the 
asynchronous part, this snapshot is persisted to DFS. After that the timing 
stops and is reported together with the acknowledgement to the job manager. 

So, I would assume if reporting took 7 minutes end-to-end, and the async part 
took 4 minutes, it is likely that it took around 3 minutes for the barrier 
event to travel with the stream. About the debugging, I think it is hard to 
figure out what is going on with the DFS if you don’t have metrics on that. 
Maybe you could attach a sampler to the TM’s jvm and monitor where time is 
spend for the snapshotting?

I am also looping in Stephan, he might have more suggestions.

Best,
Stefan

> On 28.09.2017 at 11:25, Tony Wei wrote:
> 
> Hi Stefan,
> 
> These are some telemetry information, but I don't have history information 
> about gc.
> 
> 
> 
> 
> 1) Yes, my state is not large.
> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since 
> this is a POC, we might move to AWS in the future or use HDFS in the same 
> cluster. However, how can I recognize the problem is this.
> 3) It seems memory usage is bounded. I'm not sure if the status showed above 
> is fine.
> 
> There is only one TM in my cluster for now, so all tasks are running on that 
> machine. I think that means they are in the same JVM, right?
> Besides taking so long on asynchronous part, there is another question is 
> that the late message showed that this task was delay for almost 7 minutes, 
> but the log showed it only took 4 minutes.
> It seems that it was somehow waiting for being executed. Are there some 
> points to find out what happened?
> 
> For the log information, what I means is it is hard to recognize which 
> checkpoint id that asynchronous parts belong to if the checkpoint takes more 
> time and there are more concurrent checkpoints taking place.
> Also, it seems that asynchronous part might be executed right away if there 
> is no resource from thread pool. It is better to measure the time between 
> creation time and processing time, and log it and checkpoint id with the 
> original log that showed what time the asynchronous part took.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-28 16:25 GMT+08:00 Stefan Richter:
> Hi,
> 
> when the async part takes that long I would have 3 things to look at:
> 
> 1) Is your state so large? I don’t think this applies in your case, right?
> 2) Is something wrong with writing to DFS (network, disks, etc)?
> 3) Are we running low on memory on that task manager?
> 
> Do you have telemetry information about used heap and gc pressure on the 
> problematic task? However, what speaks against the memory problem hypothesis 
> is that future checkpoints seem to go through again. What I find very strange 
> is that within the reported 4 minutes of the async part the only thing that 
> happens is: open dfs output stream, iterate the in-memory state and write 
> serialized state data to dfs stream, then close the stream. No locks or waits 
> in that section, so I would assume that for one of the three reasons I gave, 
> writing the state is terribly slow.
> 
> Those snapshots should be able to run concurrently, for example so that users 
> can also take savepoints even while a triggered checkpoint is still running, so 
> there is no way to guarantee that the previous parts have finished; this is 
> expected behaviour. Which waiting times are you missing in 
> the log? I think the information about when a checkpoint is triggered, when it is 
> received by the TM, the sync and async parts, and the acknowledgement 
> time should all be there.
> 
> Best,
> Stefan
> 
> 
> 
>> On 28.09.2017, at 08:18, Tony Wei wrote:
>> 
>> Hi Stefan,
>> 
>> The checkpoint on my job has been subsumed again. There are some questions 
>> that I don't understand.
>> 
>> Log in JM :
>> 2017-09-27 13:45:15,686 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
>> checkpoint 1576 (174693180 bytes in 21597 ms).
>> 2017-09-27 13:49:42,795 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>> checkpoint 1577 @ 1506520182795
>> 2017-09-27 13:54:42,795 INFO 
>

Re: Flink on EMR

2017-09-28 Thread Stefan Richter
Hi,

for issue 1, you could delete the slf4j jar from Flink’s lib folder, but I 
wonder whether this actually causes any problems beyond the warning? 

For issue 2, where did you find that only 5GB have been allocated? 
Did you consider that Flink only allocates a fraction of the memory for the heap 
and another fraction for off-heap memory? This can be influenced with the 
memory fraction parameter.

About issue 3, I think this should work without providing the host name.
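
For reference, setting the checkpoint directory programmatically would look roughly 
like this minimal sketch (the path is the one from your question; a fully qualified 
URI that includes the NameNode host also works):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDirSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // "hdfs:///checkpoints/" relies on fs.defaultFS from the cluster's Hadoop configuration;
        // a fully qualified URI such as "hdfs://namenode:8020/checkpoints/" (hostname is a
        // placeholder) also works
        env.setStateBackend(new FsStateBackend("hdfs:///checkpoints/"));
        env.enableCheckpointing(60_000);
        // ... build and execute the job here
    }
}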

Issue 4 is a matter of taste; if CloudWatch is a log aggregation service, it 
might be easier for you to use something like that.

Best,
Stefan

> On 27.09.2017, at 06:57, Navneeth Krishnan wrote:
> 
> Hi All,
> 
> Any suggestions?
> 
> Thanks.
> 
> On Mon, Sep 25, 2017 at 10:14 PM, Navneeth Krishnan  > wrote:
> Hello All,
> 
> I'm trying to deploy Flink on AWS EMR and I'm very new to EMR. I'm running 
> into multiple issues and need some help.
> 
> Issue1:
> How did others resolve this multiple bindings issue?
> 
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings 
>  for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 
> Issue2:
> Running the below command runs the pipeline but the task manager is allocated 
> with only 5GB memory instead of 8GB memory. Any reason why?
> flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar
> 
> Issue3:
> How do I provide the checkpoint directory? Will it work if I just provide 
> "hdfs:///checkpoints/", or should I also provide the master node host 
> name?
> 
> Issue 4:
> How can I get the task manager logs? Should I use log aggregation in Hadoop 
> YARN or send them to CloudWatch?
> 
> Also, if there are any best practices for running Flink on YARN, 
> please let me know.
> 
> Thanks a lot.
> 
> Regards,
> Navneeth
> 



Question about checkpointing with stateful operators and state recovery

2017-09-28 Thread Federico D'Ambrosio
Hi, I've got a couple of questions concerning the topics in the subject:

1. If an operator is applied on a keyed stream, do I still have
to implement the CheckpointedFunction trait and define the snapshotState
and initializeState methods in order to successfully recover the state
from a job failure?

2. While using a FlinkKafkaConsumer, enabling checkpointing allows
exactly once semantics end to end, provided that the sink is able to
guarantee the same. Do I have to set
setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly
once semantics in a sink?

3. What are the advantages of externalized checkpoints and which are
the cases where I would want to use them?

4. Let's suppose a scenario where checkpointing is enabled every 10
seconds, I have a Kafka consumer which is set to start from the latest
records, a sink providing at-least-once semantics and a stateful keyed
operator in between the consumer and the sink (a minimal sketch of this
setup follows below). Is it correct that, in case of task failure, the
following happens?
    - the Kafka consumer gets reverted to the latest offset (does this
happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
    - the operator state gets reverted to the latest checkpoint
    - the sink is stateless so it doesn't really care about what
happened
    - the stream restarts and probably some of the events coming to the
sink have already been processed before
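
For reference, the setup I have in mind looks roughly like this minimal, untested
sketch (the topic, broker, key extraction and the counting operator are placeholders
I made up for illustration):

import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class ScenarioSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);                      // checkpointing every 10 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        props.setProperty("group.id", "my-group");                  // placeholder

        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromLatest();                        // start from the latest records
        consumer.setCommitOffsetsOnCheckpoints(true);         // the setting question 2 asks about

        env.addSource(consumer)
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String value) {
                   return value.split(",")[0];                // key by the first CSV field (placeholder)
               }
           })
           // the stateful keyed operator: its keyed state is what I expect to be restored (question 1)
           .flatMap(new RichFlatMapFunction<String, String>() {
               private transient ValueState<Long> count;

               @Override
               public void open(Configuration parameters) {
                   count = getRuntimeContext().getState(
                           new ValueStateDescriptor<Long>("count", Long.class));
               }

               @Override
               public void flatMap(String value, Collector<String> out) throws Exception {
                   Long current = count.value();
                   long next = (current == null ? 0L : current) + 1;
                   count.update(next);
                   out.collect(value + " -> seen " + next + " times for this key");
               }
           })
           .print();                                          // stand-in for the at-least-once sink

        env.execute("checkpoint scenario sketch");
    }
}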

Thank you for attention,
Kind regards,
Federico


Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi Stefan,

Here is some telemetry information, but I don't have historical information
about GC.

[two inline telemetry screenshots omitted]

1) Yes, my state is not large.
2) My DFS is S3, but my cluster is outside of AWS. That might be a problem. Since
this is a POC, we might move to AWS in the future or use HDFS in the same
cluster. However, how can I confirm that this is the problem?
3) It seems memory usage is bounded. I'm not sure whether the status shown
above is fine.

There is only one TM in my cluster for now, so all tasks are running on
that machine. I think that means they are in the same JVM, right?
Besides the asynchronous part taking so long, another question is that
the late message showed this task was delayed for almost 7 minutes,
while the log showed it only took 4 minutes.
It seems that it was somehow waiting to be executed. Are there some
points to look at to find out what happened?

Regarding the log information, what I mean is that it is hard to recognize which
checkpoint id an asynchronous part belongs to if a checkpoint takes
more time and there are more concurrent checkpoints taking place.
Also, it seems that an asynchronous part might not be executed right away if
there are no free resources in the thread pool. It would be better to measure the
time between creation and processing, and to log it together with the checkpoint
id in the original log line that shows how long the asynchronous part took.

Best Regards,
Tony Wei

2017-09-28 16:25 GMT+08:00 Stefan Richter :

> Hi,
>
> when the async part takes that long I would have 3 things to look at:
>
> 1) Is your state so large? I don’t think this applies in your case, right?
> 2) Is something wrong with writing to DFS (network, disks, etc)?
> 3) Are we running low on memory on that task manager?
>
> Do you have telemetry information about used heap and gc pressure on the
> problematic task? However, what speaks against the memory problem
> hypothesis is that future checkpoints seem to go through again. What I find
> very strange is that within the reported 4 minutes of the async part the
> only thing that happens is: open dfs output stream, iterate the in-memory
> state and write serialized state data to dfs stream, then close the stream.
> No locks or waits in that section, so I would assume that for one of the
> three reasons I gave, writing the state is terribly slow.
>
> Those snapshots should be able to run concurrently, for example so that
> users can also take savepoints even while a triggered checkpoint is
> still running, so there is no way to guarantee that the previous parts have
> finished; this is expected behaviour. Which waiting times are you missing
> in the log? I think the information about when a checkpoint is triggered,
> when it is received by the TM, the sync and async parts, and the acknowledgement
> time should all be there.
>
> Best,
> Stefan
>
>
>
> On 28.09.2017, at 08:18, Tony Wei wrote:
>
> Hi Stefan,
>
> The checkpoint on my job has been subsumed again. There are some questions
> that I don't understand.
>
> Log in JM :
> 2017-09-27 13:45:15,686 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
> 2017-09-27 13:49:42,795 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1577 @ 1506520182795
> 2017-09-27 13:54:42,795 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1578 @ 1506520482795
> 2017-09-27 13:55:13,105 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
> 2017-09-27 13:56:37,103 WARN 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Received late message for now expired checkpoint attempt 1577 from
> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b
> 2017-09-27 13:59:42,795 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1579 @ 1506520782795
>
> Log in TM:
> 2017-09-27 13:56:37,105 INFO 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend
> - DefaultOperatorStateBackend snapshot (File Stream Factory @
> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
> Threads] took 240248 ms.
>
> I think the log in the TM might correspond to the late message for #1577 in the
> JM, because #1576 and #1578 had already finished and #1579 hadn't started at
> 13:56:37.
> If I am not mistaken, I am wondering why it took 240248 ms (4 min). It seems
> that it started later than the asynchronous tasks of #1578.
> Is there any way to guarantee that the asynchronous parts of earlier
> checkpoints are executed before those of later ones?
>
> Moreover, I think it would be better to have more information in the INFO log,
> such as the waiting time and the checkpoint id, in order to conveniently trace
> the progress of a checkpoint.
>
> What do you think? Do you have any suggestion for me to deal with these

Flink Savepoint Config parameter

2017-09-28 Thread ant burton
Hey,

When running in EMR and taking a savepoint with

flink cancel -s SAVEPOINT_DIR JOB_ID

results in the following error

Caused by: org.apache.flink.util.ConfigurationException: Config parameter
'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is
missing (hostname/address of JobManager to connect to).

Am I missing something? Shouldn't this have been set by the cluster?

Thanks


Re: Kinesis connector - Jackson issue

2017-09-28 Thread Tomasz Dobrzycki
Hi guys,

I was able to solve the issue. I deleted all my Flink distributions
and followed these steps:

1) Clone Flink source (because I'm building Flink with Kinesis connector)
2) Checkout to release-1.3.1 (that's the version of Flink on EMR)
3) mvn clean install -Pinclude-kinesis -DskipTests (using Maven 3.2.5
for that so no need to do it in flink-dist again)
4) go to application folder
5) mvn clean package
- 'flink run' with file input works fine
- IntelliJ with file input works fine
- !! Caused by: java.lang.IllegalStateException: Socket not created by
this factory when running IntelliJ or 'flink run' with Kinesis input

6) follow the changes in
https://github.com/apache/flink/pull/4150/commits/9b539470ac308d7af9df9a70792aa1fa8c6995fc
(they need to be applied to the pom.xml file of
flink-connectors/flink-connector-kinesis) !! important note !! follow
all of the changes, including the deletion of the AWS shading
- now the code should run via IntelliJ and 'flink run'

The trick was to apply all the changes from that PR.
Thanks for all the help and suggestions.
Oh, and Bowen Li - you're the champ mate, thanks ;)

Cheers,
Tomasz

On 26 September 2017 at 17:05, Tomasz Dobrzycki
 wrote:
> Yes I am using quickstart template. I have removed the exclusions for
> jackson: core, databind and annotations.
>
> On 26 September 2017 at 16:36, Tzu-Li (Gordon) Tai  
> wrote:
>> Ah, I see.
>>
>> Are you using the Flink quickstart template to build your application?
>> I think exclusion is defined in the pom.xml of that archetype.
>>
>> Just above the exclusion I do see this message:
>> “WARNING: You have to remove these excludes if your code relies on other
>> version of these dependencies."
>>
>>
>> On 26 September 2017 at 5:27:47 PM, Tomasz Dobrzycki
>> (dobrzycki.tom...@gmail.com) wrote:
>>
>> Hi Gordon,
>>
>> Thanks for your answer.
>> - I've built it with Maven 3.2.5
>> - I am using Jackson in my application (version 2.7.4)
>>
>> Something that I have noticed when building the Kinesis connector is that
>> it excludes jackson:
>> [INFO] Excluding
>> com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.7.3
>> from the shaded jar.
>> even though I can't find any mention of that in its pom.xml.
>>
>> Cheers,
>> Tomasz
>>
>> On 26 September 2017 at 15:43, Tzu-Li (Gordon) Tai 
>> wrote:
>>> Hi Tomasz,
>>>
>>> Yes, dependency clashes may surface when executing actual job runs on
>>> clusters.
>>>
>>> A few things to probably check first:
>>> - Have you built Flink or the Kinesis connector with Maven version 3.3 or
>>> above? If yes, try using a lower version, as 3.3+ results in some shading
>>> issues when used to build Flink.
>>> - I’m not sure if the Kinesis client has a Jackson dependency, but you
>>> could
>>> also try checking if your application pulls in a conflicting Jackson
>>> version
>>> (with Flink, which uses 2.7.4) via some other dependency.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 26 September 2017 at 4:28:27 PM, Tomasz Dobrzycki
>>> (dobrzycki.tom...@gmail.com) wrote:
>>>
>>> Hi guys,
>>>
>>> I'm working with Kinesis connector and currently trying to solve a
>>> bizarre issue.
>>> I had problems with Kinesis and httpcomponents which I was able to
>>> solve using steps shown in:
>>>
>>> https://github.com/apache/flink/pull/4150/commits/9b539470ac308d7af9df9a70792aa1fa8c6995fc
>>>
>>> That did the trick and I am able to run my code successfully via
>>> IntelliJ. I am connecting to Kinesis stream hosted on AWS and reading
>>> messages just fine.
>>>
>>> Unfortunately that is not true for running Flink via the command line
>>> scripts. I get this error when running start-local.sh followed by flink run
>>> ... :
>>> Caused by: java.lang.ClassNotFoundException:
>>> com.fasterxml.jackson.dataformat.cbor.CBORFactory
>>>
>>> I have built my Kinesis connector and installed it via mvn install. Am
>>> I missing some steps? I'm assuming that my code is fine given that I'm
>>> able to run it through IntelliJ.
>>>
>>> Anyone faced this problem or maybe some solution comes to your mind?
>>>
>>> Cheers
>>> Tomasz


Re: Issue with CEP library

2017-09-28 Thread Kostas Kloudas
Hi Ajay,

I will look a bit more on the issue.

But in the meantime, could you run your job with a parallelism of 1, to see if 
the results are as expected?

Also, could you change the pattern, for example to check only for the start, to see 
if all keys pass through?

As for the code, you apply keyBy(0) on the cepMap stream twice, which is redundant 
and introduces latency. 
You could remove the second one to also see the impact.
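
In other words, roughly (type parameters elided, as in your snippet below):

KeyedStream<Tuple8<...>, Tuple> cepMapByHomeId = cepMap.keyBy(0);
// the stream is already keyed here, so the extra keyBy(0) is not needed:
PatternStream<Tuple8<...>> patternStream = CEP.pattern(cepMapByHomeId, cep1);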

Kostas

> On Sep 28, 2017, at 2:57 AM, Ajay Krishna  wrote:
> 
> Hi, 
> 
> I've only been working with Flink for the past 2 weeks on a project and am 
> trying to use the CEP library on sensor data. I am using Flink version 1.3.2. 
> Flink has a Kafka source. I am using KafkaSource9. I am running Flink on a 3 
> node AWS cluster with 8G of RAM running Ubuntu 16.04. From the Flink 
> dashboard, I see that I have 2 TaskManagers & 4 task slots
> 
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> 
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. Parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture
> 
> DataStream<Tuple8<...>> cepMapByHomeId = cepMap.keyBy(0);
> 
> //cepMapByHomeId.print();
> 
> Pattern<Tuple8<...>, ?> cep1 =
>         Pattern.<Tuple8<...>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
> 
> PatternStream<Tuple8<...>> patternStream = CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
> 
> DataStream<...> alerts = patternStream.select(new PackageCapturedEvents());
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> 
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id goes from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a 
> csv file with a delay of about 100 ms between 2 rows of the csv file. The 
> data is exactly the same for all 100 of the csv files except for home_id and 
> the lat & long information. The timestamp is incremented in steps of 1 sec. 
> I start multiple processes to simulate data from different homes.
> 
> THE PROBLEM:
> 
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before and after applying the pattern, and I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> home_ids to be captured and written to my sink, which is Cassandra in this case. 
> I've looked through all available docs and examples but cannot seem to find a 
> fix for the problem.
> 
> I would really appreciate some guidance on how to understand and fix this.
> 
> 
> 
> Thank you,
> 
> Ajay
> 



Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Kostas Kloudas
Hi Yunus,

I see. Currently I am not sure that you can simply broadcast the watermark 
only, without 
having a shuffle.

But one thing to note about your algorithm: I am not sure that it solves 
the problem you encounter.

Your algorithm seems to prioritize the stream with the elements with the 
smallest timestamps,
rather than throttling fast streams so that slow ones can catch up.

Example: Reading a partition from Kafka that has elements with timestamps 1,2,3
will emit watermark 3 (assuming an ascending watermark extractor), while another 
task that reads 
another partition with elements with timestamps 5,6,7 will emit watermark 7. 
With your algorithm, 
if I get it right, you will throttle the second partition/task while allowing the 
first one to advance, although
both read at the same pace (e.g. 3 elements per unit of time).

I will think a bit more on the solution. 

The sketches that I can come up with all introduce some latency, e.g. measuring 
throughput in taskA and sending it to a side output together with the taskId, then 
broadcasting the side output to a downstream operator (taskB), something like a 
coprocess function that receives the original stream and the side output, and 
this is the one that checks if “my task" is slow. 
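
A very rough sketch of that idea (untested; the Tuple2<Long, String> event type with
the event timestamp in field f0, the 5 second threshold, and all names are made up
for illustration, and the watermark map is kept in plain memory rather than in
checkpointed state):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ThrottlingSketch {

    // (subtaskIndex, localWatermark) reports emitted by task A
    private static final OutputTag<Tuple2<Integer, Long>> PROGRESS =
            new OutputTag<Tuple2<Integer, Long>>("progress") {};

    public static DataStream<Tuple2<Long, String>> throttle(DataStream<Tuple2<Long, String>> input) {

        // task A: forward every element unchanged and report the local watermark to a side output
        SingleOutputStreamOperator<Tuple2<Long, String>> taskA = input
                .process(new ProcessFunction<Tuple2<Long, String>, Tuple2<Long, String>>() {
                    @Override
                    public void processElement(Tuple2<Long, String> value, Context ctx,
                                               Collector<Tuple2<Long, String>> out) {
                        out.collect(value);
                        ctx.output(PROGRESS, Tuple2.of(
                                getRuntimeContext().getIndexOfThisSubtask(),
                                ctx.timerService().currentWatermark()));
                    }
                });

        // broadcast the small progress stream so that every task B instance sees all reports
        DataStream<Tuple2<Integer, Long>> progress = taskA.getSideOutput(PROGRESS).broadcast();

        // task B: slow down elements that run too far ahead of the slowest upstream subtask
        return taskA.connect(progress)
                .process(new CoProcessFunction<Tuple2<Long, String>, Tuple2<Integer, Long>, Tuple2<Long, String>>() {

                    private final Map<Integer, Long> latestWatermarks = new HashMap<>();
                    private final long thresholdMs = 5_000L;

                    @Override
                    public void processElement1(Tuple2<Long, String> value, Context ctx,
                                                Collector<Tuple2<Long, String>> out) throws Exception {
                        long globalMin = Long.MAX_VALUE;
                        for (long wm : latestWatermarks.values()) {
                            globalMin = Math.min(globalMin, wm);
                        }
                        // ignore Long.MIN_VALUE reports sent before any real watermark was emitted
                        if (!latestWatermarks.isEmpty() && globalMin > Long.MIN_VALUE
                                && value.f0 - globalMin > thresholdMs) {
                            Thread.sleep(1);  // crude throttling, as described above
                        }
                        out.collect(value);
                    }

                    @Override
                    public void processElement2(Tuple2<Integer, Long> report, Context ctx,
                                                Collector<Tuple2<Long, String>> out) {
                        latestWatermarks.put(report.f0, report.f1);
                    }
                });
    }
}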

As I said I will think on it a bit more,
Kostas

> On Sep 27, 2017, at 6:32 PM, Yunus Olgun  wrote:
> 
> Hi Kostas,
> 
> Yes, you have summarized well. I want to only forward the data to the next 
> local operator, but broadcast the watermark through the cluster.
> 
> - I can’t set parallelism of taskB to 1. The stream is too big for that. 
> Also, the data is ordered at each partition. I don’t want to change that 
> order.
> 
> - I don’t need KeyedStream. Also taskA and taskB will always have the same 
> parallelism with each other. But this parallelism can be increased in the 
> future.
> 
> The use case is: The source is Kafka. At our peak hours, or when we want to 
> run the streaming job with old data from Kafka, the same thing always 
> happens, even in trivial jobs. Some consumers consume faster than others. 
> They produce too much data downstream, but the watermark advances slowly, at the 
> speed of the slowest consumer. This extra data gets piled up at downstream 
> operators. When the downstream operator is an aggregation, that is ok. But when 
> it is an in-Flink join, the state size gets too big, checkpoints take much longer 
> and overall the job becomes slower or fails. It also affects other jobs in 
> the cluster.
> 
> So, basically I want to implement a throttler. It compares the timestamp of a 
> record with the global watermark. If the difference is larger than a constant 
> threshold, it starts sleeping 1 ms for each incoming record. This way, fast 
> operators wait for the slowest one.
> 
> The only problem is that this solution comes at the cost of one network 
> shuffle and data serialization/deserialization. Since the stream is large, I 
> want to at least avoid the network shuffle. 
> 
> I thought operator instances within a TaskManager would get the same indexId, 
> but apparently this is not the case.
> 
> Thanks,
> 
>> On 27. Sep 2017, at 17:16, Kostas Kloudas > > wrote:
>> 
>> Hi Yunus,
>> 
>> I am not sure if I understand correctly the question.
>> 
>> Am I correct to assume that you want the following?
>> 
>>  ———> time
>> 
>>         Process A                    Process B
>> 
>> Task1: W(3) E(1) E(2) E(5)   W(3) W(7) E(1) E(2) E(5)
>> 
>> Task2: W(7) E(3) E(10) E(6)  W(3) W(7) E(3) E(10) E(6)
>> 
>> 
>> In the above, elements flow from left to right and W() stands for watermark 
>> and E() stands for element.
>> In other words, between Process(TaksA) and Process(TaskB) you want to only 
>> forward the elements, but broadcast the watermarks, right?
>> 
>> If this is the case, a trivial solution would be to set the parallelism of 
>> TaskB to 1, so that all elements go through the same node.
>> 
>> One other solution is what you did, BUT by using a custom partitioner you 
>> cannot use keyed state in your process function B because the 
>> stream is no longer keyed.
>> 
>> A similar approach to what you did but without the limitation above, is that 
>> in the first processFunction (TaskA) you can append the 
>> taskId to the elements themselves and then do a keyBy(taskId) between the 
>> first and the second process function.
>> 
>> These are the solutions that I can come up with, assuming that you want to 
>> do what I described.
>> 
>> But in general, could you please describe a bit more what is your use case? 
>> This way we may figure out another approach to achieve your goal. 
>> In fact, I am not sure if you earn anything by broadcasting the watermark, 
>> other than 
>> re-implementing (to some extent) Flink’s windowing mechanism.
>> 
>> Thanks,
>> Kostas
>> 
>>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun >

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Stefan Richter
Hi,

when the async part takes that long I would have 3 things to look at:

1) Is your state so large? I don’t think this applies in your case, right?
2) Is something wrong with writing to DFS (network, disks, etc)?
3) Are we running low on memory on that task manager?

Do you have telemetry information about used heap and gc pressure on the 
problematic task? However, what speaks against the memory problem hypothesis is 
that future checkpoints seem to go through again. What I find very strange is 
that within the reported 4 minutes of the async part the only thing that 
happens is: open dfs output stream, iterate the in-memory state and write 
serialized state data to dfs stream, then close the stream. No locks or waits 
in that section, so I would assume that for one of the three reasons I gave, 
writing the state is terribly slow.
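
Schematically, and only as an illustration of the write pattern rather than Flink's 
actual snapshot code, the async part boils down to something like the following, so 
a slow DFS client or connection directly translates into a long async phase:

import java.io.IOException;
import java.util.Map;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

public class SnapshotWriteSketch {

    // Illustrative only: serialize an in-memory map to a single DFS file, entry by entry.
    public static <K, V> void writeSnapshot(
            Map<K, V> inMemoryState,
            TypeSerializer<K> keySerializer,
            TypeSerializer<V> valueSerializer,
            Path checkpointFile) throws IOException {

        FileSystem fs = checkpointFile.getFileSystem();
        try (FSDataOutputStream out = fs.create(checkpointFile, false)) {
            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
            for (Map.Entry<K, V> entry : inMemoryState.entrySet()) {
                keySerializer.serialize(entry.getKey(), view);
                valueSerializer.serialize(entry.getValue(), view);
            }
        } // closing the stream flushes the data; with S3 this can be where most of the time goes
    }
}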

Those snapshots should be able to run concurrently, for example so that users 
can also take savepoints even while a triggered checkpoint is still 
running, so there is no way to guarantee that the previous parts have finished; 
this is expected behaviour. Which waiting times are you missing in the log? I 
think the information about when a checkpoint is triggered, when it is received by 
the TM, the sync and async parts, and the acknowledgement time should all be 
there.

Best,
Stefan



> On 28.09.2017, at 08:18, Tony Wei wrote:
> 
> Hi Stefan,
> 
> The checkpoint on my job has been subsumed again. There are some questions 
> that I don't understand.
> 
> Log in JM :
> 2017-09-27 13:45:15,686 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 1576 (174693180 bytes in 21597 ms).
> 2017-09-27 13:49:42,795 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1577 @ 1506520182795
> 2017-09-27 13:54:42,795 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1578 @ 1506520482795
> 2017-09-27 13:55:13,105 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 1578 (152621410 bytes in 19109 ms).
> 2017-09-27 13:56:37,103 WARN 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 1577 from 
> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b
> 2017-09-27 13:59:42,795 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1579 @ 1506520782795
> 
> Log in TM:
> 2017-09-27 13:56:37,105 INFO 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend - 
> DefaultOperatorStateBackend snapshot (File Stream Factory @ 
> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, 
> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] 
> took 240248 ms.
> 
> I think the log in the TM might correspond to the late message for #1577 in the 
> JM, because #1576 and #1578 had already finished and #1579 hadn't started at 13:56:37.
> If I am not mistaken, I am wondering why it took 240248 ms (4 min). It seems 
> that it started later than the asynchronous tasks of #1578.
> Is there any way to guarantee that the asynchronous parts of earlier checkpoints 
> are executed before those of later ones?
> 
> Moreover, I think it would be better to have more information in the INFO log, 
> such as the waiting time and the checkpoint id, in order to conveniently trace the 
> progress of a checkpoint.
> 
> What do you think? Do you have any suggestions for how I can deal with these 
> problems? Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-27 17:11 GMT+08:00 Tony Wei  >:
> Hi Stefan,
> 
> Here is the summary of my streaming job's checkpoints after restarting 
> last night.
> 
> 
> 
> This is the distribution of alignment buffered from the last 12 hours.
> 
> 
> 
> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 
> and #1246, you can check the picture I sent before.
> 
>  
> 
> AFAIK, the back pressure rate is usually in LOW status, sometimes goes up to 
> HIGH, and is always OK during the night.
> 
> Best Regards,
> Tony Wei
> 
> 
> 2017-09-27 16:54 GMT+08:00 Stefan Richter  >:
> Hi Tony,
> 
> are your checkpoints typically close to the timeout boundary? From what I 
> see, writing the checkpoint is relatively fast but the time from the 
> checkpoint trigger to execution seems very long. This is typically the case 
> if your job has a lot of backpressure and therefore the checkpoint barriers 
> take a long time to travel to the operators, because a lot of events are 
> piling up in the buffers. Do you also experience large alignments for your 
> checkpoints?
> 
> Best,
> Stefan  
> 
>> On 27.09.2017, at 10:43, Tony Wei wrote:
>> 
>> Hi Stefan,
>> 
>> It seems that I found something strange in the JM's log.
>> 
>> It had happened more than once before, but all subtasks would finish their 
>> ch