Re: RocksDB segfaults

2017-03-28 Thread Florian König
Thank you Stephan for spotting the problem. In hindsight it’s obvious that this 
can never work. I’ll figure something out :)
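
For the record, the plan is to stop reading the state from the gauge altogether and instead cache the value in a plain field that is only written by the task thread in flatMap() — roughly like this (an untested sketch; IdSet, FlinkUtil and count() are from our own code). It only fixes the cross-thread access; as Stephan points out below, the value is still whatever was stored for the last processed key.

class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {

    transient ValueState<IdSet> headSeenState;

    // written only by the task thread in flatMap(), read by the metrics thread
    transient volatile long headSeenCount;

    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    return headSeenCount;   // no RocksDB access on the metrics thread
                }
            });
    }

    public void flatMap(Transition transition, Collector<Reaction> out) throws Exception {
        // ... update headSeenState as before ...
        headSeenCount = headSeenState.value().count();   // refresh the cached value on the task thread
    }
}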

> On 24.03.2017 at 10:28, Stephan Ewen <se...@apache.org> wrote:
> 
> The code will not work properly, sorry.
> 
> The value returned by the state is whatever is stored under the key for which 
> the function was called the last time.
> In addition, the unsynchronized access is most likely causing the RocksDB 
> fault.
> 
> TL;DR - The "ValueState" / "ListState" / etc. in Flink are not intended to be 
> used across threads.
> 
> 
> 
> On Fri, Mar 24, 2017 at 8:28 AM, Florian König <florian.koe...@micardo.com> 
> wrote:
> Hi,
> 
> @Robert: I have uploaded all the log files that I could get my hands on to 
> https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I 
> tried to remove all unrelated messages logged by the job itself. In 
> flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and 
> the last half hour before the segfault.
> 
> @Stefan: Your theory could be the key. In the stack trace I see a call to 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge 
> that results in a call to 
> com.micardo.backend.tail.TransitionProcessor$2.getValue()Ljava/lang/Long 
> later. Here’s a slimmed-down snippet of the relevant code:
> 
> class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {
> 
>     transient ValueState<IdSet> headSeenState;
> 
>     public void open(final Configuration parameters) throws Exception {
>         headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);
> 
>         getRuntimeContext()
>             .getMetricGroup()
>             .gauge("head-seen", new Gauge<Long>() {
>                 public Long getValue() {
>                     try {
>                         return headSeenState.value().count();
>                     } catch (IOException e) {
>                         e.printStackTrace();
>                         return 0L;
>                     }
>                 }
>             });
>     }
>     …
> }
> 
> FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a 
> reference to that state via the RuntimeContext of the RichFunction passed as 
> 'this' in the above code.
> 
> Further along in the stack trace I see that headSeenState.value() results in 
> a call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() 
> and then to org.rocksdb.RocksDB.get(J[BIJ).
> 
> It looks like part of the metrics system asynchronously reads the value of 
> the gauge and needs RocksDB for that. Is it possible that this thread does 
> not hold the checkpointing lock you were talking about?
> 
> Best regards
> Florian
> 
> 
> > On 22.03.2017 at 18:19, Stefan Richter <s.rich...@data-artisans.com> wrote:
> >
> > Hi,
> >
> > for the first segfault, from the stack trace I assume that the backend is 
> > not accessed as part of processing an element, but by another thread. Is 
> > that correct? RocksDB requires accessing threads to hold the task’s 
> > checkpointing lock, otherwise they might call methods on an instance that 
> > is already disposed. However, this should only happen when the task was 
> > already about to shut down anyway. Is that a plausible explanation for your 
> > observed behaviour? I also cannot rule out that segfaults can happen 
> > inside RocksDB or due to the JNI bridge.
> >
> > Best,
> > Stefan
> >
> >> On 22.03.2017 at 16:53, Florian König <florian.koe...@micardo.com> wrote:
> >>
> >> Hi Stephan,
> >>
> >> you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
> >> Sorry, my bad.
> >>
> >> That leaves us with the first trace of a segfault for which I can 
> >> guarantee that it brought down a 1.2.0 instance. Unfortunately I cannot 
> >> reproduce the problem. It has happened twice so far, but I can’t see any 
> >> pattern. Is there anything in the stack trace that could point us to a 
> >> probable cause?
> >>
> >> Florian
> >>
> >>> On 22.03.2017 at 16:00, Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>> Hi!
> >>>
> >>> It looks like you are running the RocksDB state backend 1.1 (is there still 
> >>> an old version packaged into your JAR file?)

Re: RocksDB segfaults

2017-03-24 Thread Florian König
Hi,

@Robert: I have uploaded all the log files that I could get my hands on to 
https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I 
tried to remove all unrelated messages logged by the job itself. In 
flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and 
the last half hour before the segfault.

@Stefan: Your theory could be the key. In the stack trace I see a call to 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge 
that results in a call to 
com.micardo.backend.tail.TransitionProcessor$2.getValue()Ljava/lang/Long later. 
Here’s a slimmed-down snippet of the relevant code:

class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {

    transient ValueState<IdSet> headSeenState;

    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    try {
                        return headSeenState.value().count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                }
            });
    }
    …
}

FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a 
reference to that state via the RuntimeContext of the RichFunction passed as 
'this' in the above code.
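
Simplified, the helper boils down to this (a sketch of our own utility class, not a Flink API):

public class FlinkUtil {

    // Creates a ValueStateDescriptor for the given name and type and asks the
    // function's RuntimeContext for the corresponding keyed state handle.
    public static <T> ValueState<T> getStateHandle(RichFunction function, String name, Class<T> type) {
        ValueStateDescriptor<T> descriptor = new ValueStateDescriptor<>(name, type);
        return function.getRuntimeContext().getState(descriptor);
    }
}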

Further along in the stack trace I see that headSeenState.value() results in a 
call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() and 
then to org.rocksdb.RocksDB.get(J[BIJ).

It looks like part of the metrics system asynchronously reads the value of the 
gauge and needs RocksDB for that. Is it possible that this thread does not hold 
the checkpointing lock you were talking about?

Best regards
Florian


> On 22.03.2017 at 18:19, Stefan Richter <s.rich...@data-artisans.com> wrote:
> 
> Hi,
> 
> for the first segfault, from the stack trace I assume that the backend is 
> not accessed as part of processing an element, but by another thread. Is that 
> correct? RocksDB requires accessing threads to hold the task’s checkpointing 
> lock, otherwise they might call methods on an instance that is already 
> disposed. However, this should only happen when the task was already about to 
> shut down anyway. Is that a plausible explanation for your observed 
> behaviour? I also cannot rule out that segfaults can happen inside RocksDB 
> or due to the JNI bridge.
> 
> Best,
> Stefan
> 
>> On 22.03.2017 at 16:53, Florian König <florian.koe...@micardo.com> wrote:
>> 
>> Hi Stephan,
>> 
>> you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
>> Sorry, my bad.
>> 
>> That leaves us with the first trace of a segfault for which I can guarantee 
>> that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the 
>> problem. It has happened twice so far, but I can’t see any pattern. Is there 
>> anything in the stack trace that could point us to a probable cause?
>> 
>> Florian
>> 
>>> On 22.03.2017 at 16:00, Stephan Ewen <se...@apache.org> wrote:
>>> 
>>> Hi!
>>> 
>>> It looks like you are running the RocksDB state backend 1.1 (is there still 
>>> an old version packaged into your JAR file?)
>>> 
>>> This line indicates that: 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
>>>  (the method does not exist in 1.2 any more)
>>> 
>>> Can you try and run 1.2 and see if that still occurs? In general, I cannot 
>>> vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it was stable.
>>> 
>>> Stephan
>>> 
>>> 
>>> 
>>> On Wed, Mar 22, 2017 at 3:13 PM, Florian König <florian.koe...@micardo.com> 
>>> wrote:
>>> Hi,
>>> 
>>> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something 
>>> in RocksDB. What is the preferred way to report them? All I got at the 
>>> moment are two hs_err_pid12345.log files. They are over 4000 lines long 
>>> each. Is there anything significant that I should extract to help you guys 
>>> and/or put into a JIRA ticket?
>>> 
>>> The first thing that came to my mind was the stack traces (see below). Anything else?

Re: RocksDB segfaults

2017-03-22 Thread Florian König
Hi Stephan,

you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
Sorry, my bad.

That leaves us with the first trace of a segfault for which I can guarantee 
that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the 
problem. It has happened twice so far, but I can’t see any pattern. Is there 
anything in the stack trace that could point us to a probable cause?

Florian

> On 22.03.2017 at 16:00, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> It looks like you are running the RocksDB state backend 1.1 (is there still an 
> old version packaged into your JAR file?)
> 
> This line indicates that: 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
>  (the method does not exist in 1.2 any more)
> 
> Can you try and run 1.2 and see if that still occurs? In general, I cannot 
> vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it was stable.
> 
> Stephan
> 
> 
> 
> On Wed, Mar 22, 2017 at 3:13 PM, Florian König <florian.koe...@micardo.com> 
> wrote:
> Hi,
> 
> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something 
> in RocksDB. What is the preferred way to report them? All I got at the moment 
> are two hs_err_pid12345.log files. They are over 4000 lines long each. Is 
> there anything significant that I should extract to help you guys and/or put 
> into a JIRA ticket?
> 
> The first thing that came to my mind was the stack traces (see below). 
> Anything else?
> 
> Thanks
> Florian
> 
> 
> 
> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  free 
> space=1019k
> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
> [0x7fec92588780+0x4c]
> J 27241 C2 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
>  (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
> J 38483 C2 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
>  (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
> J 38522 C2 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
>  (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
> J 47531 C2 
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
>  (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
> J 5815 C2 
> akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
>  (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) 
> @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
> J 6628 C2 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
>  (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) 
> @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
> v  ~StubRoutines::call_stub
> 
> --
> 
> Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  free 
> space=1019k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> code)
> C  [libstdc++.so.6+0xc026b]  std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> >::basic_string(std::string const&)+0xb
> C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions 
> const&, rocksdb::BackupEngine**)+0x3a
> C  [librocksdbjni8426686507832168508.so+0x180ad5]  
> Java_org_rocksdb_BackupEngine_open+0x25
> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 
> [0x7f16cb79a980+0xb6]
> J 49809 C1 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
>  (416 bytes) @ 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
> J 51766 C2 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
>  (40 bytes) @ 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
> J 50547 C2 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapsho

RocksDB segfaults

2017-03-22 Thread Florian König
Hi,

I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something in 
RocksDB. What is the preferred way to report them? All I got at the moment are 
two hs_err_pid12345.log files. They are over 4000 lines long each. Is there 
anything significant that I should extract to help you guys and/or put into a 
JIRA ticket?

The first thing that came to my mind was the stack traces (see below). Anything 
else?

Thanks
Florian



Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  free 
space=1019k
Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
[0x7fec92588780+0x4c]
J 27241 C2 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
 (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
J 38483 C2 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
 (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
J 38522 C2 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
 (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
J 47531 C2 
org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
 (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
J 5815 C2 
akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
 (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) @ 
0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
J 6628 C2 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
 (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) @ 
0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
v  ~StubRoutines::call_stub

--

Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  free 
space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
C  [libstdc++.so.6+0xc026b]  std::basic_string<char, std::char_traits<char>, 
std::allocator<char> >::basic_string(std::string const&)+0xb
C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions const&, 
rocksdb::BackupEngine**)+0x3a
C  [librocksdbjni8426686507832168508.so+0x180ad5]  
Java_org_rocksdb_BackupEngine_open+0x25
J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 
[0x7f16cb79a980+0xb6]
J 49809 C1 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
 (416 bytes) @ 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
J 51766 C2 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
 (40 bytes) @ 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
J 50547 C2 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState;
 (206 bytes) @ 0x7f16cb8be89c [0x7f16cb8be7e0+0xbc]
J 52232 C2 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z (650 
bytes) @ 0x7f16cbfbbf60 [0x7f16cbfbb540+0xa20]
J 52419 C2 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V
 (25 bytes) @ 0x7f16cbdd2624 [0x7f16cbdd25c0+0x64]
J 41649 C2 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z
 (439 bytes) @ 0x7f16cc8aed5c [0x7f16cc8add40+0x101c]
J 33374% C2 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V 
(42 bytes) @ 0x7f16cbdc21d0 [0x7f16cbdc20c0+0x110]
j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+301
j  org.apache.flink.runtime.taskmanager.Task.run()V+700
j  java.lang.Thread.run()V+11
v  ~StubRoutines::call_stub
V  [libjvm.so+0x68c616]
V  [libjvm.so+0x68cb21]
V  [libjvm.so+0x68cfc7]
V  [libjvm.so+0x723d80]
V  [libjvm.so+0xa69dcf]
V  [libjvm.so+0xa69efc]
V  [libjvm.so+0x91d9d8]
C  [libpthread.so.0+0x8064]  start_thread+0xc4

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79a9c4 
[0x7f16cb79a980+0x44]
J 49809 C1 

Re: ProcessingTimeTimer in ProcessFunction after a savepoint

2017-03-20 Thread Florian König
@Aljoscha Thank you for the pointer to ProcessFunction. That looks like a 
better approach, with less code and less overhead.

After restoring, the job is both reading new elements and emitting some, but 
nowhere near as many as expected. I’ll investigate further after switching to 
ProcessFunction. I suspect that there is some problem with my code. I’ll let 
you know if any unexplained discrepancy remains.
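
For the archives, this is the rough shape I have in mind for the ProcessFunction port (a sketch only; Schedule, entityId and getTimestamp() are from our job, and the 100 ms tolerance mirrors the trigger quoted further down):

class ScheduleFunction extends ProcessFunction<Schedule, Schedule> {

    transient ValueState<Long> plannedTime;   // currently registered trigger time, or null
    transient ValueState<Schedule> pending;   // latest schedule event for this entity

    public void open(Configuration parameters) {
        plannedTime = getRuntimeContext().getState(new ValueStateDescriptor<>("planned-time", Long.class));
        pending = getRuntimeContext().getState(new ValueStateDescriptor<>("pending", Schedule.class));
    }

    public void processElement(Schedule schedule, Context ctx, Collector<Schedule> out) throws Exception {
        Long current = plannedTime.value();
        long newTime = schedule.getTimestamp();

        // ignore anything that would move the schedule into the future
        if (current != null && newTime > current) {
            return;
        }
        if (current != null) {
            ctx.timerService().deleteProcessingTimeTimer(current);
        }
        if (newTime <= System.currentTimeMillis() + 100) {
            out.collect(schedule);   // in the past or very close => emit immediately
            plannedTime.clear();
            pending.clear();
        } else {
            pending.update(schedule);
            plannedTime.update(newTime);
            ctx.timerService().registerProcessingTimeTimer(newTime);
        }
    }

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Schedule> out) throws Exception {
        Schedule schedule = pending.value();
        if (schedule != null) {
            out.collect(schedule);
        }
        plannedTime.clear();
        pending.clear();
    }
}

It would replace the window/trigger/fold chain with schedules.keyBy(s -> s.entityId).process(new ScheduleFunction()).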

> On 20.03.2017 at 14:15, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> As a general remark, I think the ProcessFunction 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html)
>  could be better suited for implementing such a use case.
> 
> I did run tests on Flink 1.2 and master with a simple processing-time 
> windowing job. After performing a savepoint and waiting a few minutes I 
> restored and the windows that were still there immediately fired.
> 
> In your case, after restoring, is the Job also reading new elements or did 
> you try with just restoring without any new input?
> 
>> On 19 Mar 2017, at 13:15, Florian König <florian.koe...@micardo.com> wrote:
>> 
>> @Aljoscha: We’re using 1.2.
>> 
>> The intention of our code is as follows: The events that flow through Flink 
>> represent scheduling decisions, i.e. they contain the ID of a target entity, 
>> a description of an action that should be performed on that entity by some 
>> other job, and a timestamp of when that should happen.
>> 
>> We’re using the windowing mechanism to delay those events until they should 
>> be forwarded (and trigger the corresponding action). Furthermore, the 
>> schedule can be moved closer to the current point in time: subsequent 
>> schedule events for an entity (identified by its ID) can set the trigger 
>> time to an earlier instant. If the trigger time is in the past or very 
>> shortly (e.g., 100 ms) after now, the action should be triggered 
>> immediately. Actions scheduled for an instant after the currently planned 
>> one should be ignored; i.e. the schedule cannot be moved to the future.
>> 
> >> exemplary event stream
> >>   time … (ID, action, trigger time)   // intended reaction
> >>    0 … (1, 'foo', 10)   // trigger action 'foo' on entity 1 at time 10
> >>    3 … (2, 'bar', 15)   // trigger action 'bar' on entity 2 at time 15
> >>    4 … (1, 'foo', 7)    // move trigger back to time 7
> >>    9 … (1, 'foo', 12)   // ignore
> >>   15 … (2, 'bar', 15)   // trigger immediately
> >> 
> >> resulting stream:
> >>   (1, 'foo', 7)    // at time 7
> >>   (2, 'bar', 15)   // at time 15
>> 
>> To implement this, we have written a custom trigger that’s called from the 
>> following Flink code:
>> 
>> …
>> schedules.keyBy(schedule -> schedule.entityId)
>>  .window(GlobalWindows.create())
>>  .trigger(DynamicEarliestWindowTrigger.create())
>>  .fold((Schedule) null, (folded, schedule) -> schedule)
>>  .map( /* process schedules */ )
>> …
>> 
>> We fold the scheduling events 'to themselves', because only the latest event 
>> in each period is relevant. The custom trigger is implemented as follows 
> >> (only Flink-relevant parts and syntax):
>> 
> >> class DynamicEarliestWindowTrigger extends Trigger<T, W> {
> >> 
> >>     ValueStateDescriptor<Long> windowEnd =
> >>         new ValueStateDescriptor<>("windowEnd", Long.class);
> >> 
> >>     TriggerResult onElement(T element, long timestamp, W window,
> >>                             TriggerContext ctx) throws Exception {
> >>         val windowEndState = ctx.getPartitionedState(windowEnd);
> >>         val windowEndsAt = windowEndState.value();
> >>         val newEnd = element.getTimestamp();
> >> 
> >>         // no timer set yet, or intention to trigger earlier
> >>         if (windowEndsAt == null || newEnd <= windowEndsAt) {
> >>             deleteCurrentTimer(ctx);
> >> 
> >>             // trigger time far enough from now => schedule timer
> >>             if (newEnd > System.currentTimeMillis() + 100) {
> >>                 ctx.registerProcessingTimeTimer(newEnd);
> >>                 windowEndState.update(newEnd);
> >>             } else {
> >>                 return TriggerResult.FIRE;  // close enough => fire immediately
> >>             }
> >>         }
>>

Re: ProcessingTimeTimer in ProcessFunction after a savepoint

2017-03-19 Thread Florian König
Enjoy the rest of your weekend :)
Florian


> On 17.03.2017 at 16:51, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> When restoring, processing-time timers that would have fired already should 
> immediately fire.
> 
> @Florian what Flink version are you using? In Flink 1.1 there was a bug that 
> led to processing-time timers not being reset when restoring.
>> On 17 Mar 2017, at 15:39, Florian König <florian.koe...@micardo.com> wrote:
>> 
>> Hi,
>> 
>> funny coincidence, I was just about to ask the same thing. I have noticed 
>> this with restored checkpoints in one of my jobs. The timers seem to be 
>> gone. My window trigger registers a processing timer, but it seems that 
>> these don’t get restored - even if the timer is set to fire in the future, 
>> after the restore.
>> 
>> Is there something I need to be aware of in my class implementing Trigger? 
>> Anything I forgot to set in a method that’s being called upon a restore?
>> 
>> Thanks
>> Florian
>> 
> >>> On 17.03.2017 at 15:14, Yassine MARZOUGUI 
> >>> <y.marzou...@mindlytix.com> wrote:
>>> 
>>> Hi all,
>>> 
>>> How does the processing time timer behave when a job is taken down with a 
>>> savepoint and then restarted after the timer was supposed to fire? Will the 
>>> timer fire at restart because it was missed during the savepoint?
>>> 
>>> I'm wondering because I would like to schedule periodic timers in the 
>>> future (in processing time) at which a state is read and emitted, but I'm 
> >>> afraid the timer will never fire if it occurs while the job is down, 
>>> and therefore the state will never be emitted.
>>> 
>>> Best,
>>> Yassine
>> 
>> 
> 




Re: Telling if a job has caught up with Kafka

2017-03-19 Thread Florian König
Thanks Gordon for the detailed explanation! That makes sense and explains the 
expected behaviour.

The JIRA for the new metric also sounds very good. Can’t wait to have this in 
the Flink GUI (KafkaOffsetMonitor has some problems and stops working after 1-2 
days, don’t know the reason yet).

All the best,
Florian


> On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> @Florian
> the 0.9 / 0.10 version and 0.8 version behave a bit differently right now for 
> the offset committing.
> 
> In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. 
> settings will be completely ignored and overwritten before being used to 
> instantiate the internal Kafka clients, hence committing will only happen on 
> Flink checkpoints.
> 
> In 0.8, this isn’t the case. Both automatic periodic committing and 
> committing on checkpoints can take place. That’s perhaps why you’re observing 
> the 0.8 consumer to be committing more frequently.
> 
> FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you 
> can take a look at https://github.com/apache/flink/pull/3527.
> 
> - Gordon
> 
> 
> On March 17, 2017 at 6:07:38 PM, Florian König (florian.koe...@micardo.com) 
> wrote:
> 
>> Why is that so? The checkpoint contains the Kafka offset and would be able 
>> to start reading wherever it left off, regardless of any offset stored in 
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently 
>> from the checkpointing? Or did I misconfigure anything?




Re: ProcessingTimeTimer in ProcessFunction after a savepoint

2017-03-17 Thread Florian König
Hi,

funny coincidence, I was just about to ask the same thing. I have noticed this 
with restored checkpoints in one of my jobs. The timers seem to be gone. My 
window trigger registers a processing timer, but it seems that these don’t get 
restored - even if the timer is set to fire in the future, after the restore.

Is there something I need to be aware of in my class implementing Trigger? 
Anything I forgot to set in a method that’s being called upon a restore?

Thanks
Florian

> On 17.03.2017 at 15:14, Yassine MARZOUGUI wrote:
> 
> Hi all,
> 
> How does the processing time timer behave when a job is taken down with a 
> savepoint and then restarted after the timer was supposed to fire? Will the 
> timer fire at restart because it was missed during the savepoint?
> 
> I'm wondering because I would like to schedule periodic timers in the future 
> (in processing time) at which a state is read and emitted, but I'm afraid the 
> timer will never fire if it occurs while the job is down, and therefore 
> the state will never be emitted.
> 
> Best,
> Yassine




Re: Telling if a job has caught up with Kafka

2017-03-17 Thread Florian König
Hi,

thank you Gyula for posting that question. I’d also be interested in how this 
could be done.

You mentioned the dependency on the commit frequency. I’m using 
https://github.com/quantifind/KafkaOffsetMonitor. With the 0.8 Kafka consumer, a 
job's offsets (as shown in the diagrams) were updated much more frequently than 
the checkpointing interval. With the 0.10 consumer, a commit is only made after a 
successful checkpoint (or so it seems).

Why is that so? The checkpoint contains the Kafka offset and would be able to 
start reading wherever it left off, regardless of any offset stored in Kafka or 
Zookeeper. Why is the offset not committed regularly, independently from the 
checkpointing? Or did I misconfigure anything?
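
For completeness, the 0.10 consumer is wired up roughly like this (simplified; topic, servers and intervals are placeholders) — and with this setup the offsets only show up in Kafka after a checkpoint completes:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "my-job");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);   // checkpoint every minute

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props));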

Thanks
Florian

> On 17.03.2017 at 10:26, Gyula Fóra wrote:
> 
> Hi All,
> 
> I am wondering if anyone has some nice suggestions on what would be the 
> simplest/best way of telling if a job is caught up with the Kafka input.
> An alternative question would be how to tell if a job is caught up to another 
> job reading from the same topic.
> 
> The first thing that comes to my mind is looking at the offsets Flink commits 
> to Kafka. However this will only work if every job uses a different group id 
> and even then it is not very reliable depending on the commit frequency.
> 
> The use case I am trying to solve is fault tolerant update of a job, by 
> taking a savepoint for job1 starting job2 from the savepoint, waiting until 
> it catches up and then killing job1.
> 
> Thanks for your input!
> Gyula




Re: Rate-limit processing

2017-01-24 Thread Florian König
Hi Till,

thank you for the very helpful hints. You are right, I already see 
backpressure. In my case, that’s ok because it throttles the Kafka source. 
Speaking of which: You mentioned putting the rate limiting mechanism into the 
source. How can I do this with a Kafka source? Just extend the consumer, or is 
there a better mechanism to hook into the connector?
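
For a hand-written source, the principle from your suggestion below seems clear to me — count what was emitted in the current second and only emit under the checkpoint lock, roughly like the sketch that follows (ThrottledSource is just an illustrative name, not a Flink class). I'm mainly unsure how to apply the same idea to the Kafka connector.

public class ThrottledSource implements SourceFunction<Long> {

    private final int maxPerSecond;
    private volatile boolean running = true;

    public ThrottledSource(int maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
    }

    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        long windowStart = System.currentTimeMillis();
        int emittedInWindow = 0;

        while (running) {
            long now = System.currentTimeMillis();
            if (now - windowStart >= 1000) {   // start a new one-second window
                windowStart = now;
                emittedInWindow = 0;
            }
            if (emittedInWindow < maxPerSecond) {
                // emit under the checkpoint lock, but never hold it while waiting
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(value++);
                }
                emittedInWindow++;
            } else {
                Thread.sleep(1);   // back off briefly instead of busy-looping
            }
        }
    }

    public void cancel() {
        running = false;
    }
}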

Cheers,
Florian


> On 20.01.2017 at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Florian,
> 
> any blocking of the user code thread is in general not a good idea, because 
> checkpointing happens under the very same lock that also guards the user 
> code invocation. Thus any checkpoint barrier arriving at the operator has 
> only the chance to trigger the checkpointing once the blocking is over. Even 
> worse, if the blocking happens in a downstream operator (not a source), then 
> this blocking could cause backpressure. Since the checkpoint barriers flow 
> with the events and are processed in order, the backpressure will then also 
> influence the checkpointing time.
> 
> So if you want to limit the rate, you should do it at the sources without 
> blocking the source thread. You could for example count how many elements 
> you've emitted in the past second and if it exceeds your maximum, then you 
> don't emit the next element to downstream operators until some time has 
> passed (this might end up in a busy loop but it allows the checkpointing to 
> claim the lock).
> 
> Cheers,
> Till
> 
> On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI 
> <y.marzou...@mindlytix.com> wrote:
> Hi,
> 
> You might find this similar thread from the mailing list archive helpful: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html.
> 
> Best,
> Yassine
> 
> 2017-01-20 10:53 GMT+01:00 Florian König <florian.koe...@micardo.com>:
> Hi,
> 
> I need to limit the rate of processing in a Flink stream application. 
> Specifically, the number of items processed in a .map() operation has to stay 
> under a certain maximum per second.
> 
> At the moment, I have another .map() operation before the actual processing, 
> which just sleeps for a certain time (e.g., 250ms for a limit of 4 requests / 
> sec) and returns the item unchanged:
> 
> …
> 
> public T map(final T value) throws Exception {
>     Thread.sleep(delay);
>     return value;
> }
> 
> …
> 
> This works as expected, but is a rather crude approach. Checkpointing the job 
> takes a very long time: minutes for a state of a few kB, which for other jobs 
> is done in a few milliseconds. I assume that letting the whole thread sleep 
> for most of the time interferes with the checkpointing - not good!
> 
> Would using a different synchronization mechanism (e.g., 
> https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
>  help to make checkpointing work better?
> 
> Or, preferably, is there a mechanism inside Flink that I can use to 
> accomplish the desired rate limiting? I haven’t found anything in the docs.
> 
> Cheers,
> Florian
> 
> 




Rate-limit processing

2017-01-20 Thread Florian König
Hi,

I need to limit the rate of processing in a Flink stream application. 
Specifically, the number of items processed in a .map() operation has to stay 
under a certain maximum per second.

At the moment, I have another .map() operation before the actual processing, 
which just sleeps for a certain time (e.g., 250ms for a limit of 4 requests / 
sec) and returns the item unchanged:

…

public T map(final T value) throws Exception {
    Thread.sleep(delay);
    return value;
}

…

This works as expected, but is a rather crude approach. Checkpointing the job 
takes a very long time: minutes for a state of a few kB, which for other jobs 
is done in a few milliseconds. I assume that letting the whole thread sleep for 
most of the time interferes with the checkpointing - not good!

Would using a different synchronization mechanism (e.g., 
https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
 help to make checkpointing work better?
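
Concretely, I would swap the sleeping map for something like this (a sketch, assuming Guava is on the classpath; ThrottlingMap is just an illustrative name). It still blocks the task thread while waiting for a permit, just more evenly than a fixed sleep:

public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final double permitsPerSecond;
    private transient RateLimiter rateLimiter;   // com.google.common.util.concurrent.RateLimiter

    public ThrottlingMap(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    public void open(Configuration parameters) {
        // one limiter per parallel instance, e.g. 4.0 for 4 requests/sec
        rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    public T map(T value) {
        rateLimiter.acquire();   // blocks until a permit is available
        return value;
    }
}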

Or, preferably, is there a mechanism inside Flink that I can use to accomplish 
the desired rate limiting? I haven’t found anything in the docs.

Cheers,
Florian