Re: Missing MapState when Timer fires after restored state

2018-05-14 Thread sihua zhou
Hi Juho,
In fact, from your code I can't see any way the MapState could become 
inconsistent with the timer; it looks like a bug to me, because once the 
checkpoint is complete and you don't query the state asynchronously from a 
custom thread, the result of the checkpoint should be consistent. The only case 
I can see where the timer could be inconsistent with the state is when the task 
is shutting down: in that case the backend may already be closed while the timer 
fails to shut down, so the timer callback may access a closed backend. But that 
shouldn't be the cause in your case. Could you please provide us more 
information, e.g. what type of backend are you using? Are you using the 
RocksDB backend? I think @Stefan may be able to tell more about this; please 
correct me if I'm incorrect.


Best,
Sihua


On 05/15/2018 01:48, Bowen Li wrote:
Hi Juho,


You are right, there's no transactional guarantee on timers and state in 
processElement(). They may end up with inconsistency if your job was cancelled 
in the middle of processing an element.


To avoid the situation, the best programming practice is to always check if the 
state you're trying to get is null or not.
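
For illustration, a hedged sketch of that null check applied to the onTimer() 
from the quoted code below:

public void onTimer(long timestamp, OnTimerContext ctx, ..) {
    String lastUpdatedStr = mapState.get("lastUpdated");
    if (lastUpdatedStr == null) {
        // state was cleared (or never written) before this timer fired
        return;
    }
    long lastUpdated = Long.parseLong(lastUpdatedStr);
    if (timestamp >= lastUpdated + stateRetentionMillis) {
        mapState.clear();
    }
}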


I've also created https://issues.apache.org/jira/browse/FLINK-9362 to document 
this. 


Thanks
Bowen






On Mon, May 14, 2018 at 4:00 AM, Juho Autio  wrote:

We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old 
state. After restoring state from a checkpoint, it seems like a timer had been 
restored, but not the data that was expected to be in a related MapState if 
such timer has been added.



The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or 
maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


private MapState<String, String> mapState;


public void processElement(..) {
    mapState.put("lastUpdated", ctx.timestamp().toString());
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
}


public void onTimer(long timestamp, OnTimerContext ctx, ..) {
    long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
    if (timestamp >= lastUpdated + stateRetentionMillis) {
        mapState.clear();
    }
}


Normally this "just works". As you can see, it shouldn't be possible that 
"lastUpdated" doesn't exist in state if timer was registered and onTimer gets 
called.


However, after restoring state from a checkpoint, the job kept failing with 
this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.


The background for restoring state in this case is not entirely clean. There 
was an OS level issue "Too many open files" after running a job for ~11 days. 
To fix that, we replaced the cluster with a new one and launched the Flink job 
again. State was successfully restored from the latest checkpoint that had been 
created by the "problematic execution". Now, I'm assuming that if the state 
wouldn't have been created successfully, restoring wouldn't succeed either – 
correct? This is just to rule out the possibility that the issue with state 
happened because the checkpoint files were somehow corrupted due to the Too 
many open files problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to 
achieve this kind of TTL for keyed state in Flink.



Leader Retrieval Timeout with HA Job Manager

2018-05-14 Thread Jason Kania
Hi,

I am using the 1.4.2 release on ubuntu and attempting to make use of an HA Job 
Manager, but unfortunately using HA functionality prevents job submission with 
the following error:

java.lang.RuntimeException: Failed to retrieve JobManager address
    at 
org.apache.flink.client.program.ClusterClient.getJobManagerAddress(ClusterClient.java:308)
    at 
org.apache.flink.client.program.StandaloneClusterClient.getClusterIdentifier(StandaloneClusterClient.java:86)
    at 
org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:921)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:264)
    at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
    at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: 
Could not retrieve the leader address and leader session ID.
    at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:113)
    at 
org.apache.flink.client.program.ClusterClient.getJobManagerAddress(ClusterClient.java:302)
    ... 8 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[6 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at scala.concurrent.Await.result(package.scala)
    at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:111)
    ... 9 more

This seems to also be tied to problems in getting the TaskManager to register. I 
have to repeatedly restart the TaskManager until it finally connects to the Job 
Manager. Most times it doesn't connect and doesn't complain, which makes 
determining the root cause more difficult. The cluster is not busy, and I have 
tried both IP addresses and host names to rule out name resolution issues, but 
the behaviour is the same in both cases.

I have also noticed that if 2 job managers are launched on different nodes in 
the same cluster, they both come back with logging indicating that they are the 
leader, so they are not talking to each other effectively, and the logging does 
not indicate that they are even attempting to talk with one another.
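
For comparison, a typical minimal ZooKeeper-backed HA setup in flink-conf.yaml 
looks roughly like the following sketch (hostnames and paths are illustrative, 
not from this thread); if the two JobManagers are not pointed at the same 
quorum and cluster paths, each of them will elect itself leader:

high-availability: zookeeper
high-availability.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-cluster
high-availability.storageDir: hdfs:///flink/ha/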

Lastly, the error "Could not retrieve the leader address and leader session 
ID." is a very poor error message because it does not say where it is 
attempting to get the information from.

Any suggestions would be appreciated.

Thanks,

Jason


minPauseBetweenCheckpoints for failed checkpoints

2018-05-14 Thread Dmitry Minaev
Hello!

I have a question regarding the checkpointing parameter minPauseBetweenCheckpoints, 
i.e. the minimal pause between checkpointing attempts.
I've noticed the following (strange) behavior in Flink.

I set the following parameters for a sample Flink job:

Checkpointing Mode = Exactly Once
Interval = 10s
Timeout = 30s
Minimum Pause Between Checkpoints = 15s
Maximum Concurrent Checkpoints = 1
Persist Checkpoints Externally = Disabled
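
For reference, a minimal sketch of how these settings map to Flink's 
CheckpointConfig API (assuming a StreamExecutionEnvironment named env):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // Interval = 10s
CheckpointConfig cfg = env.getCheckpointConfig();
cfg.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
cfg.setCheckpointTimeout(30000);          // Timeout = 30s
cfg.setMinPauseBetweenCheckpoints(15000); // Minimum Pause Between Checkpoints = 15s
cfg.setMaxConcurrentCheckpoints(1);       // Maximum Concurrent Checkpoints = 1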

Then I started a job that intentionally makes some of the checkpoints fail by 
timeout.
I noticed that minPauseBetweenCheckpoints is taken into consideration by Flink 
only when a checkpoint doesn't fail by timeout:

My first checkpoint triggered at 18:03:11 and failed within the expected 30 
seconds. But immediately after that, a new checkpoint was triggered at 
18:03:41. That doesn't make sense to me, since I'm using 
minPauseBetweenCheckpoints = 15 seconds. I would expect Flink to wait for 15 
seconds before starting a new checkpoint.

However, it seems like minPauseBetweenCheckpoints works as expected for 
checkpoints that complete successfully within the configured timeout. For 
example, my 4th checkpoint started at 18:04:41 and completed at 18:04:56. And 
the next checkpoint waited another 15 seconds to start, at 18:05:11.

Please see attached screenshots for configuration and checkpoint history.

My question is – is this expected behavior or a bug? Is there a way to have a 
pause between checkpoints even if a checkpoint fails by timeout?

Thank you!

--
Kind regards,
Dmitry Minaev





Re: Best way to clean-up states in memory

2018-05-14 Thread ashish pok
Thanks Fabian, Kostas,

Here is what I had in the Trigger - the idea is to run a bitwise OR until a 
threshold is reached or a timeout occurs (nothing too fancy here). Let me know 
what you guys think. Like I said, I moved this logic to a ProcessFunction and I 
haven't seen the same issue I was seeing with this.

@PublicEvolving
public class BitwiseOrTrigger<W extends Window> extends Trigger<FactoredEvent, W> {
    private static final long serialVersionUID = 1L;
    private final int threshold;
    private final long epocDelta;
    private final ReducingStateDescriptor<Tuple2<Integer, Long>> stateDesc =
            new ReducingStateDescriptor<>("bitwiseOr", new BitwiseOr(),
                    TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));

    private BitwiseOrTrigger(int threshold, long allowedLateness) {
        this.threshold = threshold;
        this.epocDelta = allowedLateness;
    }

    @Override
    public TriggerResult onElement(FactoredEvent event, long timestamp, W window,
            TriggerContext ctx) throws Exception {
        ReducingState<Tuple2<Integer, Long>> currState = ctx.getPartitionedState(stateDesc);
        if (this.epocDelta > 0) {
            ctx.registerProcessingTimeTimer(System.currentTimeMillis() + this.epocDelta);
        }
        currState.add(new Tuple2<>(event.getFactor(), this.epocDelta));
        if (currState.get().f0 >= threshold) {
            currState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "BitwiseOrTrigger(" + threshold + ")";
    }

    public static <W extends Window> BitwiseOrTrigger<W> of(int threshold, long expirationEpoc) {
        return new BitwiseOrTrigger<>(threshold, expirationEpoc);
    }

    private static class BitwiseOr implements ReduceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tup1,
                Tuple2<Integer, Long> tup2) throws Exception {
            Tuple2<Integer, Long> retTup = tup1;
            retTup.f0 = tup1.f0 | tup2.f0;
            return retTup;
        }
    }
}

On Monday, May 14, 2018, 6:00:11 AM EDT, Fabian Hueske  
wrote:  
 
 Hi Ashish,

Did you use per-window state (also called partitioned state) in your Trigger? 
If yes, you need to make sure that it is completely removed in the clear() 
method because processing time timers won't fire once a window was purged. 
So you cannot (fully) rely on timers to clean up per-window state.

Best, Fabian
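
For illustration, a minimal sketch of what Fabian's advice can look like for 
the trigger at the top of this message, reusing its stateDesc (the commented 
timer deletion assumes the registration timestamp is tracked somewhere, which 
that trigger does not do):

@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    // Drop the per-window (partitioned) state explicitly; a processing-time
    // timer registered in onElement() will not fire once the window is purged,
    // so it cannot be relied on for this cleanup.
    ctx.getPartitionedState(stateDesc).clear();
    // Hypothetical: if the registration timestamp were tracked, the pending
    // timer could be removed as well:
    // ctx.deleteProcessingTimeTimer(registeredTimerTimestamp);
}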

2018-05-14 9:34 GMT+02:00 Kostas Kloudas :

Hi Ashish,

It would be helpful to share the code of your custom trigger for the first 
case. Without that, we cannot tell what state you create and how/when you 
update/clear it.

Cheers,
Kostas

On May 14, 2018, at 1:04 AM, ashish pok  wrote:
 Hi Till,
Thanks for getting back. I am sure that will fix the issue but I feel like that 
would potentially mask an issue. I have been going back and forth with Fabian 
on a use case where for some of our highly transient datasets, it might make 
sense to just use memory based state (except of course data loss becomes an 
issue when apps occasionally hit a problem and whole job restarts or app has to 
be taken down etc - ie. handling graceful shutdowns / restarts better 
essentially). I was on the hook to create a business case and post it back to 
this forum (which I am hoping I can get around to at some point soon). Long 
story short, this is one of those datasets. 
States in this case are either fired and cleared normally or on processing 
timeout. So technically, unless there is a memory leak in app code, memory 
usage should plateau out at a high-point. What I was noticing was memory would 
start to creep up ever so slowly. 
I couldn't tell exactly why heap utilization kept on growing (ever so slowly 
but it had upward trend for sure) because the states should technically be 
cleared if not as part of a reducing function then on timeout. App after 
running for couple of days would then run into Java Heap issues. So changing to 
RocksDB probably will fix the issue but not necessarily leak of states that 
should be cleared IMO. Interestingly, I switched my app from using something 
like this:
WindowedStream windowedStats = statsStream
        .keyBy(BasicFactTuple::getKey)
        .window(GlobalWindows.create())
        .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));

To

DataStream processStats = pipStatsStream

Flink does not read from some Kafka Partitions

2018-05-14 Thread Ruby Andrews
Hello,

My team ran into some behavior we did not expect when we tried to get an 
existing Flink app to read from a re-sized Kafka topic. Here are the highlights: 
- We are using the FlinkKafkaConsumer010.
- We re-partitioned (added partitions to) an existing topic that our Flink app 
reads, so that the topic now has 8 partitions. Following that, we re-deployed 
our task managers. We thought that the task managers would start reading the 
new partitions.
- 8 task managers read from the topic, but they did NOT read all of the 
partitions. 3 of the partitions had 2 task managers reading from them and 3 of 
the partitions had 0 task managers reading from them. My team had expected that 
Flink would automatically read from all partitions, 1 task manager per 
partition.
- To force the app to read from all partitions, we added this property to our 
kafka consumer properties: flink.partition-discovery.interval-millis and 
re-deployed the task managers. We expected this flag to cause Flink to discover 
(and start reading) all partitions. 
- We did not see a change in the Kafka readers: there were still 3 partitions 
not being read.
- Finally, we changed the ID of the Flink operator that  reads the Kafka topic 
and re-deployed the task managers again. 
- After changing the ID, the app started reading from all partitions. 
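
For what it's worth, a hedged sketch of how we would expect partition discovery 
to be enabled on the consumer (the property key is exposed as the constant 
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS; broker address, 
group id and topic name below are illustrative):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("group.id", "my-group");
// check for newly added partitions every 30 seconds; without this
// property, partition discovery stays disabled
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);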

 
What is the correct way to pick up partitions after re-partitioning a Kafka 
topic? 

Thanks,
Ruby  

Akka heartbeat configurations

2018-05-14 Thread Bajaj, Abhinav
Hi,

We are running into issues where a GC pause will result in TaskManagers being 
incorrectly marked dead.
The Flink documentation describes some Akka configuration knobs to play around 
with.

Focusing on “akka.watch.heartbeat.pause”, it mentions “Higher value increases 
the time to detect a dead TaskManager”
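
For context, these knobs live in flink-conf.yaml; a hedged sketch with the 
defaults we believe apply here (values are illustrative, not a recommendation):

akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s
# Death watch tolerates roughly (pause / interval) missed heartbeats before
# marking a TaskManager dead, so a GC pause shorter than the pause setting
# should not trigger a false positive.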

Can someone please help me understand the downside of increasing the time to 
detect a dead taskmanager?
Will this affect the fault tolerance guarantees / state management/ 
checkpointing?

Thanks,
Abhinav




Re: Checkpoint is not triggering as per configuration

2018-05-14 Thread Tao Xia
Thanks for the reply Piotr. Which JIRA ticket were you referring to?
We were trying to use the same code as for normal stream processing to process
very old historical backfill data.
The problem for me right now is that backfilling x years of data will be very
slow, and I cannot have any checkpoints during the whole time since the
FileSource is "Finished". When anything goes wrong in the middle, the whole
pipeline starts over from the beginning again.
Is there any way I can skip the checkpoint of "Source: Custom File Source" but
still have checkpoints on "Split Reader: Custom File Source"?
Thanks,
Tao

On Fri, May 11, 2018 at 4:34 AM, Piotr Nowojski 
wrote:

> Hi,
>
> It’s not considered as a bug, only a missing not yet implemented feature
> (check my previous responses for the Jira ticket). Generally speaking using
> file input stream for DataStream programs is not very popular, thus this
> was so far low on our priority list.
>
> Piotrek
>
> > On 10 May 2018, at 06:26, xiatao123  wrote:
> >
> > I ran into a similar issue.
> >
> > Since it is a "Custom File Source", the first source just lists the
> > folder/file paths for all existing files. The next operator, "Split Reader",
> > will read the content of the files.
> > The "Custom File Source" went to "finished" state after the first couple of
> > secs. That's why we got the error message "Custom File Source (1/1) is not
> > being executed at the moment. Aborting checkpoint", because the "Custom File
> > Source" had finished already.
> >
> > Is this by design? Although the "Custom File Source" finished in secs, the
> > rest of the pipeline can run for hours or days. Whenever anything goes
> > wrong, the pipeline will restart and start reading from the beginning
> > again, since there isn't any checkpoint.
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Missing MapState when Timer fires after restored state

2018-05-14 Thread Bowen Li
Hi Juho,

You are right, there's no transactional guarantee on timers and state in
processElement(). They may end up with inconsistency if your job was
cancelled in the middle of processing an element.

To avoid the situation, the best programming practice is to always check if
the state you're trying to get is null or not.

I've also created https://issues.apache.org/jira/browse/FLINK-9362 to
document this.

Thanks
Bowen



On Mon, May 14, 2018 at 4:00 AM, Juho Autio  wrote:

> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
> state. After restoring state from a checkpoint, it seems like a timer had
> been restored, but not the data that was expected to be in a related
> MapState if such timer has been added.
>
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized
> (or maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
>
> Our code (simplified):
>
> private MapState<String, String> mapState;
>
> public void processElement(..) {
>     mapState.put("lastUpdated", ctx.timestamp().toString());
>     ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
> }
>
> public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>     long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>     if (timestamp >= lastUpdated + stateRetentionMillis) {
>         mapState.clear();
>     }
> }
>
> Normally this "just works". As you can see, it shouldn't be possible that
> "lastUpdated" doesn't exist in state if timer was registered and onTimer
> gets called.
>
> However, after restoring state from a checkpoint, the job kept failing
> with this error:
>
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.
> java:136)
> ..
>
> So apparently onTimer was called but lastUpdated wasn't found in the
> MapState.
>
> The background for restoring state in this case is not entirely clean.
> There was an OS level issue "Too many open files" after running a job for
> ~11 days. To fix that, we replaced the cluster with a new one and launched
> the Flink job again. State was successfully restored from the latest
> checkpoint that had been created by the "problematic execution". Now, I'm
> assuming that if the state wouldn't have been created successfully,
> restoring wouldn't succeed either – correct? This is just to rule out the
> possibility that the issue with state happened because the checkpoint files
> were somehow corrupted due to the Too many open files problem.
>
> Thank you all for your continued support!
>
> P.S. I would be very much interested to hear if there's some cleaner way
> to achieve this kind of TTL for keyed state in Flink.
>


AvroInputFormat Serialisation Issue

2018-05-14 Thread Padarn Wilson
Hi all - sorry this seems like a silly question, but I can't figure it out.

I'm using an AvroInputFormat in order to read an Avro file like this:

val textInputFormat = new AvroInputFormat[GenericRecord](infile,
classOf[GenericRecord])
val lines = env.readFile(textInputFormat, path)

This works fine in local mode, but when submitted to a flink cluster I get
serialisation errors that look like this:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at 
com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
... 7 more
Caused by: java.lang.IllegalAccessException: Class
com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a
member of class org.apache.avro.Schema$StringSchema with modifiers
"public"
at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
at 

Re: Consumed input splits

2018-05-14 Thread Flavio Pompermaier
Anyone on this? Am I the only one that find this UI feature useful? :(

On Thu, Apr 26, 2018 at 2:54 PM, Flavio Pompermaier 
wrote:

> Hi to all,
> is there a way to see from the Flink UI the number of processed splits of a
> source?
> For example, I'm reading data from a (batch) JDBC input and Flink creates
> 50 splits.
> However I don't know how many of them have been consumed and which ones are
> waiting to be processed.
>
> Best,
> Flavio
>


Re: Consolidated log for a job?

2018-05-14 Thread Alexander Smirnov
Hi Alexey,

I know that Kibana (https://en.wikipedia.org/wiki/Kibana) can show logs from
different servers on one screen. Maybe this is what you are looking for.

Alex

On Mon, May 14, 2018 at 5:17 PM NEKRASSOV, ALEXEI  wrote:

> Is there a way to see logs from multiple Task Managers **all in one place**
> (for a running or a completed job)? Or do I need to check logs on each Task
> Manager individually?
>
>
>
> Thanks,
> Alex Nekrassov
>
>
>


Re: How to set fix JobId for my application.

2018-05-14 Thread shashank734
Thanks for the suggestion; now using Kafka for information sharing between apps.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Consolidated log for a job?

2018-05-14 Thread NEKRASSOV, ALEXEI
Is there a way to see logs from multiple Task Managers *all in one place* (for 
a running or a completed job)? Or do I need to check logs on each Task Manager 
individually?

Thanks,
Alex Nekrassov



Re: Better way to clean up state when connect

2018-05-14 Thread Chengzhi Zhao
Hi Xingcan,

Thanks for your response. To give you more background about my use case: I
have Stream B with some split test names, and Stream A with the actual tests. I
want to connect Stream A to Stream B to figure out whether a test is still
active or not. I am not sure this is the right way to do it: my watermark is
based on event time with a 15 min bound, and onTimer will emit those records
after 15 mins. I was wondering if there is a way to purge the entire state of
Stream B so that I can get all the active tests, since the file will include
all the updated split test names, letting me refresh the lookup.

Also, I am not sure if I am using the right operator here, or if there is a
way to share a variable globally so I can just perform a filter on Stream A.
Please let me know your thoughts, and thanks for your suggestions again.

Regards,
Chengzhi

On Sat, May 12, 2018 at 8:55 PM, Xingcan Cui  wrote:

> Hi Chengzhi,
>
> you said that Stream B, which comes from a file, will be updated
> unpredictably. I wonder if you could share more about how you judge that an
> item (from Stream A I suppose) is not in the file, and what watermark
> generation strategy you chose?
>
> Best,
> Xingcan
>
> > On May 12, 2018, at 12:48 AM, Chengzhi Zhao 
> wrote:
> >
> > Hi there,
> >
> > I have a use case to check for active IDs. There are two streams and I
> connect them: one has the actual data (Stream A) and the other one is for
> lookup purposes (Stream B). I am getting Stream B as a file which includes
> all active IDs, so inactive IDs would not show up on this list. I tried to
> use watermarks to clean up the state of inactive IDs, but the Stream B
> updates are unpredictable, so I want to keep everything in state until I
> find the item is not in that file any more.
> >
> > Please suggest what is the best way to implement it in flink. Thanks in
> advance for your help.
> >
> > Regards,
> > Chengzhi
> >
> >
>
>


Re: A strange exception while consumption using a multi topic Kafka Connector

2018-05-14 Thread Vishal Santoshi
Yep, thanks. We have a setup where topics are constantly being added on a
kafka-to-hdfs pipeline.
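
As a minimal plain-Java illustration of the failure mode Ted describes below
(this is not Flink code): structurally modifying a LinkedList while iterating
it makes the iterator throw on its next access, which matches the stack trace.

import java.util.LinkedList;
import java.util.List;

public class ComodificationDemo {
    public static void main(String[] args) {
        List<Integer> partitions = new LinkedList<>();
        partitions.add(0);
        partitions.add(1);
        for (Integer p : partitions) {
            // simulates partition discovery appending to the list that
            // the fetch loop is currently iterating
            partitions.add(2);
        } // -> java.util.ConcurrentModificationException on the next iterator step
    }
}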

On Sun, May 13, 2018 at 11:58 AM, Ted Yu  wrote:

> FLINK-9349 was logged.
>
> FYI
>
> On Sat, May 12, 2018 at 7:52 AM, Ted Yu  wrote:
>
>> I took a look at ./flink-connectors/flink-connector-kafka-0.9/src/main/java/
>> org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>>
>> It seems the List subscribedPartitionStates was being modified
>> when runFetchLoop iterated the List.
>> This can happen if, e.g., FlinkKafkaConsumer runs the following code
>> concurrently:
>> kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>>
>> You can log a JIRA.
>> If you have unit test reproducing this, that would be great.
>>
>> FYI
>>
>> On Fri, May 11, 2018 at 5:15 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> java.util.ConcurrentModificationException
>>> at 
>>> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>>> at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
>>>
>>> .
>>>
>>> .
>>>
>>> .
>>>
>>>
>>>
>>> This flink 1.4.0
>>>
>>> Any ideas folks ?
>>>
>>
>>
>


Async Source Function in Flink

2018-05-14 Thread Federico D'Ambrosio
 Hello everyone,

just wanted to ask a quick question: I have to retrieve data from 2 web
services via REST calls, use them as sources, and push this data to Kafka.
So far, I have implemented a SourceFunction which deals with making the calls
with the respective clients.

Now, for each REST call, the function uses Await.result(). Do I need to use
Flink's AsyncFunction instead? What are the best practices when it comes to
async sources?

Thank you,
-- 
Federico D'Ambrosio


Re: Taskmanager JVM crash

2018-05-14 Thread Stefan Richter
No, the problem I mentioned does not affect batch jobs. It must be something 
different then, but unfortunately the dump does not look very helpful to me 
because of the "error occurred during error reporting (printing native stack)".

> On 14.05.2018 at 14:26, Flavio Pompermaier wrote:
> 
> My job is a batch one, not a streaming job. Is it possible that the cause is 
> the one you mentioned?
> 
> On Mon, 14 May 2018, 14:23, Stefan Richter wrote:
> Hi,
> 
> that looks like a known issue where Flink did not wait for the shutdown of 
> the timer service before disposing state backends. This problem is fixed in 
> the >= 1.4 branches.
> 
> Best,
> Stefan 
> 
>> Am 14.05.2018 um 14:12 schrieb Flavio Pompermaier > >:
>> 
>> Hi to all,
>> I have a Flink 1.3.1 job that runs multiple times.
>> Everything goes well for some time (e.g. 10 jobs). Then, one or more TMs 
>> suddenly die.
>> 
>> In the .out file I find something like this:
>> 
>> #
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> #  SIGSEGV (0xb) at pc=0x7f6f3897712f, pid=18794, tid=140110535448320
>> #
>> # JRE version: Java(TM) SE Runtime Environment (8.0_72-b15) (build 
>> 1.8.0_72-b15)
>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode 
>> linux-amd64 compressed oops)
>> # Problematic frame:
>> # C  [libc.so.6+0x7f12f]
>> #
>> # Failed to write core dump. Core dumps have been disabled. To enable core 
>> dumping, try "ulimit -c unlimited" before starting Java again
>> #
>> # An error report file with more information is saved as:
>> # /home/user/hs_err_pid18794.log
>> #
>> # If you would like to submit a bug report, please visit:
>> #   http://bugreport.java.com/bugreport/crash.jsp 
>> 
>> #
>> 
>> 
>> Attached the produced error report. Do you find anything useful?
>> I can even send you the job's jar with the data but it requires about 200 
>> MB..
>> 
>> Best,
>> Flavio
>> 
> 



Re: Taskmanager JVM crash

2018-05-14 Thread Flavio Pompermaier
My job is a batch one, not a streaming job. Is it possible that the cause
is the one you mentioned?

On Mon, 14 May 2018, 14:23, Stefan Richter wrote:

> Hi,
>
> that looks like a known issue where Flink did not wait for the shutdown of
> the timer service before disposing state backends. This problem is fixed in
> the >= 1.4 branches.
>
> Best,
> Stefan
>
> On 14.05.2018 at 14:12, Flavio Pompermaier wrote:
>
> Hi to all,
> I have a Flink 1.3.1 job that runs multiple times.
> Everything goes well for some time (e.g. 10 jobs). Then, one or more TMs
> suddenly die.
>
> In the .out file I find something like this:
>
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f6f3897712f, pid=18794, tid=140110535448320
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_72-b15) (build
> 1.8.0_72-b15)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [libc.so.6+0x7f12f]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # /home/user/hs_err_pid18794.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
>
>
> Attached the produced error report. Do you find anything useful?
> I can even send you the job's jar with the data but it requires about 200
> MB..
>
> Best,
> Flavio
> 
>
>
>


Re: Taskmanager JVM crash

2018-05-14 Thread Stefan Richter
Hi,

that looks like a known issue where Flink did not wait for the shutdown of the 
timer service before disposing state backends. This problem is fixed in the >= 
1.4 branches.

Best,
Stefan 

> On 14.05.2018 at 14:12, Flavio Pompermaier wrote:
> 
> Hi to all,
> I have a Flink 1.3.1 job that runs multiple times.
> Everything goes well for some time (e.g. 10 jobs). Then, one or more TMs 
> suddenly die.
> 
> In the .out file I find something like this:
> 
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f6f3897712f, pid=18794, tid=140110535448320
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_72-b15) (build 
> 1.8.0_72-b15)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [libc.so.6+0x7f12f]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # /home/user/hs_err_pid18794.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp 
> 
> #
> 
> 
> Attached the produced error report. Do you find anything useful?
> I can even send you the job's jar with the data but it requires about 200 MB..
> 
> Best,
> Flavio
> 



Taskmanager JVM crash

2018-05-14 Thread Flavio Pompermaier
Hi to all,
I have a Flink 1.3.1 job that runs multiple times.
Everything goes well for some time (e.g. 10 jobs). Then, one or more TMs
suddenly die.

In the .out file I find something like this:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7f6f3897712f, pid=18794, tid=140110535448320
#
# JRE version: Java(TM) SE Runtime Environment (8.0_72-b15) (build
1.8.0_72-b15)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode
linux-amd64 compressed oops)
# Problematic frame:
# C  [libc.so.6+0x7f12f]
#
# Failed to write core dump. Core dumps have been disabled. To enable core
dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /home/user/hs_err_pid18794.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#


Attached the produced error report. Do you find anything useful?
I can even send you the job's jar with the data but it requires about 200
MB..

Best,
Flavio
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7f6f3897712f, pid=18794, tid=140110535448320
#
# JRE version: Java(TM) SE Runtime Environment (8.0_72-b15) (build 1.8.0_72-b15)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.72-b15 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C  [libc.so.6+0x7f12f]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

---  T H R E A D  ---

Current thread (0x7f6e65021800):  Thread [stack: 0x7f6e06a1d000,0x7f6e06b1e000] [id=19384]

siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 0x0028

Registers:
RAX=0x0010, RBX=0x7f6e64c87fd0, RCX=0x, RDX=0x
RSP=0x7f6e06b1c8d0, RBP=0x00f0, RSI=0x, RDI=0x7f6e6420
R8 =0x, R9 =0x7f6e65363a30, R10=0x0008, R11=0x0246
R12=0x7f6e6420, R13=0x7f6e64c880c0, R14=0x0130, R15=0x0001
RIP=0x7f6f3897712f, EFLAGS=0x00010246, CSGSFS=0x0033, ERR=0x0004
  TRAPNO=0x000e

Top of Stack: (sp=0x7f6e06b1c8d0)
0x7f6e06b1c8d0:   7f6f1b82d8e8 7f6e65021800
0x7f6e06b1c8e0:   0001 7f6e06b1ca30
0x7f6e06b1c8f0:   7f6e06b1c9e0 7f6e06b1c9a0
0x7f6e06b1c900:    7f6f38239b3f
0x7f6e06b1c910:   7f6f1b82d8e8 7f6e65021800
0x7f6e06b1c920:   7f6f1b82d8e8 7f6e64c87fe0
0x7f6e06b1c930:   7f6e06b1c9f0 7f6e65021800
0x7f6e06b1c940:   7f6f38890df8 00101000
0x7f6e06b1c950:   7f6ed8520a68 7f6f382326a0
0x7f6e06b1c960:   0004 7f6e182fc320
0x7f6e06b1c970:    7f6e182fc000
0x7f6e06b1c980:   03d8 7f6ecc5a9868
0x7f6e06b1c990:   7f6e06b1caf0 7f6e06b1cb10
0x7f6e06b1c9a0:   7f6e06b1cb80 7f6e182fc000
0x7f6e06b1c9b0:   7f6ecc5a9480 03d8
0x7f6e06b1c9c0:   7f6ecc5a9868 7f6f38381d2c
0x7f6e06b1c9d0:   7f6e06b1cb20 7f6e65021800
0x7f6e06b1c9e0:   7f6f38890df8 7f6e65363a30
0x7f6e06b1c9f0:   7f6e06b1ca40 7f6f383793e7
0x7f6e06b1ca00:   7f6e06b1ca40 
0x7f6e06b1ca10:   7f6e65021800 7f6e65021b20
0x7f6e06b1ca20:    7f6e65021800
0x7f6e06b1ca30:   03d8 7f6ed8520a68
0x7f6e06b1ca40:   7f6e06b1ca70 7f6f3837a452
0x7f6e06b1ca50:   7f6e65021800 7f6e06b1ca90
0x7f6e06b1ca60:   7f6e65021800 7f6ed8520680
0x7f6e06b1ca70:   7f6e06b1cb00 7f6f38381d2c
0x7f6e06b1ca80:   7f6e06a1d000 7f6ed8520690
0x7f6e06b1ca90:   7f6e65021800 7f6e65196b20
0x7f6e06b1caa0:   7f6e1801ddd0 7f6e1801dde0
0x7f6e06b1cab0:   7f6e1801deb8 00d8
0x7f6e06b1cac0:   7f6e651cc1c0 7f6f381ef55f 

Instructions: (pc=0x7f6f3897712f)
0x7f6f3897710f:   00 4d 39 6c 24 58 0f 84 32 04 00 00 43 f6 44 35
0x7f6f3897711f:   08 01 0f 85 b9 03 00 00 49 8b 45 10 49 8b 55 18
0x7f6f3897712f:   4c 3b 68 18 0f 85 59 06 00 00 4c 3b 6a 10 0f 85
0x7f6f3897713f:   4f 06 00 00 49 81 7d 08 ff 03 00 00 48 89 50 18 

Register to memory mapping:

RAX=0x0010 is an unknown value
RBX=0x7f6e64c87fd0 is an unknown value
RCX=0x is an unknown value
RDX=0x is an unknown value
RSP=0x7f6e06b1c8d0 is an unknown value
RBP=0x00f0 is an unknown value
RSI=0x is an unknown value
RDI=0x7f6e6420 is an unknown value
R8 

Missing MapState when Timer fires after restored state

2018-05-14 Thread Juho Autio
We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
state. After restoring state from a checkpoint, it seems like a timer had
been restored, but not the data that was expected to be in a related
MapState if such timer has been added.

The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or
maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete

Our code (simplified):

private MapState<String, String> mapState;

public void processElement(..) {
    mapState.put("lastUpdated", ctx.timestamp().toString());
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
}

public void onTimer(long timestamp, OnTimerContext ctx, ..) {
    long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
    if (timestamp >= lastUpdated + stateRetentionMillis) {
        mapState.clear();
    }
}

Normally this "just works". As you can see, it shouldn't be possible that
"lastUpdated" doesn't exist in state if timer was registered and onTimer
gets called.

However, after restoring state from a checkpoint, the job kept failing with
this error:

Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..

So apparently onTimer was called but lastUpdated wasn't found in the
MapState.

The background for restoring state in this case is not entirely clean.
There was an OS level issue "Too many open files" after running a job for
~11 days. To fix that, we replaced the cluster with a new one and launched
the Flink job again. State was successfully restored from the latest
checkpoint that had been created by the "problematic execution". Now, I'm
assuming that if the state wouldn't have been created successfully,
restoring wouldn't succeed either – correct? This is just to rule out the
possibility that the issue with state happened because the checkpoint files
were somehow corrupted due to the Too many open files problem.

Thank you all for your continued support!

P.S. I would be very much interested to hear if there's some cleaner way to
achieve this kind of TTL for keyed state in Flink.


org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

2018-05-14 Thread chandresh pancholi
Getting below error.

*Command:* ./bin/flink run
~/workspace/thanos/thanos-stream/target/thanos-stream.jar --port 9000

It works well from IDE and java -jar command.

Cluster configuration: Standalone cluster with JobManager at localhost/
127.0.0.1:6123

Using address localhost:6123 to connect to JobManager.

JobManager web interface address http://localhost:8081

Starting execution of program


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::        (v2.0.0.RELEASE)


Submitting job with JobID: 0facd9e9170ee455c5f6f921dd5e38e5. Waiting for
job completion.

Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-952575443]
with leader session id ----.

05/14/2018 15:57:57 Job execution switched to status RUNNING.

05/14/2018 15:57:57 Source: Custom Source -> Sink: Unnamed(1/1) switched to
SCHEDULED

05/14/2018 15:57:57 Source: Custom Source -> Sink: Unnamed(1/1) switched to
DEPLOYING

05/14/2018 15:57:58 Source: Custom Source -> Sink: Unnamed(1/1) switched to
RUNNING

05/14/2018 15:57:58 Source: Custom Source -> Sink: Unnamed(1/1) switched to
FAILED

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

ClassLoader info: URL ClassLoader:

file:
'/var/folders/zz/zyxvpxvq6csfxvn_n0/T/blobStore-c7607b0c-7e6f-4be8-8099-8f60a5a9b715/job_0facd9e9170ee455c5f6f921dd5e38e5/blob_p-d8a4d4e1c5d4ac42dcc216fbfba190c56bcb4336-a3dcf774c0a128afb420bace431065f9'
(valid JAR)

Class not resolvable through given classloader.

at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)

at java.lang.Thread.run(Thread.java:748)


05/14/2018 15:57:58 Job execution switched to status FAILING.

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

ClassLoader info: URL ClassLoader:

file:
'/var/folders/zz/zyxvpxvq6csfxvn_n0/T/blobStore-c7607b0c-7e6f-4be8-8099-8f60a5a9b715/job_0facd9e9170ee455c5f6f921dd5e38e5/blob_p-d8a4d4e1c5d4ac42dcc216fbfba190c56bcb4336-a3dcf774c0a128afb420bace431065f9'
(valid JAR)

Class not resolvable through given classloader.

at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)

at java.lang.Thread.run(Thread.java:748)

05/14/2018 15:57:58 Job execution switched to status FAILED.

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)

at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)

at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)

at
com.ailiens.thanos.streams.LogmanFlinkStream.init(LogmanFlinkStream.java:49)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:369)

at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:312)

at
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135)

at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:423)

at
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1702)

at

Re: Best way to clean-up states in memory

2018-05-14 Thread Fabian Hueske
Hi Ashish,

Did you use per-window state (also called partitioned state) in your
Trigger?
If yes, you need to make sure that it is completely removed in the clear()
method because processing time timers won't fire once a window was purged.
So you cannot (fully) rely on timers to clean up per-window state.

Best, Fabian

2018-05-14 9:34 GMT+02:00 Kostas Kloudas :

> Hi Ashish,
>
> It would be helpful to share the code of your custom trigger for the first
> case.
> Without that, we cannot tell what state you create and how/when you
> update/clear it.
>
> Cheers,
> Kostas
>
> On May 14, 2018, at 1:04 AM, ashish pok  wrote:
>
> Hi Till,
>
> Thanks for getting back. I am sure that will fix the issue but I feel like
> that would potentially mask an issue. I have been going back and forth with
> Fabian on a use case where for some of our highly transient datasets, it
> might make sense to just use memory based state (except of course data loss
> becomes an issue when apps occasionally hit a problem and whole job
> restarts or app has to be taken down etc - ie. handling graceful shutdowns
> / restarts better essentially). I was on the hook to create a business case
> and post it back to this forum (which I am hoping I can get around to at
> some point soon). Long story short, this is one of those datasets.
>
> States in this case are either fired and cleared normally or on processing
> timeout. So technically, unless there is a memory leak in app code, memory
> usage should plateau out at a high-point. What I was noticing was memory
> would start to creep up ever so slowly.
>
> I couldn't tell exactly why heap utilization kept on growing (ever so
> slowly but it had upward trend for sure) because the states should
> technically be cleared if not as part of a reducing function then on
> timeout. App after running for couple of days would then run into Java Heap
> issues. So changing to RocksDB probably will fix the issue but not
> necessarily leak of states that should be cleared IMO. Interestingly, I
> switched my app from using something like this:
>
> WindowedStream windowedStats =
> statsStream
> .keyBy(BasicFactTuple::getKey)
> .window(GlobalWindows.create())
> .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*
> 1000)))
> ;
>
> To
>
>  DataStream processStats = pipStatsStream
> .keyBy(BasicFactTuple::getKey)
> .process(new 
> IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000),
> 60))
>
> I basically moved logic of trigger to process function over the weekend.
> Once I did that, heap is completely stabilized. In trigger implementation,
> I was using FIRE_AND_PURGE on trigger condition or onProcessingTime and in
> process implementation I am using .clear() method for same.
>
> I seem to have solved the problem by using process but I'd be interested
> to understand the cause of why heap would creep up in trigger scenario.
>
> Hope this makes sense,
>
> Ashish
>
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann <
> till.rohrm...@gmail.com> wrote:
>
>
> Hi Ashish,
>
> have you tried using Flink's RocksDBStateBackend? If your job accumulates
> state exceeding the available main memory, then you have to use a state
> backend which can spill to disk. The RocksDBStateBackend offers you exactly
> this functionality.
>
> Cheers,
> Till
>
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok  wrote:
>
> All,
>
> I am using noticing heap utilization creeping up slowly in couple of apps
> which eventually lead to OOM issue. Apps only have 1 process function that
> cache state. I did make sure I have a clear method invoked when events are
> collected normally, on exception and on timeout.
>
> Are any other best practices others follow for memory backed states?
>
> Thanks,
>
> -- Ashish
>
>
>
>


Re: Late data before window end is even close

2018-05-14 Thread Fabian Hueske
Thanks for correcting me Piotr. I didn't look closely enough at the code.
With the presently implemented logic, a record should not be emitted to a
side output if its window hasn't been closed yet.

2018-05-11 14:13 GMT+02:00 Piotr Nowojski :

> Generally speaking, best practice is always to simplify your program as
> much as possible to narrow down the scope of the search. Replace the data
> source with statically generated events, remove unnecessary components, etc.
> Either such a process helps you figure out what's wrong on your own, or, if
> not, sharing such a minimal program that reproduces the issue will allow
> us to debug it.
>
> Piotrek
>
>
> On 11 May 2018, at 13:54, Juho Autio  wrote:
>
> Thanks for that code snippet, I should try it out to simulate my DAG.. If
> any suggestions how to debug futher what's causing late data on a
> production stream job, please let me know.
>
> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski 
> wrote:
>
>> Hey,
>>
>> Actually I think Fabian's initial message was incorrect. As far as I can
>> see in the code of WindowOperator (last lines of org.apache.flink.streaming.
>> runtime.operators.windowing.WindowOperator#processElement), the element
>> is sent to the late side output if it is late AND it wasn't assigned to any of
>> the existing windows (because they were late as well). In other words, it
>> should work as you, Juho, are wishing: elements should be marked as late once
>> they are overdue/late for the window, after one full day.
>>
>> I have tested it and it works as expected. Following program:
>>
>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>
>> Prints only ONE number to the standard err:
>>
>> > 1394
>>
>> And there is nothing on the side output.
>>
>> Piotrek
>>
>> On 11 May 2018, at 12:32, Juho Autio  wrote:
>>
>> Thanks. What I still don't get is why my message got filtered in the
>> first place. Even if the allowed lateness filtering would be done "on the
>> window", data should not be dropped as late if it's not in fact late by
>> more than the allowedLateness setting.
>>
>> Assuming that these conditions hold:
>> - messages (and thus the extracted timestamps) were not out of order by
>> more than 5 secods (as far as I didn't make any mistake in my
>> partition-level analysis)
>> - allowedLateness=1 minute
>> - watermarks are assigned on kafka consumer meaning that they are
>> synchronized across all partitions
>>
>> I don't see how the watermark could have ever been more than 5 seconds
>> further when the message arrives on the isElementLate filter. Do you have
>> any idea on this? Is there some existing test that simulates out of order
>> input to flink's kafka consumer? I could try to build a test case based on
>> that to possibly reproduce my problem. I'm not sure how to gather enough
>> debug information on the production stream so that it would clearly show
>> the watermarks, how they progressed on each kafka partition & later in the
>> chain in case isElementLate filters something.
>>
>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske 
>> wrote:
>>
>>> Hi Juho,
>>>
>>> Thanks for bringing up this topic! I share your intuition.
>>> IMO, records should only be filtered out and send to a side output if
>>> any of the windows they would be assigned to is closed already.
>>>
>>> I had a look into the code and found that records are filtered out as
>>> late based on the following condition:
>>>
>>> protected boolean isElementLate(StreamRecord<IN> element) {
>>>     return (windowAssigner.isEventTime()) &&
>>>         (element.getTimestamp() + allowedLateness <=
>>>             internalTimerService.currentWatermark());
>>> }
>>>
>>>
>>> This code shows that your analysis is correct.
>>> Records are filtered out based on their timestamp and the current
>>> watermark, even though they arrive before the window is closed.
>>>
>>> OTOH, filtering out records based on the window they would end up in can
>>> also be tricky if records are assigned to multiple windows (e.g., sliding
>>> windows).
>>> In this case, a side-outputted record could still be in some windows
>>> and not in others.
>>>
>>> @Aljoscha (CC) Might have an explanation for the current behavior.
>>>
>>> Thanks,
>>> Fabian
>>>
>>>
>>> 2018-05-11 10:55 GMT+02:00 Juho Autio :
>>>
 I don't understand why I'm getting some data discarded as late on my
 Flink stream job a long time before the window even closes.

 I can not be 100% sure, but to me it seems like the kafka consumer is
 basically causing the data to be dropped as "late", not the window. I
 didn't expect this to ever happen?

 I have a Flink stream job that gathers distinct values using a 24-hour
 window. It reads the data from Kafka, using a 
 BoundedOutOfOrdernessTimestampExtractor
 on the kafka consumer to synchronize watermarks across all kafka
 

Flink FSStateBackend Checkpointing Buffer size

2018-05-14 Thread Chirag Dewan
Hi,
I am trying to use the Gluster File System as my checkpoint FileSystem, with 
RocksDB as the state backend. I can see from FsCheckpointStateOutputStream that 
DEFAULT_WRITE_BUFFER_SIZE = 4096.
Is the buffer size configurable in any way? Any idea about the checkpointing 
performance with the default buffer size? 
Thanks,
Chirag
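
For context, a minimal sketch of wiring RocksDB to a Gluster-backed checkpoint
directory via a file: URI, assuming the Gluster volume is mounted at
/mnt/gluster on every machine (the path and checkpoint interval are
placeholders); this shows the backend wiring only and does not change the
write buffer size:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlusterCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints go to the Gluster mount; RocksDB keeps working state locally.
        env.setStateBackend(new RocksDBStateBackend("file:///mnt/gluster/flink-checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // ... define sources, transformations, and sinks here ...
        env.execute("gluster-checkpoint-setup");
    }
}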

Re: How to broadcast messages to all task manager instances in cluster?

2018-05-14 Thread Piotr Nowojski
Hi,

Thanks for the clarification. This might be tough. Generally speaking, having 
such static configuration shared across multiple operators/functions can pose 
lots of different problems, including synchronisation, fault tolerance, etc. 

To be honest, you should treat such a thing almost like an external system that 
has external state, because from Flink’s perspective that’s exactly what it 
is - it’s equivalent to having an external “configuration service” 
hosted/stored somewhere outside of Flink. With it you have to manually take 
care of fault tolerance (especially its state), since it’s outside of Flink’s 
control. In particular, think about what should happen to your static configuration 
if one of your machines fails/restarts and Flink chooses to restart only part 
of the job graph (possibly one, many, or all of the operators). How will your 
static configuration be kept in sync across all of the Task Managers in that 
case?

It would be easier if you could restructure your job/problem and replace such 
static configuration with a configuration stored in the Flink’s state (maybe in 
one operator? Or on parallel instances of one task?). Otherwise to make it 
fully reliable I think you would need to write quite a lot of code on your own. 

Alternatively you can consider using some third party systems for storing a 
configuration like Apache ZooKeeper.

Piotrek

> On 13 May 2018, at 10:38, Di Tang  wrote:
> 
> Thanks Piotr for the response. I have many data streams dependent on the 
> configuration, which they read from static variables in a class. The way a 
> configuration change works is to change the static variables' values in the 
> class. Since each task manager has only one JVM process, as long as the 
> message is broadcast to each task manager, the data streams will see the 
> change. The logic in the data streams is quite simple: just get some parameters 
> from the static variables. So I think adding connect and flatMap to each of 
> them is too verbose. I am wondering whether there is a better way to express this.
> 
> On Fri, May 11, 2018 at 7:31 PM, Piotr Nowojski wrote:
> Hi,
> 
> I don’t quite understand your problem. If you broadcast a message as an input 
> to your operator that depends on this configuration, each instance of your 
> operator will receive this configuration. It shouldn't matter whether Flink 
> scheduled your operator on one, some, or all of the TaskManagers. All that 
> matters is that the operators running your configuration-sensitive code receive 
> the broadcasted message.
> 
> 
> DataStream<> input = xxx;
> DataStream<> controlConfigInput = yyy;
> 
> DataStream<> data = input.
>   .do()
>   .something()
>   .fancy();
> 
> controlConfigInput.broadcast()
>   .connect(data)
>   .flatMap(new MyFancyOperatorThatDependsOnConfigStream())
> 
> Or slide 36 from here: 
> https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics 
> 
> 
> Piotrek
> 
>> On 11 May 2018, at 11:11, Di Tang wrote:
>> 
>> Hi guys:
>> 
>> I have a Flink job which contains multiple pipelines. Each pipeline depends 
>> on some configuration. I want to make the configuration dynamic, taking 
>> effect after each change, so I created a data source which periodically polls 
>> the database storing the configuration. However, how can I broadcast the 
>> events to all task manager instances? The datastream.broadcast() only 
>> applies to the parallel instances of an operator. And I don't want to connect 
>> the configuration data source to each pipeline because it is too verbose. If 
>> Flink cannot explicitly broadcast messages to task managers, is there any 
>> method to guarantee the parallel operator is distributed on all task 
>> managers?
>> 
>> Thanks,
>> Di
> 
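
A runnable version of the broadcast-and-connect pattern Piotrek sketches above
(a minimal sketch; the socket sources and String types are placeholder
assumptions, and the config field held in the function instance is not
checkpointed - which is exactly the fault-tolerance caveat discussed above):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9000);        // placeholder source
        DataStream<String> configUpdates = env.socketTextStream("localhost", 9001); // placeholder source

        events
                .connect(configUpdates.broadcast()) // every parallel instance receives each update
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    private String config = "default";

                    @Override
                    public void flatMap1(String event, Collector<String> out) {
                        out.collect(event + " [config=" + config + "]");
                    }

                    @Override
                    public void flatMap2(String configUpdate, Collector<String> out) {
                        config = configUpdate; // updated in every parallel instance
                    }
                })
                .print();

        env.execute("broadcast-config-example");
    }
}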



Re: Batch writing from Flink streaming job

2018-05-14 Thread Fabian Hueske
Hi,

Avro provides a schema for the data and can be used to serialize individual
records in a binary format.
It does not compress the data (although compression can be put on top), but it
is more space efficient due to the binary serialization.

I think you can implement a Writer for the BucketingSink that writes
records encoded as Avro.

Writing formats such as Parquet or ORC that group records in batches and
write them compressed in columnar layout are a different story and not
supported by BucketingSink.
However, there are some plans to support these formats in the future as
well.
Avro is different from Parquet and ORC because it writes individual records
and not batches of records.

Best, Fabian

2018-05-13 14:09 GMT+02:00 Jörn Franke :

> If you want to write in batches from a streaming source, you will always
> need some state, i.e., a state database (here a NoSQL database such as a
> key-value store makes sense). Then you can grab the data at certain points in
> time and convert it to Avro. You need to make sure that the state is
> logically consistent (e.g., everything from the last day), so that events that
> arrive later than expected are not missing from the files.
>
> You can write your own sink, but it would require some state database to
> write the data afterwards as a batch.
>
> Maybe this could be a generic Flink component, i.e., writing to a state
> database and later writing a logically consistent state (where consistency is
> defined by the application) into other sinks (CsvSink, AvroSink, etc.).
>
> > On 13. May 2018, at 14:02, Padarn Wilson  wrote:
> >
> > Hi all,
> >
> > I am writing some jobs intended to run using the DataStream API
> using a Kafka source. However we also have a lot of data in Avro archives
> (of the same Kafka source). I would like to be able to run the processing
> code over parts of the archive so I can generate some "example output".
> >
> > I've written the transformations needed to read the data from the
> archives and process the data, but now I'm trying to figure out the best
> way to write the results of this to some storage.
> >
> > At the moment I can easily write to Json or CSV using the bucketing sink
> (although I'm curious about using the watermark time rather than system
> > time to name the buckets), but I'd really like to write to something
> > smaller like Avro.
> >
> > However I'm not sure this makes sense. Writing to a compressed file
> > format in this way from a streaming job doesn't sound intuitively right.
> > What would make the most sense? I could write to some temporary database
> and then pipe that into an archive, but this seems like a lot of trouble.
> Is there a way to pipe the output directly into the batch API of flink?
> >
> > Thanks
>
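
A minimal sketch of the Writer approach Fabian describes, using the
AvroKeyValueSinkWriter that ships with the filesystem connector (the output
path and the plain String key/value schemas are placeholder assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class AvroBucketingSinkFactory {

    public static BucketingSink<Tuple2<String, String>> create() {
        Map<String, String> props = new HashMap<>();
        props.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA,
                Schema.create(Schema.Type.STRING).toString());
        props.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA,
                Schema.create(Schema.Type.STRING).toString());

        // Placeholder base path; time-based buckets are created below it.
        BucketingSink<Tuple2<String, String>> sink =
                new BucketingSink<>("hdfs:///data/avro-output");
        sink.setWriter(new AvroKeyValueSinkWriter<String, String>(props));
        return sink;
    }
}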


Re: Default zookeeper

2018-05-14 Thread Ufuk Celebi
No, there is no difference if the version in your distro is part of
the ZooKeeper 3.4.x series. The script is there for convenience during
local testing/dev.

– Ufuk


On Sun, May 13, 2018 at 3:49 PM, miki haiat  wrote:
> When downloading the Flink source in order to run it locally, there is a
> zookeeper script and a start-zookeeper-quorum script.
>
> Is there any difference between the default ZooKeeper installation, let's say
> on Ubuntu, and the ZooKeeper that comes with Flink?
>
> thanks,
>
> MIki
>


Re: Best way to clean-up states in memory

2018-05-14 Thread Kostas Kloudas
Hi Ashish,

It would be helpful to share the code of your custom trigger for the first case.
Without that, we cannot tell what state you create and how/when you 
update/clear it.

Cheers,
Kostas

> On May 14, 2018, at 1:04 AM, ashish pok  wrote:
> 
> Hi Till,
> 
> Thanks for getting back. I am sure that will fix the issue, but I feel like 
> that would potentially mask an issue. I have been going back and forth with 
> Fabian on a use case where, for some of our highly transient datasets, it 
> might make sense to just use memory-based state (except of course data loss 
> becomes an issue when apps occasionally hit a problem and the whole job restarts 
> or the app has to be taken down, etc. - i.e., essentially handling graceful 
> shutdowns / restarts better). I was on the hook to create a business case and 
> post it back to this forum (which I am hoping I can get around to at some 
> point soon). Long story short, this is one of those datasets. 
> 
> States in this case are either fired and cleared normally or on processing 
> timeout. So technically, unless there is a memory leak in the app code, memory 
> usage should plateau at a high point. What I was noticing was that memory 
> would creep up ever so slowly. 
> 
> I couldn't tell exactly why heap utilization kept growing (ever so slowly, 
> but with a clear upward trend) because the states should technically be 
> cleared, if not as part of a reducing function then on timeout. After 
> running for a couple of days, the app would run into Java heap issues. So 
> changing to RocksDB will probably fix the symptom, but not necessarily a leak 
> of states that should have been cleared, IMO. Interestingly, I switched my app 
> from using something like this:
> something like this:
> 
> WindowedStream windowedStats = statsStream
>     .keyBy(BasicFactTuple::getKey)
>     .window(GlobalWindows.create())
>     .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));
> 
> To 
> 
> DataStream processStats = pipStatsStream
>     .keyBy(BasicFactTuple::getKey)
>     .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));
> 
> I basically moved the trigger logic into the process function over the weekend. 
> Once I did that, the heap completely stabilized. In the trigger implementation, 
> I was using FIRE_AND_PURGE on the trigger condition or onProcessingTime, and in 
> the process implementation I am using the .clear() method for the same purpose. 
> 
> I seem to have solved the problem by using a process function, but I'd be 
> interested to understand why the heap would creep up in the trigger scenario. 
> 
> Hope this makes sense,
> 
> Ashish
> 
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann 
>  wrote:
> 
> 
> Hi Ashish,
> 
> have you tried using Flink's RocksDBStateBackend? If your job accumulates 
> state exceeding the available main memory, then you have to use a state 
> backend which can spill to disk. The RocksDBStateBackend offers you exactly 
> this functionality.
> 
> Cheers,
> Till
> 
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok wrote:
> All,
> 
> I am noticing heap utilization creeping up slowly in a couple of apps, 
> which eventually leads to an OOM issue. The apps only have one process function 
> that caches state. I did make sure a clear method is invoked when events are 
> collected normally, on exception, and on timeout.
> 
> Are any other best practices others follow for memory backed states?
> 
> Thanks,
> 
> -- Ashish
>
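
A minimal sketch of the pattern Ashish describes above: accumulating with a
bitwise OR in keyed state, emitting either when a fire mask is reached or on a
processing-time timeout, and calling clear() in both paths. The Long events,
the fireMask value, and the class/field names are placeholder assumptions; the
function must run on a keyed stream (e.g., after keyBy(BasicFactTuple::getKey)
in the original job) for keyed state and timers to be available.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BitwiseOrTimeoutFn extends ProcessFunction<Long, Long> {

    private final long fireMask;       // e.g., 60, as in BitwiseOrTrigger.of(60, ...)
    private final long timeoutMillis;  // e.g., 5 * 60 * 1000
    private transient ValueState<Long> aggregate;

    public BitwiseOrTimeoutFn(long fireMask, long timeoutMillis) {
        this.fireMask = fireMask;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void open(Configuration parameters) {
        aggregate = getRuntimeContext().getState(
                new ValueStateDescriptor<>("aggregate", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = aggregate.value();
        if (current == null) {
            // First element for this key: arm the processing-time timeout.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + timeoutMillis);
            current = 0L;
        }
        long updated = current | value;
        if (updated == fireMask) {
            out.collect(updated); // normal completion: emit and clear
            aggregate.clear();
        } else {
            aggregate.update(updated);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long current = aggregate.value();
        if (current != null) {
            out.collect(current); // timeout: emit whatever accumulated so far
            aggregate.clear();    // the explicit clear() that stabilized the heap
        }
    }
}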