Re: Why TimerService interface in ProcessFunction doesn't have deleteEventTimeTimer

2017-05-04 Thread Jagadish Bihani
Hi

Thanks for the multiple responses on this question.
Please correct me if I am wrong about the 3 possible approaches:
1. As per FLINK-3089, a RocksDB-based timer implementation is efficient, but
it is not merged yet. Which release will this be part of?
2. FLINK-6359 suggests an alternate approach based on hierarchical timing
wheels.
3. We can use state to indicate whether to trigger or not, but the timer will
fire in all cases. So this is not actually a cancellation of the timer.
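For reference, here is a minimal sketch of approach 3: a ProcessFunction that keeps a
"tombstone" flag in keyed state and simply ignores the firing of a cancelled timer. The
Event type (with a String "type" field), the event names, and the one-minute delay are
made-up placeholders, and event time with timestamps assigned upstream is assumed.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TombstoneTimerFunction extends ProcessFunction<Event, String> {

    // Keyed state holding the timestamp of the currently active timer, or null if none/cancelled.
    private transient ValueState<Long> activeTimer;

    @Override
    public void open(Configuration parameters) {
        activeTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("active-timer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        if ("START".equals(event.type)) {
            long ts = ctx.timestamp() + 60_000; // fire one minute after the event
            ctx.timerService().registerEventTimeTimer(ts);
            activeTimer.update(ts);
        } else if ("CANCEL".equals(event.type)) {
            // Tombstone: the timer still fires at the original time, but onTimer ignores it.
            activeTimer.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long expected = activeTimer.value();
        if (expected != null && expected == timestamp) {
            out.collect("timer fired at " + timestamp);
        }
        // else: the timer was tombstoned (or superseded), so nothing is triggered.
        activeTimer.clear();
    }
}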

In our use case we have ~1000 events per second, but the number of cancellations
will be much higher (as per the business logic, roughly 70% of the timers
will need to be cancelled), so solution 3 can be inefficient and a bit of a
waste of resources.

So, could you please recommend how I should go about it?
 --  Which release can item 1 or item 3 be part of?
 --  About the existing implementation, are there any approximate data
points on how slow deletion is (assuming an RPS of 1000 and any
standard AWS instance type)? I can derive from that and decide, based on
the data, whether to go with cancellations or not.



On Sat, Apr 22, 2017 at 8:13 PM, Ted Yu  wrote:

> Logged FLINK-6359, referring to this thread.
>
> FYI
>
> On Sat, Apr 22, 2017 at 1:10 AM, Gyula Fóra  wrote:
>
>> Hi,
>>
>> I am not familiar with this data structure, I will try to read up on it.
>> But it looks interesting.
>>
>> For some reference, some links:
>>
>> https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/
>>
>> http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf
>>
>> Cheers,
>> Gyula
>>
>> On Sat, Apr 22, 2017, 00:35 Ted Yu  wrote:
>>
>>> Benjamin has an implementation for Hierarchical Timing Wheels (Apache
>>> License) :
>>>
>>> https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java
>>>
>>> If there is some interest, we can port the above over.
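For readers unfamiliar with the data structure, here is a toy single-level sketch (not
Flink or Caffeine code, just an illustration) of why scheduling and cancelling are cheap:
every timer lands in the bucket of its expiry tick, so a delete only touches one bucket
instead of reordering a whole priority queue. Hierarchical wheels stack coarser wheels on
top to cover expiry times beyond a single wheel's span.

import java.util.ArrayList;
import java.util.List;

class SimpleTimingWheel {
    private final long tickMs;             // resolution of one slot
    private final List<Runnable>[] slots;  // one bucket of tasks per tick
    private long currentTick;              // last tick that has been processed

    @SuppressWarnings("unchecked")
    SimpleTimingWheel(long tickMs, int wheelSize, long startMs) {
        this.tickMs = tickMs;
        this.slots = new List[wheelSize];
        for (int i = 0; i < wheelSize; i++) {
            slots[i] = new ArrayList<>();
        }
        this.currentTick = startMs / tickMs;
    }

    // O(1): drop the task into the bucket of its expiry tick.
    void schedule(long expiryMs, Runnable task) {
        slots[(int) ((expiryMs / tickMs) % slots.length)].add(task);
    }

    // Cancellation only touches the one bucket the task hashed to.
    void cancel(long expiryMs, Runnable task) {
        slots[(int) ((expiryMs / tickMs) % slots.length)].remove(task);
    }

    // Advance the wheel and run every task whose bucket has expired.
    void advanceTo(long nowMs) {
        long targetTick = nowMs / tickMs;
        while (currentTick < targetTick) {
            currentTick++;
            List<Runnable> bucket = slots[(int) (currentTick % slots.length)];
            for (Runnable task : bucket) {
                task.run();
            }
            bucket.clear();
        }
    }
}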
>>>
>>> Cheers
>>>
>>> On Fri, Apr 21, 2017 at 12:44 PM, Gyula Fóra  wrote:
>>>
 The timer will actually fire and will be removed at the original time,
 but we don't trigger any action on it. We also remove the tombstone state
 afterwards.

 So we use more memory yes depending on the length and number of timers
 that were deleted. But it is eventually cleaned up.

 Gyula

 Ted Yu  ezt írta (időpont: 2017. ápr. 21., P,
 21:38):

> A bit curious: wouldn't using "tombstone" markers constitute some
> memory leak (since Timers are not released) ?
>
> Cheers
>
> On Fri, Apr 21, 2017 at 12:23 PM, Gyula Fóra 
> wrote:
>
>> Hi!
>>
>> I thought I would drop my opinion here maybe it is relevant.
>>
>> We have used the Flink internal timer implementation in many of our
>> production applications. It supports timer deletion, but deletion
>> actually turned out to be a huge performance bottleneck because of the
>> bad deletion performance of the priority queue.
>>
>> In many of our cases deletion could have been avoided by some more
>> clever registration/firing logic, and we also ended up completely avoiding
>> deletion and instead using "tombstone" markers: setting a flag in the
>> state indicating which timers not to fire when they actually trigger.
>>
>> Gyula
>>
>>
>>
>> Aljoscha Krettek  ezt írta (időpont: 2017. ápr.
>> 21., P, 14:47):
>>
>>> Hi,
>>> the reasoning behind the limited user facing API was that we were
>>> (are) not sure whether we would be able to support efficient deletion of
>>> timers for different ways of storing timers.
>>>
>>> @Stephan, If I remember correctly you were the strongest advocate
>>> for not allowing timer deletion. What’s your thinking on this? There was
>>> also a quick discussion on https://issues.apache.org/jira/browse/FLINK-3089
>>> where Xiaogang explained that the (new, not
>>> merged) RocksDB based timers would have efficient timer deletion.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 20. Apr 2017, at 11:56, Jagadish Bihani 
>>> wrote:
>>>
>>> Hi
>>>
>>> I am working on a use case where I want to start a timer for a given
>>> event type and when that timer expires it will perform certain action. 
>>> This
>>> can be done using Process Function.
>>>
>>> But I also want to cancel the scheduled timer in case of some other
>>> types of events. I checked that HeapInternalTimerService, which implements
>>> the InternalTimerService interface, already has those implementations.
>>> Also, SimpleTimerService, which implements TimerService, uses
>>> InternalTimerService and simply passes
>>> VoidNamespace.INSTANCE.
>>>
>>> So in a way we are using InternalTimerService interface's
>>> implementations everywhere.
>>>
>>> So what is

Re: CEP memory requirements

2017-05-04 Thread Dawid Wysakowicz
Yes, you are right, prior to 1.3.0 the state per key was never cleared.
Right now, due to FLINK-5174, in the master branch it is
stored only if necessary.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder



2017-05-04 22:12 GMT+02:00 Elias Levy :

> Looking at the code I gather that 1.2 does not clear the per key NFA state
> even if there is no state to keep, whereas this appears fixed in the master
> branch. Yes?
>
> On Thu, May 4, 2017 at 11:25 AM, Elias Levy 
> wrote:
>
>> I am observing odd memory behavior with the CEP library and I am
>> wondering if it is expected.
>>
>> If I write a simple local streaming Flink job that reads from a 65MB
>> compressed file of JSON objects, one per line, parses the JSON, performs a
>> filter operation, and then a keyBy, heap usage is stable, staying below
>> 250MB throughout per VisualVM.
>>
>> But if I create a CEP pattern that matches nothing
>> (Pattern.begin[T]("foo").where( _ => false )) and match it against the
>> stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
>> then memory balloons until the program terminates, steadily growing until
>> 3GB.
>>
>> The VisualVM memory profiler appears unable to account for that used heap
>> space.  If I add the Live Bytes column I'd get only between 100 and 200 MB.
>>
>> Any idea what is going on?
>>
>> Flink 1.2.  Java 8.
>>
>>
>


Re: CEP memory requirements

2017-05-04 Thread Elias Levy
Looking at the code I gather that 1.2 does not clear the per key NFA state
even if there is no state to keep, whereas this appears fixed in the master
branch. Yes?

On Thu, May 4, 2017 at 11:25 AM, Elias Levy 
wrote:

> I am observing odd memory behavior with the CEP library and I am wondering
> if it is expected.
>
> If I write a simple local streaming Flink job that reads from a 65MB
> compressed file of JSON objects, one per line, parses the JSON, performs a
> filter operation, and then a keyBy, heap usage is stable, staying below
> 250MB throughout per VisualVM.
>
> But if I create a CEP pattern that matches nothing
> (Pattern.begin[T]("foo").where( _ => false )) and match it against the
> stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
> then memory balloons until the program terminates, steadily growing until
> 3GB.
>
> The VisualVM memory profiler appears unable to account for that used heap
> space.  If I add the Live Bytes column I'd get only between 100 and 200 MB.
>
> Any idea what is going on?
>
> Flink 1.2.  Java 8.
>
>


Re: Queryable State

2017-05-04 Thread Chet Masterson
I found the issue. When parallelism = 3, my test data set was skewed such that data was
only going to two of the three task managers (kafka partition = 3, number of flink nodes = 3,
parallelism = 3). As soon as I created a test data set with enough keys that spread across
all three task managers, queryable state started working as expected. That is why only two
KVStates were registered with the job manager, instead of three.

my FINAL :-) question: should I be getting
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event only N-1 task
managers have data in a parallelism of N situation?

Thanks for all the help!

04.05.2017, 11:24, "Ufuk Celebi":
Could you try KvStateRegistry#registerKvState please? In the JM logs you should see
something about the number of connected task managers and in the task manager logs that
each one connects to a JM.
– Ufuk

On Tue, May 2, 2017 at 2:53 PM, Chet Masterson wrote:
Can do. Any advice on where the trace prints should go in the task manager source code?
BTW - How do I know I have a correctly configured cluster? Is there a set of messages in
the job / task manager logs that indicate all required connectivity is present? I know I
use the UI to make sure all the task managers are present, and that the job is running on
all of them, but is there some verbiage in the logs that indicates the job manager can talk
to all the task managers, and vice versa?
Thanks!

02.05.2017, 06:03, "Ufuk Celebi":
Hey Chet! I'm wondering why you are only seeing 2 registration messages for 3 task
managers. Unfortunately, there is no log message at the task managers when they send out
the notification. Is it possible for you to run a remote debugger with the task managers
or build a custom Flink version with the appropriate log messages on the task manager side?
– Ufuk

On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson wrote:
Any insight here? I've got a situation where a key value state on a task manager is being
registered with the job manager, but when I try to query it, the job manager responds it
doesn't know the location of the key value state...

26.04.2017, 12:11, "Chet Masterson":
After setting the logging to DEBUG on the job manager, I learned four things:
(On the message formatting below, I have the Flink logs formatted into JSON so I can
import them into Kibana)
1. The appropriate key value state is registered in both parallelism = 1 and
parallelism = 3 environments. In parallelism = 1, I saw one registration message in the
log, in the parallelism = 3, I saw two registration messages:
{"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"", "msg":"Key value state registered for job  under name "}
2. When I issued the query in both parallelism = 1 and parallelism = 3 environments, I saw
"Lookup key-value state for job  with registration name ". In parallelism = 1, I saw 1 log
message, in parallelism = 3, I saw two identical messages.
3. I saw no other messages in the job manager log that seemed relevant.
4. When issuing the query in parallelism = 3, I continued to get the error:
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message of null.
Thanks!

26.04.2017, 09:52, "Ufuk Celebi":
Thanks! Your config looks good to me.
Could you please set the log level org.apache.flink.runtime.jobmanager to DEBUG?
log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
Then we can check whether the JobManager logs the registration of the state instance with
the respective name in the case of parallelism > 1?
Expected output is something like this: "Key value state registered for job
${msg.getJobId} under name ${msg.getRegistrationName}."
– Ufuk

On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson wrote:
Ok...more information.
1. Built a fresh cluster from the ground up. Started testing queryable state at each step.
2. When running under any configuration of task managers and job managers were
parallelism = 1, the queries execute as expected.
3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job manager) feeding
off a kafka topic partitioned three ways, queries will always fail, returning error
(org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an error message of null.
4. I do know my state is as expected on the cluster. Liberal use of trace prints show my
state managed on the jobs is as I expect. However, I cannot query them external.
5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed is configured
by using the job manager UI.
6. My flink-conf.yaml:
jobmanager.rpc.address: flink01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.data.port: 6121
taskmanager.numberOfTaskSlots: 1
taskmanager.memo

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-04 Thread G.S.Vijay Raajaa
I tried to reorder and the window function works fine, but after
processing a few streams of data from Topic A and Topic B, the window function
seems to throw the below error. The keyBy is on the eventTime field.

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.

at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)


Regards,

Vijay Raajaa GS

On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa 
wrote:

> Thanks for your input, will try to incorporate them in my implementation.
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek 
> wrote:
>
>> The approach could work, but if it can happen that an event from stream A
>> is not matched by an event in stream B you will have lingering state that
>> never goes away. For such cases it might be better to write a custom
>> CoProcessFunction as sketched here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.
>>
>> The idea is to keep events from each side in state and emit a result when
>> you get the event from the other side. You also set a cleanup timer in case
>> no other event arrives to make sure that state eventually goes away.
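A minimal sketch of that idea, assuming made-up EventA/EventB types on two streams that
have been connected and keyed by the join attribute, with a placeholder one-minute cleanup
delay and a placeholder merge format:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class JoinWithCleanup extends CoProcessFunction<EventA, EventB, String> {

    private static final long CLEANUP_DELAY_MS = 60_000;

    private transient ValueState<EventA> bufferedA;
    private transient ValueState<EventB> bufferedB;

    @Override
    public void open(Configuration parameters) {
        bufferedA = getRuntimeContext().getState(new ValueStateDescriptor<>("a", EventA.class));
        bufferedB = getRuntimeContext().getState(new ValueStateDescriptor<>("b", EventB.class));
    }

    @Override
    public void processElement1(EventA a, Context ctx, Collector<String> out) throws Exception {
        EventB b = bufferedB.value();
        if (b != null) {
            out.collect(a + " + " + b);   // placeholder for building the merged JSON
            bufferedB.clear();
        } else {
            // Buffer this side and schedule a cleanup in case the other side never arrives.
            bufferedA.update(a);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + CLEANUP_DELAY_MS);
        }
    }

    @Override
    public void processElement2(EventB b, Context ctx, Collector<String> out) throws Exception {
        EventA a = bufferedA.value();
        if (a != null) {
            out.collect(a + " + " + b);
            bufferedA.clear();
        } else {
            bufferedB.update(b);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + CLEANUP_DELAY_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // No match arrived in time: drop whatever is still buffered so state does not linger.
        bufferedA.clear();
        bufferedB.clear();
    }
}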
>>
>> Best,
>> Aljoscha
>>
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa 
>> wrote:
>>
>> Sure. Thanks for the pointer, let me reorder the same. Any comments about
>> the approach followed for merging topics and creating a single JSON?
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> An AllWindow operator requires an AllWindowFunction, instead of a
>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place,
>>> to get a keyed window you have to write something akin to:
>>>
>>> inputStream
>>>   .keyBy(…)
>>>   .window(…)
>>>   .apply(…) // or reduce()
>>>
>>> In your case, you key the stream and then the keying is “lost” again
>>> because you apply a flatMap(). That’s why you have an all-window and not a
>>> keyed window.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to combine two kafka topics using a single kafka
>>> consumer on a list of topics, further converting the json string in the stream
>>> to a POJO. Then, I join them via keyBy (on the event time field) and, to merge
>>> them into a single fat json, I was planning to use a window stream and apply
>>> a window function on the window stream. The assumption is that Topic-A &
>>> Topic-B can be joined on Event Time and only one pair (Topic A (JSON),
>>> Topic B (JSON)) will be present with the same eventTime. Hence I was planning
>>> to use a countWindow(2) post keyBy on eventTime.
>>>
>>> I have couple of questions for the same;
>>>
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on the AllWindowed stream doesn't seem to work fine;
>>> Any pointers will be greatly appreciated.
>>>
>>> Code Snippet :
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>>> ExecutionEnvironment();
>>>
>>> logger.info("Flink Stream Window Charger has started");
>>>
>>> Properties properties = new Properties();
>>>
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>>
>>> properties.setProperty("zookeeper.connect", "
>>> 127.0.0.1:2181/service-kafka");
>>>
>>> properties.setProperty("group.id", "group-0011");
>>>
>>> properties.setProperty("auto.offset.reset", "smallest");
>>>
>>>
>>> List < String > names = new ArrayList < > ();
>>>
>>>
>>> names.add("Topic-A");
>>>
>>> names.add("Topic-B");
>>>
>>>
>>> DataStream < String > stream = env.addSource(new FlinkKafkaConsumer08 <
>>> > (names, new SimpleStringSchema(), properties));
>>>
>>> DataStream < TopicPojo > pojo = stream.map(new
>>> Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>>
>>> List < String > where = new ArrayList < String > ();
>>>
>>> AllWindowedStream < String, GlobalWindow > data_window =
>>> pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>
>>> DataStream < String > data_charging = data_window.apply(new
>>> MyWindowFunction());
>>>
>>> data_charging.addSink(new SinkFunction < String > () {
>>>
>>>
>>> public void invoke(String value) throws Exception {
>>>
>>>
>>>   // Yet to be implemented - Merge two POJO into one
>>>
>>>  }
>>>
>>> });
>>>
>>>
>>> try
>>>
>>> {
>>>
>>>

CEP memory requirements

2017-05-04 Thread Elias Levy
I am observing odd memory behavior with the CEP library and I am wondering
if it is expected.

If I write a simple local streaming Flink job that reads from a 65MB
compressed file of JSON objects, one per line, parses the JSON, performs a
filter operation, and then a keyBy, heap usage is stable, staying below
250MB throughout per VisualVM.

But if I create a CEP pattern that matches nothing
(Pattern.begin[T]("foo").where( _ => false )) and match it against the
stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
then memory balloons until the program terminates, steadily growing until
3GB.

The VisualVM memory profiler appears unable to account for that used heap
space.  If I add the Live Bytes column I'd get only between 100 and 200 MB.

Any idea what is going on?

Flink 1.2.  Java 8.
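For context, here is a rough, self-contained Java approximation of the job described above.
The input path and the key extraction are placeholders, and the condition uses the
Flink 1.2-style FilterFunction (Flink 1.3 switches to IterativeCondition):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

public class CepMemoryTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; in the report this is a 65MB compressed file of JSON lines.
        KeyedStream<String, String> keyed = env
                .readTextFile("file:///path/to/events.json")
                .filter(line -> !line.isEmpty())
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String line) {
                        return line.substring(0, Math.min(8, line.length())); // stand-in key
                    }
                });

        // A pattern that can never match anything.
        Pattern<String, ?> pattern = Pattern.<String>begin("foo")
                .where(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) {
                        return false;
                    }
                });

        CEP.pattern(keyed, pattern)
                .select(new PatternSelectFunction<String, String>() {
                    @Override
                    public String select(Map<String, String> match) {
                        return "never emitted";
                    }
                })
                .print();

        env.execute("cep-memory-test");
    }
}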


Re: assignTimestampsAndWatermarks not working as expected

2017-05-04 Thread Kostas Kloudas
Hi Jayesh,

Glad that it finally worked! 

From a first look, I cannot spot anything wrong with the code itself.
The only thing I have to note is that the locations of the logs and the 
printouts  you put
in your code differ and normally they are not printed in the console.

Thanks,
Kostas

> On May 4, 2017, at 6:45 PM, Jayesh Patel  wrote:
> 
> I figured out what’s wrong – there was a silly mistake on my side.  There is 
> nothing wrong with the code  here, but please do let me know if you see 
> anything wrong with my approach.
>  
> Thank you.
>  
> From: Jayesh Patel 
> Sent: Thursday, May 04, 2017 10:00 AM
> To: 'user@flink.apache.org' 
> Subject: assignTimestampsAndWatermarks not working as expected
>  
> Can anybody see what’s wrong with the following code?  I am using Flink 1.2 
> and have tried running it in Eclipse (local mode) as well as on a 3 node 
> cluster and it’s not behaving as expected.
>  
> The idea is to have a custom source collect messages from a JMS topic (I have 
> a fake source for now that generates some out of order messages with event 
> time that is not delayed more than 5 seconds).  The source doesn’t 
> collectWithTimestamp() or emitWatermark().
> The messages (events) include the event time.  In order to allow for late or 
> out of order messages I use assignTimestampsAndWatermarks with 
> BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method 
> retrieves the event time from the event.
>  
> When I run this job, I don’t get the printout from the extractTimestamp() 
> method, nor do I get the logTuples.print() or stampedLogs.print() output.  
> When running on the local environment(Eclipse) I do see the printouts from 
> the fake source (MockSource – not shown here).  But I don’t even get those 
> when run from my 3 node cluster with parallelism of 3.
>  
> public static void main(String[] args) throws Exception {
>final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>env.getConfig().setAutoWatermarkInterval(2000); // just for debugging, 
> didn’t affect the behavior
>  
>DataStream logs = env.addSource(new MockSource());
>DataStream> logTuples = logs.map(new 
> ParseEvent());
>logTuples.print();
>  
>  
>DataStream> stampedLogs = 
> logTuples.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(5))
>  {
>  private static final long serialVersionUID = 1L;
>  @Override
>  public long extractTimestamp(Tuple2 
> element) {
> // This is how to extract timestamp from the event
>long eventTime = 
> element.f1.getEventStartTime().toInstant().toEpochMilli();
>System.out.println("returning event time " + 
> eventTime);
>return eventTime;
>  }});
>stampedLogs.print();
>env.execute(“simulation”);
> }
>  
> Thank you,
> Jayesh



RE: assignTimestampsAndWatermarks not working as expected

2017-05-04 Thread Jayesh Patel
I figured out what's wrong - there was a silly mistake on my side.  There is
nothing wrong with the code  here, but please do let me know if you see
anything wrong with my approach.

 

Thank you.

 

From: Jayesh Patel 
Sent: Thursday, May 04, 2017 10:00 AM
To: 'user@flink.apache.org' 
Subject: assignTimestampsAndWatermarks not working as expected

 

Can anybody see what's wrong with the following code?  I am using Flink 1.2
and have tried running it in Eclipse (local mode) as well as on a 3 node
cluster and it's not behaving as expected.

 

The idea is to have a custom source collect messages from a JMS topic (I
have a fake source for now that generates some out of order messages with
event time that is not delayed more than 5 seconds).  The source doesn't
collectWithTimestamp() or emitWatermark().

The messages (events) include the event time.  In order to allow for late or
out of order messages I use assignTimestampsAndWatermarks with
BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method
retrieves the event time from the event.

 

When I run this job, I don't get the printout from the extractTimestamp()
method, nor do I get the logTuples.print() or stampedLogs.print() output.
When running on the local environment(Eclipse) I do see the printouts from
the fake source (MockSource - not shown here).  But I don't even get those
when run from my 3 node cluster with parallelism of 3.

 

public static void main(String[] args) throws Exception {

   final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

   env.getConfig().setAutoWatermarkInterval(2000); // just for
debugging, didn't affect the behavior

 

   DataStream logs = env.addSource(new MockSource());

   DataStream> logTuples = logs.map(new
ParseEvent());

   logTuples.print();

 

 

   DataStream> stampedLogs =
logTuples.assignTimestampsAndWatermarks(

new
BoundedOutOfOrdernessTimestampExtractor>(Time.second
s(5)) {

 private static final long serialVersionUID = 1L;

 @Override

 public long extractTimestamp(Tuple2
element) {

// This is how to extract timestamp from the
event

   long eventTime =
element.f1.getEventStartTime().toInstant().toEpochMilli();

   System.out.println("returning event time " +
eventTime);

   return eventTime;

 }});

   stampedLogs.print();

   env.execute("simulation");

}

 

Thank you,

Jayesh



smime.p7s
Description: S/MIME cryptographic signature


Re: OperatorState partioning when recovering from failure

2017-05-04 Thread Stefan Richter
Hi,

the repartitioning happens indeed as some round-robin algorithm (see 
RoundRobinOperatorStateRepartitioner). This repartitioning happens at the level 
of the checkpoint coordinator in the master on restore, by redistribution of 
state handles. The state that those handles are pointing to is a black box in 
this place, so the only assumption we can make is that all partitions can be 
redistributed freely. If we want additional constraints on the repartitioning, 
the user has to apply those when handing over the state partitions, i.e. the 
partitioning into the list state must happen in a way that already groups 
together state partitions that should not end up on separate machines after a 
restore.
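A rough sketch of what that grouping can look like on the user side, assuming an operator
or function that implements CheckpointedFunction: every element added to the ListState is
an atomic unit of redistribution, so splits that must stay together are snapshotted as one
entry. The method name getListState is the one on the master branch's OperatorStateStore
(Flink 1.2 names it getOperatorState), and the String split type and grouping are
placeholders.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.ArrayList;
import java.util.List;

public class GroupedSplitState implements CheckpointedFunction {

    // In-memory view of pending splits, grouped by some application-level criterion.
    private final List<List<String>> pendingSplitGroups = new ArrayList<>();

    private transient ListState<List<String>> checkpointedGroups;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedGroups = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("split-groups",
                        TypeInformation.of(new TypeHint<List<String>>() {})));
        pendingSplitGroups.clear();
        for (List<String> group : checkpointedGroups.get()) {
            pendingSplitGroups.add(group);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedGroups.clear();
        for (List<String> group : pendingSplitGroups) {
            // One add() per group: the round-robin repartitioner never splits a list entry.
            checkpointedGroups.add(group);
        }
    }
}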

Best,
Stefan

> Am 04.05.2017 um 17:29 schrieb Kostas Kloudas :
> 
> Hi Seth,
> 
> Upon restoring, splits will be re-shuffled among the new tasks, and I believe 
> that state is repartitioned 
> in a round robin way (although I am not 100% sure so I am also including 
> Stefan and Aljoscha in this).
> The priority queues will be reconstructed based on the restored elements. So 
> task managers may get
> a relatively equal number of splits, but “recent” ones may be concentrated on 
> a few nodes. This may 
> also have to do with how your monitor sends them to the reader (e.g. all 
> splits of a recent file go to the 
> same node).
> 
> As far as I know, we do not have an option for custom state re-partitioner.
> 
> To see what is restored, you can enable DEBUG logging and this will print 
> upon restoring sth like:
> 
> "ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored 
> {restoredReaderState}"
> 
> with the restoredReaderState containing the restored splits.
> 
> And something similar upon checkpointing. This will give you a better look in 
> what may be happening.
> 
> Thanks,
> Kostas
> 
>> On May 4, 2017, at 3:45 PM, Seth Wiesman > > wrote:
>> 
>> I am curious about how operator state is repartitioned to subtasks when a 
>> job is resumed from a checkpoint or savepoint. The reason is that I am 
>> having issues with the ContinuousFileReaderOperator when recovering from a 
>> failure. 
>>  
>> I consume most of my data from files off S3. I have a custom file monitor 
>> that understands how to walk my directory structure and outputs 
>> TimestampedFileSplits downstream in chronological order to the stock 
>> ContinuousFileReaderOperator. The reader consumes those splits and stores 
>> them in a priority queue based on their last modified time, ensuring that files 
>> are read in chronological order, which is exactly what I want. The problem is 
>> that when recovering, the unread splits partitioned out to each of the 
>> subtasks seem to be heavily skewed in terms of last modified time.
>>  
>> While each task may have a similar number of files, I find that one or two 
>> will have a disproportionate number of old files. This in turn holds back my 
>> watermark (sometimes for several hours depending on the number of unread 
>> splits), which keeps timers from firing, windows from purging, etc.
>>  
>> I was hoping there were some way I could add a custom partitioner to ensure 
>> that splits are uniformly distributed in a temporal manner or if someone had 
>> other ideas of how I could mitigate the problem.
>>  
>> Thank you, 
>>  
>> Seth Wiesman 
>>  
> 



Re: Long running time based Patterns

2017-05-04 Thread Kostas Kloudas
Hi Moiz,

Then it should work.
And the previous issue is already fixed on the master.

Kostas

> On May 4, 2017, at 6:02 PM, Moiz Jinia  wrote:
> 
> It'll definitely have a where clause. Just forgot to include it in the 
> example. Just meant to focus on the within clause.
> 
> Am on 1.3 - expect it'll be fixed by the time stable is out?
> 
> Thanks!
> 
> Moiz
> 
> —
> sent from phone
> 
> On 04-May-2017, at 8:12 PM, Kostas Kloudas  > wrote:
> 
> Hi Moiz,
> 
> You are on Flink 1.2 or 1.3? 
> In Flink 1.2 (latest stable) there are no known issues, so this will work 
> correctly. 
> Keep in mind that without any conditions (where-clauses), you will only get 
> all possible 
> 2-tuples of incoming elements, which could also be done with a simple process 
> function I would say.
> 
> In Flink 1.3 (unreleased) there is this issue: 
> https://issues.apache.org/jira/browse/FLINK-6445 
> 
> 
> Thanks,
> Kostas
> 
>> On May 4, 2017, at 1:45 PM, Moiz S Jinia > > wrote:
>> 
>> Does Flink (with a persistent State backend such as RocksDB) work well with 
>> long running Patterns of this type? (running into days)
>> 
>> Pattern.begin("start").followedBy("end").within(Time.days(3))
>> 
>> Is there some gotchas here or things to watch out for?
>> 
>> Thanks,
>> Moiz
> 



Re: Long running time based Patterns

2017-05-04 Thread Moiz Jinia
It'll definitely have a where clause. Just forgot to include it in the example. 
Just meant to focus on the within clause.

Am on 1.3 - expect it'll be fixed by the time stable is out?

Thanks!

Moiz

—
sent from phone

On 04-May-2017, at 8:12 PM, Kostas Kloudas  wrote:

Hi Moiz,

You are on Flink 1.2 or 1.3? 
In Flink 1.2 (latest stable) there are no known issues, so this will work 
correctly. 
Keep in mind that without any conditions (where-clauses), you will only get all 
possible 
2-tuples of incoming elements, which could also be done with a simple process 
function I would say.

In Flink 1.3 (unreleased) there is this issue: 
https://issues.apache.org/jira/browse/FLINK-6445

Thanks,
Kostas

> On May 4, 2017, at 1:45 PM, Moiz S Jinia  wrote:
> 
> Does Flink (with a persistent State backend such as RocksDB) work well with 
> long running Patterns of this type? (running into days)
> 
> Pattern.begin("start").followedBy("end").within(Time.days(3))
> 
> Is there some gotchas here or things to watch out for?
> 
> Thanks,
> Moiz



Re: OperatorState partioning when recovering from failure

2017-05-04 Thread Kostas Kloudas
Hi Seth,

Upon restoring, splits will be re-shuffled among the new tasks, and I believe 
that state is repartitioned 
in a round robin way (although I am not 100% sure so I am also including Stefan 
and Aljoscha in this).
The priority queues will be reconstructed based on the restored elements. So 
task managers may get
a relatively equal number of splits, but “recent” ones may be concentrated on a 
few nodes. This may 
also have to do with how your monitor sends them to the reader (e.g. all splits 
of a recent file go to the 
same node).

As far as I know, we do not have an option for custom state re-partitioner.

To see what is restored, you can enable DEBUG logging and this will print upon 
restoring sth like:

"ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored 
{restoredReaderState}"

with the restoredReaderState containing the restored splits.

And something similar upon checkpointing. This will give you a better look in 
what may be happening.

Thanks,
Kostas

> On May 4, 2017, at 3:45 PM, Seth Wiesman  wrote:
> 
> I am curious about how operator state is repartitioned to subtasks when a job 
> is resumed from a checkpoint or savepoint. The reason is that I am having 
> issues with the ContinuousFileReaderOperator when recovering from a failure. 
>  
> I consume most of my data from files off S3. I have a custom file monitor 
> that understands how to walk my directory structure and outputs 
> TimestampedFileSplits downstream in chronological order to the stock 
> ContinuousFileReaderOperator. The reader consumes those splits and stores 
> them in a priority queue based on their last modified time, ensuring that files 
> are read in chronological order, which is exactly what I want. The problem is 
> that when recovering, the unread splits partitioned out to each of the 
> subtasks seem to be heavily skewed in terms of last modified time.
>  
> While each task may have a similar number of files, I find that one or two 
> will have a disproportionate number of old files. This in turn holds back my 
> watermark (sometimes for several hours depending on the number of unread 
> splits), which keeps timers from firing, windows from purging, etc.
>  
> I was hoping there were some way I could add a custom partitioner to ensure 
> that splits are uniformly distributed in a temporal manner or if someone had 
> other ideas of how I could mitigate the problem.
>  
> Thank you, 
>  
> Seth Wiesman 
>  



Re: Queryable State

2017-05-04 Thread Ufuk Celebi
Could you try KvStateRegistry#registerKvState please?

In the JM logs you should see something about the number of connected
task managers and in the task manager logs that each one connects to a
JM.

– Ufuk
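For reference, the job-side registration that produces those announcements looks roughly
like the sketch below in Flink 1.2; the state name and query name are made up, and the
snippet would sit in the open() method of a keyed rich function. Each parallel instance
that actually creates the state registers it with the JobManager under the query name.

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
        "per-key-count", TypeInformation.of(new TypeHint<Long>() {}));
// Marking the descriptor queryable triggers the KvState registration on the TaskManagers.
descriptor.setQueryable("per-key-count-query");
// ValueState<Long> state = getRuntimeContext().getState(descriptor);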


On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
 wrote:
> Can do. Any advice on where the trace prints should go in the task manager
> source code?
>
> BTW - How do I know I have a correctly configured cluster? Is there a set of
> messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
>
> Thanks!
>
>
> 02.05.2017, 06:03, "Ufuk Celebi" :
>
> Hey Chet! I'm wondering why you are only seeing 2 registration
> messages for 3 task managers. Unfortunately, there is no log message
> at the task managers when they send out the notification. Is it
> possible for you to run a remote debugger with the task managers or
> build a custom Flink version with the appropriate log messages on the
> task manager side?
> – Ufuk
>
>
> On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
>  wrote:
>
>
>
>  Any insight here? I've got a situation where a key value state on a task
>  manager is being registered with the job manager, but when I try to query
>  it, the job manager responds it doesn't know the location of the key value
>  state...
>
>
>  26.04.2017, 12:11, "Chet Masterson" :
>
>  After setting the logging to DEBUG on the job manager, I learned four
>  things:
>
>  (On the message formatting below, I have the Flink logs formatted into JSON
>  so I can import them into Kibana)
>
>  1. The appropriate key value state is registered in both parallelism = 1
> and
>  parallelism = 3 environments. In parallelism = 1, I saw one registration
>  message in the log, in the parallelism = 3, I saw two registration
> messages:
>  {"level":"DEBUG","time":"2017-04-26
>
> 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"",
>  "msg":"Key value state registered for job  under name "}
>
>  2. When I issued the query in both parallelism = 1 and parallelism = 3
>  environments, I saw "Lookup key-value state for job  with
>  registration name ". In parallelism = 1, I saw 1 log message, in
>  parallelism = 3, I saw two identical messages.
>
>  3. I saw no other messages in the job manager log that seemed relevant.
>
>  4. When issuing the query in parallelism = 3, I continued to get the error:
>  org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
> message
>  of null.
>
>  Thanks!
>
>
>
>
>
>  26.04.2017, 09:52, "Ufuk Celebi" :
>
>  Thanks! Your config looks good to me.
>
>  Could you please set the log level org.apache.flink.runtime.jobmanager to
>  DEBUG?
>
>  log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
>
>  Then we can check whether the JobManager logs the registration of the
>  state instance with the respective name in the case of parallelism >
>  1?
>
>  Expected output is something like this: "Key value state registered
>  for job ${msg.getJobId} under name ${msg.getRegistrationName}."
>
>  – Ufuk
>
>  On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>   wrote:
>
>   Ok...more information.
>
>   1. Built a fresh cluster from the ground up. Started testing queryable
>  state
>   at each step.
>   2. When running under any configuration of task managers and job managers
>   were parallelism = 1, the queries execute as expected.
>   3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
>   manager) feeding off a kafka topic partitioned three ways, queries will
>   always fail, returning error
>   (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
>   error message of null.
>   4. I do know my state is as expected on the cluster. Liberal use of trace
>   prints show my state managed on the jobs is as I expect. However, I cannot
>   query them external.
>   5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
>   is configured by using the job manager UI.
>   6. My flink-conf.yaml:
>
>   jobmanager.rpc.address: flink01
>   jobmanager.rpc.port: 6123
>   jobmanager.heap.mb: 256
>
>   taskmanager.heap.mb: 512
>   taskmanager.data.port: 6121
>   taskmanager.numberOfTaskSlots: 1
>   taskmanager.memory.preallocate: false
>
>   parallelism.default: 1
>   blob.server.port: 6130
>
>   jobmanager.web.port: 8081
>   query.server.enable: true
>
>   7. I do know my job is indeed running in parallel, from trace prints going
>   to the task manager logs.
>
>   Do I need a backend configured when running in parallel for the queryable
>   state? Do I need a shared temp directory on the task managers?
>
>   THANKS!
>
>
>   25.04.2017, 04:24, "Ufuk Celebi" :
>
>   It's strange that the rpc port is set to 3 when you use a
>   standalone cluster and configure 6123 as t

Re: Join two kafka topics

2017-05-04 Thread Kostas Kloudas
Perfect! 
Thanks a lot for the clarification!

Kostas

> On May 4, 2017, at 4:37 PM, Tarek khal  wrote:
> 
> Hi Kostas,
> 
> Yes, now is solved by the help of Jason.
> 
> Best,
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12954p13006.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Join two kafka topics

2017-05-04 Thread Tarek khal
Hi Kostas,

Yes, now is solved by the help of Jason.

Best,



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12954p13006.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Join two kafka topics

2017-05-04 Thread Kostas Kloudas
Hi Tarek,

This question seems to be a duplicate with your other question “ConnectedStream 
keyBy issues”, right?
I am just asking for clarification.

Thanks,
Kostas

> On May 4, 2017, at 1:41 PM, Tarek khal  wrote:
> 
> Hi Aljoscha,
> 
> I tested ConnectedStream and CoFlatMapFunction as you told me but the result
> is not what I expected.
> 
> 
> *For the execution:*
> 
> 1) I added 3 rules on the "rules" topic (imei: "01","02","03") 
> 2) Performed 15 events with different imei, but I guess I have a problem with
> "keyby"
> 
> *Result : *
> 
> 
>  
> 
> Code :
> 
> 
> Best,
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12954p12998.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Long running time based Patterns

2017-05-04 Thread Kostas Kloudas
Hi Moiz,

You are on Flink 1.2 or 1.3? 
In Flink 1.2 (latest stable) there are no known issues, so this will work 
correctly. 
Keep in mind that without any conditions (where-clauses), you will only get all 
possible 
2-tuples of incoming elements, which could also be done with a simple process 
function I would say.

In Flink 1.3 (unreleased) there is this issue: 
https://issues.apache.org/jira/browse/FLINK-6445 


Thanks,
Kostas

> On May 4, 2017, at 1:45 PM, Moiz S Jinia  wrote:
> 
> Does Flink (with a persistent State backend such as RocksDB) work well with 
> long running Patterns of this type? (running into days)
> 
> Pattern.begin("start").followedBy("end").within(Time.days(3))
> 
> Is there some gotchas here or things to watch out for?
> 
> Thanks,
> Moiz



Re: ConnectedStream keyby issues

2017-05-04 Thread Tarek khal
Hi Jason,

Thank you very much for your help, it solves my problem.

Best regards,



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999p13003.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


assignTimestampsAndWatermarks not working as expected

2017-05-04 Thread Jayesh Patel
Can anybody see what's wrong with the following code?  I am using Flink 1.2
and have tried running it in Eclipse (local mode) as well as on a 3 node
cluster and it's not behaving as expected.

 

The idea is to have a custom source collect messages from a JMS topic (I
have a fake source for now that generates some out of order messages with
event time that is not delayed more than 5 seconds).  The source doesn't
collectWithTimestamp() or emitWatermark().

The messages (events) include the event time.  In order to allow for late or
out of order messages I use assignTimestampsAndWatermarks with
BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method
retrieves the event time from the event.

 

When I run this job, I don't get the printout from the extractTimestamp()
method, nor do I get the logTuples.print() or stampedLogs.print() output.
When running on the local environment(Eclipse) I do see the printouts from
the fake source (MockSource - not shown here).  But I don't even get those
when run from my 3 node cluster with parallelism of 3.

 

public static void main(String[] args) throws Exception {

   final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

   env.getConfig().setAutoWatermarkInterval(2000); // just for
debugging, didn't affect the behavior

 

   DataStream logs = env.addSource(new MockSource());

   DataStream> logTuples = logs.map(new
ParseEvent());

   logTuples.print();

 

 

   DataStream> stampedLogs =
logTuples.assignTimestampsAndWatermarks(

new
BoundedOutOfOrdernessTimestampExtractor>(Time.second
s(5)) {

 private static final long serialVersionUID = 1L;

 @Override

 public long extractTimestamp(Tuple2
element) {

// This is how to extract timestamp from the
event

   long eventTime =
element.f1.getEventStartTime().toInstant().toEpochMilli();

   System.out.println("returning event time " +
eventTime);

   return eventTime;

 }});

   stampedLogs.print();

   env.execute("simulation");

}

 

Thank you,

Jayesh



smime.p7s
Description: S/MIME cryptographic signature


Re: ConnectedStream keyby issues

2017-05-04 Thread Jason Brelloch
I think the issue is that t2 is not registered to keyed state, so it is
being shared across all of the keys on that taskmanager.  Take a look at
this article:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state

Basically you need to change t2 to be a
ValueState[Tuple2[TrackEvent,RulesEvent]]
and register it with a ValueStateDescriptor in the function's open
method.  Then access it using t2.value() and t2.update().
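A minimal sketch of that suggestion, using the TrackEvent/RulesEvent types from the
question (the state name is a placeholder). It would replace the anonymous
CoFlatMapFunction, e.g.
inputEventStream.connect(inputRulesStream).keyBy("imei", "imei").flatMap(new EnrichWithRule()):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class EnrichWithRule
        extends RichCoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>> {

    // Keyed state: the latest rule seen for the current imei.
    private transient ValueState<RulesEvent> currentRule;

    @Override
    public void open(Configuration parameters) {
        currentRule = getRuntimeContext().getState(
                new ValueStateDescriptor<>("current-rule", RulesEvent.class));
    }

    @Override
    public void flatMap1(TrackEvent trackEvent, Collector<Tuple2<TrackEvent, RulesEvent>> out)
            throws Exception {
        RulesEvent rule = currentRule.value();
        if (rule != null) {
            out.collect(Tuple2.of(trackEvent, rule));
        }
        // else: no rule registered for this key yet; drop or buffer depending on the use case.
    }

    @Override
    public void flatMap2(RulesEvent rulesEvent, Collector<Tuple2<TrackEvent, RulesEvent>> out)
            throws Exception {
        currentRule.update(rulesEvent);
    }
}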

Hopefully that helps.

On Thu, May 4, 2017 at 9:17 AM, Tarek khal 
wrote:

> Hi ,
> I have two kafka topics (tracking and rules) and I would like to join
> "tracking" datastream with "rules" datastream as the data arrives in the
> "tracking" datastream.
>
> The problem with a join is that the rules only “survive” for the length of
> the window, whereas I want them to survive longer than that so
> that they can be applied to events arriving in the future.
>
> I tested ConnectedStream and CoFlatMapFunction but the result is not what I
> expected.
>
> *For the execution:*
>
> 1) I added 3 rules on the "rules" topic (imei: "01","02","03")
> 2) Performed 15 events with different imei, but I guess I have a problem with
> "keyby"
>
> *Result : *
>
>  n4.nabble.com/file/n12999/222.jpg>
>
> *Code :*
>
> ConnectedStreams  connectedStreams =
> inputEventStream.connect(inputRulesStream).keyBy("imei","imei");
> DataStream> ds=
> connectedStreams.flatMap(new CoFlatMapFunction Tuple2>() {
> Tuple2 t2=new Tuple2 RulesEvent>();
> @Override
> public void flatMap1(TrackEvent trackEvent,
> Collector> collector) throws Exception {
> t2.f0=trackEvent;
> collector.collect(t2);
> // t2=new Tuple2();
> }
>
> @Override
> public void flatMap2(RulesEvent rulesEvent,
> Collector> collector) throws Exception {
> t2.f1 = rulesEvent;
> //collector.collect(t2);
> }
> });
> ds.printToErr();
>
> Best,
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-
> keyby-issues-tp12999.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 
*Jason Brelloch* | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305

Subscribe to the BetterCloud Monitor

-
Get IT delivered to your inbox


OperatorState partioning when recovering from failure

2017-05-04 Thread Seth Wiesman
I am curious about how operator state is repartitioned to subtasks when a job 
is resumed from a checkpoint or savepoint. The reason is that I am having 
issues with the ContinuousFileReaderOperator when recovering from a failure.

I consume most of my data from files off S3. I have a custom file monitor that 
understands how to walk my directory structure and outputs 
TimestampedFileSplits downstream in chronological order to the stock 
ContinuousFileReaderOperator. The reader consumes those splits and stores them in 
a priority queue based on their last modified time, ensuring that files are read 
in chronological order, which is exactly what I want. The problem is that when 
recovering, the unread splits partitioned out to each of the subtasks 
seem to be heavily skewed in terms of last modified time.

While each task may have a similar number of files, I find that one or two will 
have a disproportionate number of old files. This in turn holds back my 
watermark (sometimes for several hours depending on the number of unread 
splits), which keeps timers from firing, windows from purging, etc.

I was hoping there was some way I could add a custom partitioner to ensure 
that splits are uniformly distributed in a temporal manner, or if someone had 
other ideas of how I could mitigate the problem.

Thank you,

Seth Wiesman



ConnectedStream keyby issues

2017-05-04 Thread Tarek khal
Hi ,
I have two kafka topics (tracking and rules) and I would like to join
"tracking" datastream with "rules" datastream as the data arrives in the
"tracking" datastream. 

The problem with a join is that the rules only “survive” for the length of
the window, whereas I want them to survive longer than that so
that they can be applied to events arriving in the future.

I tested ConnectedStream and CoFlatMapFunction but the result is not what I
expected.

*For the execution:*

1) I added 3 rules on the "rules" topic (imei: "01","02","03") 
2) Performed 15 events with different imei, but I guess I have a problem with
"keyby"

*Result : *


 

*Code :*

ConnectedStreams  connectedStreams =
inputEventStream.connect(inputRulesStream).keyBy("imei","imei");
DataStream> ds=
connectedStreams.flatMap(new CoFlatMapFunction>() {
Tuple2 t2=new Tuple2();
@Override
public void flatMap1(TrackEvent trackEvent,
Collector> collector) throws Exception {
t2.f0=trackEvent;
collector.collect(t2);
// t2=new Tuple2();
}

@Override
public void flatMap2(RulesEvent rulesEvent,
Collector> collector) throws Exception {
t2.f1 = rulesEvent;
//collector.collect(t2);
}
});
ds.printToErr();

Best,





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Join two kafka topics

2017-05-04 Thread Tarek khal
Hi Aljoscha,

I tested ConnectedStream and CoFlatMapFunction as you told me but the result
is not what I expected.


*For the execution:*

1) I added 3 rules on the "rules" topic (imei: "01","02","03") 
2) Performed 15 events with different imei, but I guess I have a problem with
"keyby"

*Result : *


 

Code :


Best,





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12954p12998.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Long running time based Patterns

2017-05-04 Thread Moiz S Jinia
Does Flink (with a persistent State backend such as RocksDB) work well with
long running Patterns of this type? (running into days)

Pattern.begin("start").followedBy("end").within(Time.days(3))

Is there some gotchas here or things to watch out for?

Thanks,
Moiz
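For illustration, a sketch of such a pattern with conditions and a persistent backend;
the Event type, field names, and checkpoint path are placeholders, the conditions use the
Flink 1.2-style FilterFunction (1.3 uses IterativeCondition/SimpleCondition), and the
snippet is assumed to sit inside main(String[] args) throws Exception:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Long-lived pattern state should go to a persistent, checkpointed backend.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) {
                return "START".equals(e.type);
            }
        })
        .followedBy("end")
        .where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) {
                return "END".equals(e.type);
            }
        })
        .within(Time.days(3));   // partial matches are kept in state for up to 3 days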


Re: Queries regarding Historical Reprocessing

2017-05-04 Thread Aljoscha Krettek
Hi,
Sorry for the longer wait; it's a longer answer and I had to sort my thoughts. 
I'll try and answer each question separately, though the solution for some of 
the issues is the same.

1. I think the problem here is that Flink will not perform any checkpoints if 
some operators have finished. The Streaming File Sources are implemented as a 
combination of two operators: file monitor/split generator and file reader 
operator. The first one is responsible for enumerating available files and 
generating input splits. The second one is responsible for reading the actual 
contents. In a Flink Job it will thus look like this: File Monitor -> File 
Reader, you should see this in the JobManager dashboard.

Now, by default env.createInput(InputFormat) creates a file monitor that only 
scans the directory once and then finishes; this is why we don't see any 
checkpoints being performed. You can work around this by using
env.readFile(FileInputFormat format, String filePath, FileProcessingMode 
watchType, long interval)
with a watch type of PROCESS_CONTINUOUSLY. With this, the file monitor will 
stay active and continuously send input splits downstream for newly created 
files (a short sketch follows after these answers).

2. The File Monitor should always have parallelism=1 while the read operator 
will have the parallelism configured by the user.

3. With separate source there would be a separate file monitor/file reader 
combination for each of them. How they are spread across the TaskManagers 
depends on the parallelism and how the Cluster, especially the TaskManager 
slots are configured.

4. See 1. and 2. If you set PROCESS_CONTINUOUSLY it will pick up new files that 
are added to the folder.
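To make point 1 concrete, a sketch of the continuous variant based on the
AvroInputFormat from the question; the S3 path, the 60s scan interval, and SomeAvroClass
are placeholders, and the AvroInputFormat import assumes the 1.2 flink-avro module:

import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

AvroInputFormat<SomeAvroClass> avroInputFormat =
        new AvroInputFormat<>(new Path("s3://bucket/history/"), SomeAvroClass.class);

// PROCESS_CONTINUOUSLY keeps the file monitor alive (so checkpoints keep running)
// and re-scans the directory every 60s for newly added files.
DataStream<SomeAvroClass> sourceStream = env.readFile(
        avroInputFormat,
        "s3://bucket/history/",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60_000);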

Best,
Aljoscha


> On 3. May 2017, at 16:27, Vinay Patil  wrote:
> 
> Hi Guys,
> 
> Can someone please help me in understanding this ?
> 
> Regards,
> Vinay Patil
> 
> On Thu, Apr 27, 2017 at 12:36 PM, Vinay Patil  wrote:
> Hi Guys, 
> 
> For historical reprocessing , I am reading the avro data from S3 and passing 
> these records to the same pipeline for processing. 
> 
> I have the following queries: 
> 
> 1. I am running this pipeline as a stream application with checkpointing 
> enabled; the records are successfully written to S3, however they remain in 
> the pending state as checkpointing is not triggered when I am doing 
> re-processing. Why does this happen? (I kept the checkpointing interval at 1 
> minute, and the pipeline ran for 10 minutes.) 
> This is the code I am using for reading avro data from S3: 
> 
> AvroInputFormat avroInputFormat = new AvroInputFormat<>( 
> new org.apache.flink.core.fs.Path(s3Path), 
> SomeAvroClass.class); 
> 
> sourceStream = env.createInput(avroInputFormat).map(...); 
> 
> 
> 2. For the source stream Flink sets the parallelism to 1, and for the rest 
> of the operators the user-specified parallelism is set. How does Flink read 
> the data? Does it bring the entire file from S3 one at a time and then 
> split it according to parallelism? 
> 
> 3. I am reading from two different S3 folders and treating them as separate 
> sourceStreams; how does Flink read data in this case? Does it pick one file 
> from each S3 folder, split the data and pass it downstream? Does Flink 
> read the data sequentially? I am confused here as only one Task Manager is 
> reading the data from S3 and then all TMs are getting the data. 
> 
> 4. Although I am running this as a stream application, the operators go 
> into FINISHED state after processing; is this because Flink treats the S3 
> source as finite data? What will happen if the data is continuously written 
> to S3 from one pipeline and from the second pipeline I am doing historical 
> re-processing? 
> 
> Regards, 
> Vinay Patil
> 



Re: High Availability on Yarn

2017-05-04 Thread Aljoscha Krettek
Hi,
Yes, for YARN there is only one running JobManager. As far as I know, in this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job, such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
> On 3. May 2017, at 16:58, Jain, Ankit  wrote:
> 
> Thanks for your reply Aljoscha.
>  
> After building a better understanding of Yarn and spending a copious amount of 
> time on the Flink codebase, I think I now get how Flink & Yarn interact – I plan 
> to document this soon in case it could help somebody starting afresh with 
> Flink-Yarn.
>  
> Regarding ZooKeeper, in YARN mode there is only one JobManager running, do we 
> still need leader election?
>  
> If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
> and while restarting, Flink AM will bring back previous running containers.  
> So, where does Zookeeper sit in this setup?
>  
> Thanks
> Ankit
>  
> From: Aljoscha Krettek 
> Date: Wednesday, May 3, 2017 at 2:05 AM
> To: "Jain, Ankit" 
> Cc: "user@flink.apache.org" , Till Rohrmann 
> 
> Subject: Re: High Availability on Yarn
>  
> Hi, 
> As a first comment, the work mentioned in the FLIP-6 doc you linked is still 
> work-in-progress. You cannot use these abstractions yet without going into 
> the code and setting up a cluster “by hand”.
>  
> The documentation for one-step deployment of a Job to YARN is available here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html#run-a-single-flink-job-on-yarn
>  
> 
>  
> Regarding your third question, ZooKeeper is mostly used for discovery and 
> leader election. That is, JobManagers use it to decide who is the main JM and 
> who are standby JMs. TaskManagers use it to discover the leading JobManager 
> that they should connect to.
>  
> I’m also cc’ing Till, who should know this stuff better and can maybe explain 
> it in a bit more detail.
>  
> Best,
> Aljoscha
> On 1. May 2017, at 18:59, Jain, Ankit  > wrote:
>  
> Hi fellow users,
> We are trying to straighten out high availability story for flink.
>  
> Our setup includes a long running EMR cluster, job submission is a two-step 
> process – 1) Flink cluster is first created using flink yarn client on the 
> EMR cluster already running 2) Flink job is submitted.
>  
> I also saw references that with 1.2, these two steps have been combined into 
> 1 – is that change in FlinkYarnSessionCli.java? Can somebody point to 
> documentation please?
>  
> W/o worrying about Yarn RM (not Flink Yarn RM that seems to be newly 
> introduced) failure for now, I want to understand first how task manager & 
> job manager failures are handled.
>  
> My questions-
> 1)   
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 
> 
>  suggests a new RM has been added and now there is one JobManager for each 
> job. Since Yarn RM will now talk to Flink RM( instead of JobManager 
> previously), will Yarn automatically restart failing Flink RM?
> 2)   Is there any documentation on behavior of new Flink RM that will 
> come up? How will previously running JobManagers & TaskManagers find out 
> about new RM?
> 3)   
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#configuration
>  
> 
>  requires configuring Zookeeper even for Yarn – Is this needed for handling 
> Task Manager failures or JM or both? Will Yarn not take care of JM failures?
>  
> It may sound like I am little confused between role of Yarn and Flink 
> components– who has the most burden of HA? Documentation in current state is 
> lacking clarity – I know it is still evolving.
>  
> Please let me know if somebody can help clear the confusion.
>  
> Thanks
> Ankit
>  
>  
>  



Re: Fault tolerance & idempotency on window functions

2017-05-04 Thread Aljoscha Krettek
Hi,
When keying, keep in mind that Kafka and Flink might use a different scheme for 
hashing. For example, Flink also applies a murmur hash on the hash code 
retrieved from the key and then has some internal logic for assigning that hash 
to a key group (the internal unit of key partitioning). I don’t know what Kafka 
does internally for hashing.

Also keep in mind that even with event time, the events are not ordered by 
event time. So the event that arrives first does not necessarily have the 
lowest timestamp. Using event-time just means that we wait for the watermark to 
trigger window computation.

Regarding state size, if you don’t use merging windows (for example, session 
windows) then the only state that is kept for a purged window is a cleanup 
timer that is set for “end of window + allowed lateness”. That is, the state 
size does not increase with increasing allowed lateness if you purge. This 
could still fit into the heap state backend and you don’t necessarily need to 
consider RocksDB.
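For reference, the window setup being discussed would look roughly like this; the event
type, Counts/CountFolder, the key selector, and the 5-second/60-day values are
placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// "events" is assumed to be a DataStream<MyEvent> with timestamps and watermarks assigned.
DataStream<Counts> aggregated = events
        .keyBy(new TenantUserKeySelector())                       // tenant + user combination
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.days(60))                           // late firings still possible
        .trigger(PurgingTrigger.of(EventTimeTrigger.create()))    // drop contents after each firing
        .fold(new Counts(), new CountFolder());                   // each firing is its own increment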

Best,
Aljoscha
> On 29. Apr 2017, at 10:19, Kamil Dziublinski  
> wrote:
> 
> Big thanks for replying Aljoscha, I spent quite some time thinking about how to 
> solve this problem and came to some conclusions. Would be cool if you could 
> verify whether my logic is correct.
> 
> I decided that I will partition data in kafka in the same way as I 
> partition my window with keyBy. It's a tenant, user combination (I would still 
> use a hash of it in the kafka producer), and I will switch processing to event 
> time (currently it was processing time). Then during replay I could be 100% 
> sure that the first element will always be first, and the watermark for triggering 
> the window would also come at the same moment, giving me idempotent 
> writes of this batched object to HBase.
> 
> And for late events (by configuring lateness on the window itself) I would 
> configure the trigger to fire & purge, so that it doesn't hold fired data. 
> This way, if a late event arrives I could fire this late event with a different 
> timestamp, treating it in hbase as a totally separate increment, not overriding 
> my previous data. 
> The reason I want to purge data here on firing is because I would need to have 
> an allowed lateness on the window of at least 2 months. So holding all data after 
> firing for 2 months would be too costly.
> Additional question here: is there any cost to having a very high allowed lateness 
> (like 2 months) if we configure the trigger to fire & purge? Like any 
> additional state or metadata that Flink needs to maintain that would take 
> much memory from the cluster? Would I have to consider rocksdb here for state, 
> or could FS state still work?
> 
> On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek  > wrote:
> Hi,
> Yes, your analysis is correct: Flink will not retry for individual elements 
> but will restore from the latest consistent checkpoint in case of failure. 
> This also means that you can get different window results based on which 
> element arrives first, i.e. you have a different timestamp on your output in 
> that case.
> 
> One simple mitigation for the timestamp problem is to use the largest 
> timestamp of elements within a window instead of the first timestamp. This 
> will be stable across restores even if the order of arrival of elements 
> changes. You can still get problems when it comes to late data and window 
> triggering, if you cannot guarantee that your watermark is 100 % correct, 
> though. I.e. it might be that, upon restore, an element with an even larger 
> timestamp arrives late that was not considered when doing the first 
> processing that failed.
> 
> Best,
> Aljoscha
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski  > > wrote:
> >
> > Hi guys,
> >
> > I have a flink streaming job that reads from kafka, creates some statistics 
> > increments and stores this in hbase (using normal puts).
> > I'm using fold function here of with window of few seconds.
> >
> > My tests showed me that restoring state with window functions is not 
> > exactly working how I expected.
> > I thought that if my window functions emits an aggregated object to a sink, 
> > and that object fails in a sink, this write to hbase will be replayed. So 
> > even if it actually got written to HBase, but flink thought it didnt (for 
> > instance during network problem) I could be sure of idempotent writes. I 
> > wanted to enforce that by using the timestamp of the first event used in 
> > that window for aggregation.
> >
> > Now correct me if I'm wrong but it seems that in the case of failure (even 
> > if its in sink) whole flow is getting replayed from last checkpoint which 
> > means that my window function might evict aggregated object in a different 
> > form. For instance not only having tuples that failed but also other ones, 
> > which would break my idempotency her and I might end up with having higher 
> > counters than I should have.
> >
> > Do you