Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-12-11 Thread Aljoscha Krettek
The PRs are merged. :D

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-12-11 Thread Stephan Ewen
A solution for that is in these two pull requests:
https://github.com/apache/flink/pull/1447
https://github.com/apache/flink/pull/1448

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-12-04 Thread Robert Metzger
I think we need to find a solution for this problem soon. Another user is most likely affected: http://stackoverflow.com/q/34090808/568695
I've filed a JIRA for the problem: https://issues.apache.org/jira/browse/FLINK-3121

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-30 Thread Aljoscha Krettek
Maybe. In the Kafka case we just need to ensure that parallel instances of the source that know that they don’t have any partitions assigned to them emit Long.MAX_VALUE as a watermark.
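The fix Aljoscha sketches here can be modeled outside of Flink in a few lines: each parallel source instance reports a watermark, an instance with no partitions assigned reports Long.MAX_VALUE, and downstream event time is the minimum over all instances. This is a self-contained illustration with invented names, not the actual Flink 0.10 source code:

```java
// Self-contained model of the proposed fix: a source subtask that knows it
// has no Kafka partitions assigned emits Long.MAX_VALUE as its watermark,
// so it no longer holds back the minimum across all parallel instances.
// Class and method names are invented for illustration.
public class IdleSourceWatermark {

    // Watermark reported by one parallel source instance.
    static long instanceWatermark(int assignedPartitions, long lastEventTimestamp) {
        if (assignedPartitions == 0) {
            return Long.MAX_VALUE; // idle instance: never hold back downstream
        }
        return lastEventTimestamp;
    }

    // Downstream operators advance to the minimum watermark of all inputs.
    static long downstreamWatermark(long[] instanceWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : instanceWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // 1 Kafka partition, 2 source subtasks: subtask 1 is idle.
        long w0 = instanceWatermark(1, 1447666537260L);
        long w1 = instanceWatermark(0, Long.MIN_VALUE);
        // With the fix, the idle subtask reports MAX_VALUE and the
        // downstream watermark tracks the active subtask.
        System.out.println(downstreamWatermark(new long[] {w0, w1})); // 1447666537260
    }
}
```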

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-30 Thread Gyula Fóra
Hi,
I think what we will need at some point for this are approximate watermarks which correlate event and ingest time. I think they have similar concepts in MillWheel/Dataflow.
Cheers,
Gyula

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-30 Thread Aljoscha Krettek
Hi,
as an addition: I don’t have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...
Cheers,
Aljoscha

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-30 Thread Aljoscha Krettek
Hi Konstantin,
I finally nailed down the problem. :-) The basis of the problem is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka stream. I would assume that in your case the Kafka stream has 1 partition. This means that…
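The mismatch can be illustrated with a simple round-robin partition assignment. This is a simplification for illustration — the exact assignment logic inside FlinkKafkaConsumer082 may differ — but the effect is the same: with 1 partition and parallelism greater than 1, every subtask but one has nothing to read.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: distribute numPartitions Kafka partitions over
// numSubtasks parallel consumer instances round-robin. With 1 partition
// and parallelism 2, one subtask ends up idle: it emits no elements and
// no watermarks. (Illustrative only; not the consumer's actual code.)
public class PartitionAssignment {

    static List<Integer> partitionsFor(int subtask, int numSubtasks, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numSubtasks == subtask) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 1, parallelism = 2;
        for (int i = 0; i < parallelism; i++) {
            List<Integer> parts = partitionsFor(i, parallelism, numPartitions);
            System.out.println("subtask " + i + " -> " + parts
                    + (parts.isEmpty() ? "  (idle: emits no elements, no watermarks)" : ""));
        }
    }
}
```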

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-29 Thread Konstantin Knauf
Hi Aljoscha,
I have put together a gist [1] with two classes: a short processing pipeline, which shows the behavior, and a data generator to write records into Kafka. I hope I remembered everything we discussed correctly. So basically, in the example it works with "TimestampExtractor1" only for par…

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-25 Thread Konstantin Knauf
Hi Aljoscha,
sure, will do. I haven’t found a solution either. I won’t have time to put a minimal example together before the weekend, though.
Cheers,
Konstantin

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-25 Thread Aljoscha Krettek
Hi Konstantin,
I still didn’t come up with an explanation for the behavior. Could you maybe send me example code (and example data, if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
Cheers,
Aljoscha

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-19 Thread Aljoscha Krettek
Hmm, that’s very strange. I’ll continue looking into it.

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-17 Thread Konstantin Knauf
Hi Aljoscha,
Are you sure? I am running the job from my IDE at the moment.

If I set

StreamExecutionEnvironment.setParallelism(1);

it works with the old TimestampExtractor (returning Long.MIN_VALUE from getCurrentWatermark() and emitting a watermark at every record). If I set StreamExecutionEnvi…

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-17 Thread Aljoscha Krettek
Hi,
actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before.) The stream operators wait for watermarks from all upstream operators…
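The "wait for watermarks from all upstream operators" rule can be modeled in a few lines (a sketch with invented names, not Flink's actual implementation): an operator tracks the last watermark seen per input channel, and its own event time is the minimum over all channels. One channel stuck at Long.MIN_VALUE pins the operator's event time there, so event-time windows never fire.

```java
// Model of watermark propagation: an operator keeps the last watermark per
// input channel; its own event time is the minimum over all channels.
// A channel that never advances past Long.MIN_VALUE stalls the operator.
// Illustrative sketch, not Flink's real code.
public class WatermarkTracker {

    private final long[] lastPerChannel;

    public WatermarkTracker(int numChannels) {
        lastPerChannel = new long[numChannels];
        java.util.Arrays.fill(lastPerChannel, Long.MIN_VALUE);
    }

    // Returns the operator's current event time after receiving a watermark.
    public long onWatermark(int channel, long watermark) {
        lastPerChannel[channel] = Math.max(lastPerChannel[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : lastPerChannel) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkTracker op = new WatermarkTracker(2);
        System.out.println(op.onWatermark(0, 1000L)); // channel 1 still at MIN_VALUE: stalled
        System.out.println(op.onWatermark(1, 500L));  // both advanced: event time is 500
    }
}
```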

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-17 Thread Aljoscha Krettek
Hi,
yes, unfortunately, there is a bug in the timestamp extraction operator that sets the “last-seen watermark” to Long.MIN_VALUE when calling getCurrentWatermark(), even though it should not. I’m opening an issue and adding a fix to the latest master and to the branch for the 0.10.x bugfix release.
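The fix amounts to keeping the last-seen watermark monotone instead of letting a low value reset it. A self-contained sketch of that invariant (invented names; this is not the actual patch):

```java
// Sketch of the invariant the fix restores: the extraction operator's
// "last-seen watermark" may only move forward. A user's getCurrentWatermark()
// returning Long.MIN_VALUE must not drag it back down. Invented names,
// not the real Flink operator.
public class MonotoneWatermark {

    private long lastSeen = Long.MIN_VALUE;

    // Called with whatever the user's getCurrentWatermark() returned.
    public long update(long candidate) {
        if (candidate > lastSeen) {
            lastSeen = candidate; // advance only
        }
        // The buggy behavior was effectively "lastSeen = candidate;",
        // which could reset progress back to Long.MIN_VALUE.
        return lastSeen;
    }

    public static void main(String[] args) {
        MonotoneWatermark w = new MonotoneWatermark();
        System.out.println(w.update(42L));            // 42
        System.out.println(w.update(Long.MIN_VALUE)); // still 42, not reset
    }
}
```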

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Konstantin Knauf
Hi Aljoscha,
I changed the TimestampExtractor to save the lastSeenTimestamp and only emit it with getCurrentWatermark [1], as you suggested. So basically I do the opposite of before (watermarks only per event vs. watermarks only per auto-watermark interval). And now it works :). The question remains why it…
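The working variant Konstantin describes — remember the last element timestamp in extractTimestamp and only emit it from getCurrentWatermark — can be sketched against a minimal stand-in for the 0.10 TimestampExtractor contract. The interface below is a simplified stand-in, not the real org.apache.flink class:

```java
// Minimal stand-in for the Flink 0.10 TimestampExtractor contract, plus the
// variant that worked: remember the last-seen timestamp per element and emit
// it only from getCurrentWatermark(), which the runtime calls at the
// auto-watermark interval. Simplified sketch, not the real API.
interface SimpleTimestampExtractor<T> {
    long extractTimestamp(T element, long currentTimestamp);
    long extractWatermark(T element, long currentTimestamp); // per-element watermark
    long getCurrentWatermark();                              // periodic watermark
}

public class LastSeenExtractor implements SimpleTimestampExtractor<Long> {

    private long lastSeenTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Long element, long currentTimestamp) {
        lastSeenTimestamp = element; // element carries an epoch-millis timestamp
        return element;
    }

    @Override
    public long extractWatermark(Long element, long currentTimestamp) {
        return Long.MIN_VALUE; // never emit per-element watermarks
    }

    @Override
    public long getCurrentWatermark() {
        return lastSeenTimestamp; // emit only at the auto-watermark interval
    }

    public static void main(String[] args) {
        LastSeenExtractor ex = new LastSeenExtractor();
        ex.extractTimestamp(1447666537260L, -1L);
        System.out.println(ex.getCurrentWatermark()); // 1447666537260
    }
}
```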

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Aljoscha Krettek
Hi,
yes, at your data rate, emitting a watermark for every element should not be a problem. It could become a problem at higher data rates, since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the latest element timestamp in an i…

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Konstantin Knauf
Hi Aljoscha,
ok, now I at least understand why it works with fromElements(...). For the rest I am not so sure.

> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.

But new elements arrive all the time, about…

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Aljoscha Krettek
Hi,
it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally. First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows:
- the result of extractTimestamp…

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Gyula Fóra
Could this part of the extractor be the problem, Aljoscha?

@Override
public long getCurrentWatermark() {
    return Long.MIN_VALUE;
}

Gyula

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Konstantin Knauf
Hi Aljoscha,
thanks for your answer. Yes, I am using the same TimestampExtractor class. The timestamps look good to me. Here an example: {"time": 1447666537260, ...}, and parsed: 2015-11-16T10:35:37.260+01:00.

The order now is:

stream
  .map(dummyMapper)
  .assignTimestamps(...)
  .timeWindow(...)

Is t…

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Aljoscha Krettek
Hi,
are you also using the timestamp extractor when you are using env.fromCollection()? Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? To see if the elements come with a good timestamp from Kafka.
Cheers,
Aljoscha

Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-15 Thread Konstantin Knauf
Hi everyone,
I have the following issue with Flink (0.10) and Kafka. I am using a very simple TimestampExtractor like [1], which just extracts a millis timestamp from a POJO. In my streaming job, I read these POJOs from Kafka using the FlinkKafkaConsumer082 like this:

stream = env.addSource(n…
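The setup described — a POJO carrying a millis timestamp field and an extractor that simply returns it — might look roughly like the following self-contained sketch. The names are invented; the real gist [1] and Flink 0.10's TimestampExtractor interface differ:

```java
// Self-contained sketch of the setup: a POJO carrying an epoch-millis
// timestamp and an extraction function that simply returns that field.
// Names are invented; the real job used Flink 0.10's TimestampExtractor
// together with FlinkKafkaConsumer082.
public class PojoTimestamp {

    static class Event {
        long time; // epoch millis, e.g. {"time": 1447666537260, ...}
        Event(long time) { this.time = time; }
    }

    static long extractTimestamp(Event e) {
        return e.time;
    }

    public static void main(String[] args) {
        Event e = new Event(1447666537260L);
        System.out.println(extractTimestamp(e)); // 1447666537260
    }
}
```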