Restart from checkpoint after program failure

2018-10-17 Thread chrisr123
Hi Folks,
I'm trying to restart my program with restored state from a checkpoint after
a program failure (restart strategies tried but exhausted), but I'm not
picking up the restored state. What am I doing wrong here?  

*Summary*
I'm using a very simple app on 1 node just to learn checkpointing.
App reads from a socket stream and I deliberately send in some "bad" data to
throw an Exception using netcat (nc) as source. App uses  a simple file URL
as checkpoint backend. 

*Checkpoint Backend*
// specified in program:
env.setStateBackend((StateBackend) new FsStateBackend("file:///home/hadoop/flink/checkpoints/"));

For the restart strategy, I specify 3 attempts with a 5-second delay between
attempts:
// specified in program:
int restartAttempts = 3;
int restartDelaySeconds = 5;
long delayBetweenRestarts = restartDelaySeconds * 1000;
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

*App Logic:*
All the application does is parse each line as a (key, Integer) pair and output
the accumulated sum to stdout. (See below.)
If I start up nc -l and type values like this, it works fine:
key1,5
key1,3
key1,4

However, if I type in "junk", the program throws an Exception trying to parse
'junk' as an Integer:
key1,junk

When the application fails, nc also stops. If I restart nc before all 3
restart attempts have been exhausted, everything is fine and the program
restarts, picking up state where it left off.

So after all the restarts have been tried and failed, I want to restart my
program manually and pick up where I left off. Since I am specifying the
checkpoint backend in the program, I thought it would just pick it up from
there. Then I tried passing in the checkpoint path using the -s parameter to my
program, but that does not work either:

flink -c   -s c:\home\hadoop\flink\checkpoints 
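(For reference, a hedged sketch of what the resume command would look like when restoring from a retained/externalized checkpoint. The package, jar name, job id and checkpoint number below are placeholders; the -s path should point at the concrete chk-NN directory that contains the _metadata file, not at the parent checkpoints folder:)

```
bin/flink run -s file:///home/hadoop/flink/checkpoints/<job-id>/chk-42 \
    -c myapp.ComputeSumFaultTolerant my-app.jar
```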




*App Source:*
public class ComputeSumFaultTolerant {

public static void main(String[] args) throws Exception {   

// Execution Environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parms = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parms);
String host = "localhost";
int port = ;

System.out.println("ComputeSumFaultTolerant BEGIN");

// Setup Checkpoint and Retry
String checkpointBackendURL = 
"file:///home/hadoop/flink/checkpoints/";
Utils.configureCheckpoint(env,checkpointBackendURL);
Utils.configureRestartFixedDelay(env);

// Get Our Raw Data Stream
DataStream<Tuple2<String, Long>> eventStream = env
.socketTextStream(host, port)
.map(new MessageParser())
.keyBy(0)
.sum(1);
eventStream.print();

// Execute
env.execute("ComputeSumFaultTolerant");
}

private static class MessageParser implements
MapFunction<String, Tuple2<String, Long>> {
public Tuple2<String, Long> map(String input) throws Exception {
String[] tokens = input.toLowerCase().split(",");
String key = tokens[0];
Long value = Long.valueOf(tokens[1]);
return new Tuple2<>(key, value);
}
}


}

public class Utils {

public static void configureCheckpoint(StreamExecutionEnvironment env,
String checkpointBackend) throws Exception {
// Set Up Checkpoints
env.enableCheckpointing(5000L);

// set mode to exactly-once (this is the default)

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000L);


// Checkpoint Back-end
env.setStateBackend((StateBackend) new FsStateBackend(checkpointBackend));

System.out.println("CHECKPOINT IS EXTERNALIZED");

env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

System.out.println("External enabled=
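(Utils.configureRestartFixedDelay is referenced in the main method above but not shown. Based on the restart-strategy snippet quoted earlier in this message, a sketch of what it presumably looks like — assumed to live in the same Utils class, with RestartStrategies imported from org.apache.flink.api.common.restartstrategy:)

public static void configureRestartFixedDelay(StreamExecutionEnvironment env) {
    // 3 attempts with a 5-second delay between attempts, per the values quoted above
    int restartAttempts = 3;
    int restartDelaySeconds = 5;
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            restartAttempts, restartDelaySeconds * 1000L));
}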

Re: State Recovery when job fails and auto-recovers

2018-10-17 Thread Hequn Cheng
Hi Sameer,

In case of a failure, the job restarts the operators and resets them
to the latest successful checkpoint. So if you turn off checkpoints, all
state will be lost.
Generally speaking, snapshots are very lightweight and can be drawn
frequently without much impact on performance. If they do affect the
performance of your job and you don't want to lose all of your state, you
can try increasing the checkpoint interval.

> // start a checkpoint every 600000 ms (10 min)
> env.enableCheckpointing(600000);


Best, Hequn

On Thu, Oct 18, 2018 at 7:19 AM Sameer Wadkar  wrote:

> Hi,
>
> We have a job which is using ValueState. We have turned off checkpoints.
> The state is backed by rocksdb which is backed by S3.
>
>  If the job fails for any exception (ex. Partitions not available or an
> occasional S3 404 error) and auto-recovers, is the entire state lost or
> does it continue from the last saved state. We see that the job has the
> same identifier. We don’t mind losing data during the small interval when
> the job is recovering. But because we are using ValueState as a custom
> global window to accumulate state for a key over a 3 hour window we don’t
> want to lose all of it.
>
> Checkpointing is not an option because it takes longer per checkpoint and
> the state is huge.
>
> Thanks,
> Sameer
>
> Sent from my iPhone


State Recovery when job fails and auto-recovers

2018-10-17 Thread Sameer Wadkar
Hi,

We have a job which is using ValueState. We have turned off checkpoints. The 
state is backed by rocksdb which is backed by S3. 

 If the job fails for any exception (ex. Partitions not available or an 
occasional S3 404 error) and auto-recovers, is the entire state lost or does it 
continue from the last saved state. We see that the job has the same 
identifier. We don’t mind losing data during the small interval when the job is 
recovering. But because we are using ValueState as a custom global window to 
accumulate state for a key over a 3 hour window we don’t want to lose all of 
it. 

Checkpointing is not an option because it takes longer per checkpoint and the 
state is huge. 

Thanks,
Sameer

Sent from my iPhone
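(Editorial note, not from the thread: if full checkpoints are too slow for a huge state, RocksDB's incremental checkpoints, which only upload the SST files changed since the previous checkpoint, may make checkpointing viable again. A hedged sketch; the S3 path and interval are placeholders and the flink-statebackend-rocksdb dependency is assumed to be on the classpath:)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // second constructor argument enables incremental checkpoints: only the
        // changed RocksDB SST files are uploaded, instead of a full snapshot each time
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true));
        env.enableCheckpointing(10 * 60 * 1000L); // e.g. every 10 minutes
        // ... define the rest of the job and call env.execute(...) as usual ...
    }
}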

Re: How do I initialize the window state on first run?

2018-10-17 Thread Rafi Aroch
Hi Jiayi,

This topic has been discussed by others, take a look here for some options
by Lyft: https://youtu.be/WdMcyN5QZZQ

Rafi

On Fri, Oct 12, 2018, 16:51 bupt_ljy  wrote:

> Yes…that’s an option, but it’ll be very complicated because of our storage
> and business.
>
> Now I’m trying to write a handler like the “KvStateHandler” so that I can
> access (read/write) the state from my client.
>
>  Original Message
> *Sender:* Congxian Qiu
> *Recipient:* bupt_ljy
> *Cc:* yanghua1127; user
> *Date:* Friday, Oct 12, 2018 20:14
> *Subject:* Re: How do I initialize the window state on first run?
>
> IIUC, we can't initialize state on the first run; maybe you could store the
> aggregated data in another place rather than in Flink's state, then use
> Flink to aggregate the data in real time.
>
> bupt_ljy wrote on Friday, Oct 12, 2018 at 3:33 PM:
>
>> Hi, vivo,
>>
>> My Flink program aggregates the data of a whole day. Assume we
>> start this program at 6:00 am; the initial state in the window should be
>> the aggregated result from 0:00 am to 6:00 am.
>>
>>  Original Message
>> *Sender:* vino yang
>> *Recipient:* bupt_ljy
>> *Cc:* user
>> *Date:* Friday, Oct 12, 2018 15:13
>> *Subject:* Re: How do I initialize the window state on first run?
>>
>> Hi Jiayi,
>>
>> If you don't mind, I would like to ask what kind of scenario leads to this
>> requirement?
>>
>> Thanks, vino.
>>
>> bupt_ljy wrote on Friday, Oct 12, 2018 at 1:59 PM:
>>
>>> Hi,
>>>
>>>I’m going to run a new Flink program with some initialized window
>>> states.
>>>
>>>I can’t see an official way to do this, right? I’ve tried
>>> the bravo project, but it doesn’t support FsStateBackend, and it would cost too
>>> much work to add a new StateBackend to it.
>>>
>>>Any good ideas about this?
>>>
>>>
>>>
>>>
>>> Best, Jiayi Liao
>>>
>>>
>>>
>>
>
> --
> Blog:http://www.klion26.com
> GTalk:qcx978132955
> 一切随心
>


Re: Take RocksDB state dump

2018-10-17 Thread Gyula Fóra
Hi,

If you don't mind trying out stuff a little, I have some nice tooling for
exactly this:

https://github.com/king/bravo

Let me know if it works :)

Gyula

Harshvardhan Agrawal wrote (on Wed, Oct 17, 2018, 21:50):

> Hello,
>
> We are currently using a RocksDBStateBackend for our Flink pipeline. We
> want to analyze the data that is stored in Rocksdb state.Is  there a
> recommended process to do that? The sst_dump tool available from RocksDB
> isn’t working for us and we keep on getting errors like “Snappy not
> supported or corrupted Snappy compressed block contents”. My thought was
> that it might be happening since I am trying to take a dump while the Flink
> pipeline is running. Upon cancelling the pipeline all the state was removed
> and I didn’t have any sst files to look at. I was wondering how have people
> approached this problem.
>
> Regards,
> Harsh
>
>
> --
> Regards,Harshvardhan
>


Take RocksDB state dump

2018-10-17 Thread Harshvardhan Agrawal
Hello,

We are currently using a RocksDBStateBackend for our Flink pipeline. We
want to analyze the data that is stored in RocksDB state. Is there a
recommended process to do that? The sst_dump tool available from RocksDB
isn’t working for us and we keep on getting errors like “Snappy not
supported or corrupted Snappy compressed block contents”. My thought was
that it might be happening since I am trying to take a dump while the Flink
pipeline is running. Upon cancelling the pipeline all the state was removed
and I didn’t have any sst files to look at. I was wondering how have people
approached this problem.

Regards,
Harsh


-- 
Regards,Harshvardhan


[ANNOUNCE] Weekly community update #42

2018-10-17 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #42. Please post any news and
updates you want to share with the community to this thread.

# Discussion about Flink SQL integration with Hive

Xuefu started a discussion about how to integrate Flink SQL with the Hive
ecosystem [1]. If that's of your interest, then please join the discussion.

# Flink intro slide set

Fabian started to prepare a Flink intro slide set for the community [2]. If
you want to help with the preparation of the slide set, reach out to him.

# Discussion how to share state between tasks

Thomas started a discussion about how to share state/information between
multiple tasks of an operator [3]. That way it would be possible to control
the ingestion rate of sources depending on how far ahead they are with
respect to event time, for example.

# Releasing Flink 1.5.5 and 1.6.2

Chesnay kicked off a discussion about releasing the next bug fix releases
for Flink 1.5 and 1.6 [4]. He is currently creating the RCs which will be
published on the ML for testing soon. Please help the community with
testing them.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrate-Flink-SQL-well-with-Hive-ecosystem-td24538.html#a24568
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Creating-a-slide-set-for-a-Flink-intro-talk-td24605.html#a24643
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-5-5-and-1-6-2-td24626.html

Cheers,
Till


Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

2018-10-17 Thread Juan Miguel Cejuela
Update:

not 100% sure, but I think I fixed my bug. This is what I did:

I reduced (quite a lot) the maximum number of parallel operations in my
`AsyncDataStream`. I had set it initially to 1000. The default of 100 did
not work for me either. But somehow when I set the value to 10, everything
is working fine now.

```
AsyncDataStream.unorderedWait(dataSource, new AsyncProcessing(), 5,
TimeUnit.MINUTES, 10)
```

Perhaps too much memory was used at once and therefore some files were
discarded? I don't know, but hopefully my solution gives some clues to
other people in the future.
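(Editorial aside, not from the thread: for anyone reworking such a source, the higher-level readFile API wraps ContinuousFileMonitoringFunction and handles the directory scanning. A hedged sketch; the path and scan interval are placeholders:)

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FolderMonitorExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String dir = "file:///data/incoming"; // placeholder
        // re-scan the directory every 10 seconds and emit files that newly appear;
        // note that PROCESS_CONTINUOUSLY re-reads a file completely if it is modified
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(dir)), dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);
        lines.print();
        env.execute("folder-monitor");
    }
}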

On Sat, 13 Oct 2018 at 12:48 Juan Miguel Cejuela  wrote:

> I’m using both a local (Unix) file system and hdfs.
>
> I’m going to check those to get ideas, thank you!
>
> I’m also checking the internal code of the class and my own older patch
> code.
> On Fri 12. Oct 2018 at 21:32, Fabian Hueske  wrote:
>
>> Hi,
>>
>> Which file system are you reading from? If you are reading from S3, this
>> might be cause by S3's eventual consistency property.
>> Have a look at FLINK-9940 [1] for a more detailed discussion.
>> There is also an open PR [2], that you could try to patch the source
>> operator with.
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9940
>> [2] https://github.com/apache/flink/pull/6613
>>
>> On Fri, Oct 12, 2018 at 20:41, Juan Miguel Cejuela <
>> jua...@tagtog.net> wrote:
>>
>>> Dear flinksters,
>>>
>>>
>>> I'm using the class `ContinuousFileMonitoringFunction` as a source to
>>> monitor a folder for new incoming files.* I have the problem that not
>>> all the files that are sent to the folder get processed / triggered by the
>>> function*. Specific details of my workflow is that I send up to 1k
>>> files per minute, and that I consume the stream as a `AsyncDataStream`.
>>>
>>> I myself raised an unrelated issue with the
>>> `ContinuousFileMonitoringFunction` class some time ago (
>>> https://issues.apache.org/jira/browse/FLINK-8046): if two or more files
>>> shared the very same timestamp, only the first one (non-deterministically
>>> chosen) would be processed. However, I patched the file myself to fix that
>>> problem by using a LinkedHashMap to remember which files had been really
>>> processed before or not. My patch is working fine as far as I can tell.
>>>
>>> The problem seems to be rather that some files (when many are sent at
>>> once to the same folder) do not even get triggered/activated/registered by
>>> the class.
>>>
>>>
>>> Am I properly explaining my problem?
>>>
>>>
>>> Any hints to solve this challenge would be greatly appreciated ! ❤ THANK
>>> YOU
>>>
>>> --
>>> Juanmi, CEO and co-founder @ 🍃tagtog.net
>>>
>>> Follow tagtog updates on 🐦 Twitter: @tagtog_net
>>> 
>>>
>>> --
> Juanmi, CEO and co-founder @ 🍃tagtog.net
>
> Follow tagtog updates on 🐦 Twitter: @tagtog_net
> 
>
> --
Juanmi, CEO and co-founder @ 🍃tagtog.net

Follow tagtog updates on 🐦 Twitter: @tagtog_net



Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-17 Thread Niels van Kaam
Sorry, I would not know that. I have worked with custom triggers, but
haven't actually had to implement a custom window function yet.

By looking at the interfaces I would not say that is possible.

Niels

On Wed, Oct 17, 2018 at 2:18 PM Ahmad Hassan  wrote:

> Hi Niels,
>
> Can we distinguish within apply function of 'RichWindowFunction' whether
> it was called due to onElement trigger call or onProcessingtime trigger
> call of a custom Trigger ?
>
> Thanks!
>
> On Wed, 17 Oct 2018 at 12:51, Niels van Kaam  wrote:
>
>> Hi Zhen Li,
>>
>> You can control when a windowed stream emits data with "Triggers". See:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>
>> Flink comes with a couple of default triggers, but you can also create
>> your own by implementing
>> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html
>> .
>>
>> Note that this does not change the window, but just causes the
>> windowedstream to emit intermediate results to the next operator.
>>
>> Does this answer your question?
>>
>> Cheers,
>> Niels
>>
>> On Wed, Oct 17, 2018 at 12:34 PM zhen li  wrote:
>>
>>> Hi all:
>>> How can I trigger the window manually in  fold operator or
>>> incremental aggregation? For example, when some conditions is meet,althouth
>>> the count window or time window is not reach
>>
>>


Re: IndexOutOfBoundsException on deserialization after updating to 1.6.1

2018-10-17 Thread Bruno Aranda
Hi,

Thanks for your reply. We are still trying to isolate it, because this job
was using a more complex state. I think it is caused by a case class that
has an Option[MyOtherClass], and MyOtherClass is an enumeration implemented
using the enumeratum library. I have changed that field to be just an
Option[Boolean], and the failure seems not to happen anymore.

We may continue with the Boolean for now, but I guess this was not a
problem in an earlier Flink version; possibly a Kryo change?

Cheers,

Bruno

On Wed, 17 Oct 2018 at 15:40 aitozi  wrote:

> Hi,Bruno Aranda
>
> Could you provide an complete example to reproduce the exception?
>
> Thanks,
> Aitozi
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Either bug in the flink, or out of date documentation ( flink 1.6 , cancel API rest endpoint )

2018-10-17 Thread Chesnay Schepler
The section you're looking at is the legacy documentation which only 
applies if the cluster is running in legacy mode.


You want to look at the "Dispatcher" section 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#dispatcher), 
which documents the PATCH operation.


On 17.10.2018 15:40, Barisa Obradovic wrote:

In the flink documenation, to cancel the job, request should be made to

DELETE request to /jobs/:jobid/cancel
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#job-cancellation

However, when I run this command, I get 404 back from the jobmanager.

After reading the source code, I can see that clojure client uses PATCH
operation instead, and doesn't include `cancel` suffix in the URL:
https://github.com/apache/flink/blob/ad79e6953cdbd9571f0605810b7d5f42749b38f4/flink-jepsen/test/jepsen/flink/client_test.clj#L48
Also, the job termination headers say PATCH,
https://github.com/apache/flink/blob/824a823ba4ca6f29075a187ed1a3a301089f1a99/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationHeaders.java#L59

Locally testing, I can see that
```
curl -k -v -X PATCH
'https://localhost:6125/jobs/088a37f04e867e5dd0cde558f31dd02b'
``` Gives me 202, for success.

So, does this mean that documentation is out of date? If so, I'm happy to
update it, just wanted to check here I didn't miss anything




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: IndexOutOfBoundsException on deserialization after updating to 1.6.1

2018-10-17 Thread aitozi
Hi,Bruno Aranda

Could you provide a complete example to reproduce the exception?

Thanks,
Aitozi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Either bug in the flink, or out of date documentation ( flink 1.6 , cancel API rest endpoint )

2018-10-17 Thread Barisa Obradovic
In the Flink documentation, to cancel the job, a request should be made as

DELETE request to /jobs/:jobid/cancel
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#job-cancellation

However, when I run this command, I get 404 back from the jobmanager.

After reading the source code, I can see that the Clojure client uses the PATCH
operation instead, and doesn't include the `cancel` suffix in the URL:
https://github.com/apache/flink/blob/ad79e6953cdbd9571f0605810b7d5f42749b38f4/flink-jepsen/test/jepsen/flink/client_test.clj#L48
Also, the job termination headers say PATCH,
https://github.com/apache/flink/blob/824a823ba4ca6f29075a187ed1a3a301089f1a99/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationHeaders.java#L59

Locally testing, I can see that 
```
curl -k -v -X PATCH
'https://localhost:6125/jobs/088a37f04e867e5dd0cde558f31dd02b'
``` Gives me 202, for success.

So, does this mean that the documentation is out of date? If so, I'm happy to
update it; I just wanted to check here that I didn't miss anything.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-17 Thread Ahmad Hassan
Hi Niels,

Can we distinguish within the apply function of a 'RichWindowFunction' whether it
was called due to an onElement trigger call or an onProcessingTime trigger call
of a custom Trigger?

Thanks!

On Wed, 17 Oct 2018 at 12:51, Niels van Kaam  wrote:

> Hi Zhen Li,
>
> You can control when a windowed stream emits data with "Triggers". See:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>
> Flink comes with a couple of default triggers, but you can also create
> your own by implementing
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html
> .
>
> Note that this does not change the window, but just causes the
> windowedstream to emit intermediate results to the next operator.
>
> Does this answer your question?
>
> Cheers,
> Niels
>
> On Wed, Oct 17, 2018 at 12:34 PM zhen li  wrote:
>
>> Hi all:
>> How can I trigger the window manually in  fold operator or
>> incremental aggregation? For example, when some conditions is meet,althouth
>> the count window or time window is not reach
>
>


Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Paul Lam
Hi Niels,

The link was broken; it should be
https://issues.apache.org/jira/browse/FLINK-2491.

A similar question was asked a few days ago.

Best,
Paul Lam


> On Oct 17, 2018, at 19:56, Niels van Kaam wrote:
> 
> Hi All,
> 
> Thanks for the responses, the finished source explains my issue then. I can 
> work around the problem by letting my sources negotiate a "final" checkpoint 
> via zookeeper.
> 
> @Paul, I think your answer was meant for the earlier question asked by Joshua?
> 
> Cheers,
> Niels
> 
> On Wed, Oct 17, 2018 at 11:15 AM Joshua Fan  > wrote:
> Hi Niels,
> 
> Probably not, an operator begins to do checkpoint until it gets all the 
> barriers from all the upstream sources, if one source can not send a barrier, 
> the downstream operator can not do checkpoint, FYI.
> 
> Yours sincerely
> Joshua
> 
> On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam  > wrote:
> Hi All,
> 
> I am debugging an issue where the periodic checkpointing has halted. I 
> noticed that one of the sources of my job has completed (finished). The other 
> sources and operators would however still be able to produce output.
> 
> Does anyone know if Flink's periodic checkpoints are supposed to continue 
> when one or more sources of a job are in the "FINISHED" state?
> 
> Cheers,
> Niels
> 



Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Niels van Kaam
Hi All,

Thanks for the responses, the finished source explains my issue then. I can
work around the problem by letting my sources negotiate a "final"
checkpoint via zookeeper.

@Paul, I think your answer was meant for the earlier question asked by
Joshua?

Cheers,
Niels

On Wed, Oct 17, 2018 at 11:15 AM Joshua Fan  wrote:

> Hi Niels,
>
> Probably not, an operator begins to do checkpoint until it gets all the
> barriers from all the upstream sources, if one source can not send a
> barrier, the downstream operator can not do checkpoint, FYI.
>
> Yours sincerely
> Joshua
>
> On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam  wrote:
>
>> Hi All,
>>
>> I am debugging an issue where the periodic checkpointing has halted. I
>> noticed that one of the sources of my job has completed (finished). The
>> other sources and operators would however still be able to produce output.
>>
>> Does anyone know if Flink's periodic checkpoints are supposed to continue
>> when one or more sources of a job are in the "FINISHED" state?
>>
>> Cheers,
>> Niels
>>
>>


Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-17 Thread Niels van Kaam
Hi Zhen Li,

You can control when a windowed stream emits data with "Triggers". See:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers

Flink comes with a couple of default triggers, but you can also create your
own by implementing
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html
.

Note that this does not change the window, but just causes the
windowed stream to emit intermediate results to the next operator.

Does this answer your question?

Cheers,
Niels

On Wed, Oct 17, 2018 at 12:34 PM zhen li  wrote:

> Hi all:
> How can I trigger the window manually in  fold operator or incremental
> aggregation? For example, when some conditions is meet,althouth the count
> window or time window is not reach
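(Not part of the original reply — a minimal, hedged sketch of the kind of custom trigger Niels describes, firing early whenever an element satisfies a user-supplied condition while still firing at the regular end of an event-time window. The class name and condition are illustrative only:)

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires the window early whenever an element matches the condition; otherwise
// the window still fires once at its normal end-of-window event time.
public class ConditionTrigger<T> extends Trigger<T, TimeWindow> {

    private final FilterFunction<T> condition;

    public ConditionTrigger(FilterFunction<T> condition) {
        this.condition = condition;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // keep the normal end-of-window firing in place
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // emit an intermediate result as soon as the condition is met
        return condition.filter(element) ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be plugged in after the keyed window definition, e.g. .window(...).trigger(new ConditionTrigger<>(value -> /* condition */ false)).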


IndexOutOfBoundsException on deserialization after updating to 1.6.1

2018-10-17 Thread Bruno Aranda
Hi,

We are trying to update from 1.3.2 to 1.6.1, but one of our jobs keeps
throwing an exception during deserialization:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:51)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:29)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.immutable.List.foreach(List.scala:392)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
at
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
at
org.apache.flink.streaming.api.scala.function.StatefulFunction$class.applyWithState(StatefulFunction.scala:41)
...

We haven't changed the code for the job (apart from updating to 1.6.1), so
we are not sure what may have changed. This is caused in a RichMapFunction
that extends a StatefulFunction. The state is a case class, and we create
its state serializer with the following code:

override protected lazy val stateSerializer: TypeSerializer[ClipinState] =
  
api.scala.createTypeInformation[MyCaseClass].createSerializer(getRuntimeContext.getExecutionConfig)

Any clues on what may be going on or where to look further? This was not an
issue on 1.3.2...

Thanks!

Bruno
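(Editorial aside, not from the thread: one way to pin down which field falls back to Kryo is to disable generic types while testing — Flink then fails fast, naming the offending type, instead of silently using the Kryo serializer:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoFallbackCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throws an exception wherever a type would be handled by the generic
        // Kryo serializer, which points at the field causing trouble.
        env.getConfig().disableGenericTypes();
    }
}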


Manual trigger the window in fold operator or incremental aggregation

2018-10-17 Thread zhen li
Hi all:
How can I trigger the window manually in a fold operator or incremental
aggregation? For example, when some condition is met, although the count
window or time window has not been reached.

Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Joshua Fan
Hi Niels,

Probably not. An operator does not begin a checkpoint until it gets the
barriers from all the upstream sources, so if one source cannot send a
barrier, the downstream operator cannot do the checkpoint, FYI.

Yours sincerely
Joshua

On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam  wrote:

> Hi All,
>
> I am debugging an issue where the periodic checkpointing has halted. I
> noticed that one of the sources of my job has completed (finished). The
> other sources and operators would however still be able to produce output.
>
> Does anyone know if Flink's periodic checkpoints are supposed to continue
> when one or more sources of a job are in the "FINISHED" state?
>
> Cheers,
> Niels
>
>


Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Fabian Hueske
Hi Niels,

Checkpoints can only complete if all sources are running.
That's because the checkpoint mechanism relies on injecting checkpoint
barriers into the stream at the sources.

Best, Fabian

On Wed, Oct 17, 2018 at 11:11, Paul Lam wrote:

> Hi Niels,
>
> Please see https://issues.apache.org/jira/browse/FLINK-249.
>
> Best,
> Paul Lam
>
> On Oct 17, 2018, at 16:58, Niels van Kaam wrote:
>
> Hi All,
>
> I am debugging an issue where the periodic checkpointing has halted. I
> noticed that one of the sources of my job has completed (finished). The
> other sources and operators would however still be able to produce output.
>
> Does anyone know if Flink's periodic checkpoints are supposed to continue
> when one or more sources of a job are in the "FINISHED" state?
>
> Cheers,
> Niels
>
>
>
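(Editor's sketch, not from the thread: given Fabian's point that checkpoints stop once a source task reaches FINISHED, one common workaround is to keep finite sources alive after they have emitted their data. Illustrative only:)

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A finite source that idles instead of returning from run(), so its task
// never reaches FINISHED and checkpoint barriers keep being injected.
public class NonFinishingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit the finite data set
        for (String value : new String[] {"a", "b", "c"}) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value);
            }
        }
        // then simply stay alive until the job is cancelled
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}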


Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Paul Lam
Hi Niels,

> Please see https://issues.apache.org/jira/browse/FLINK-249.

Best,
Paul Lam

> On Oct 17, 2018, at 16:58, Niels van Kaam wrote:
> 
> Hi All,
> 
> I am debugging an issue where the periodic checkpointing has halted. I 
> noticed that one of the sources of my job has completed (finished). The other 
> sources and operators would however still be able to produce output.
> 
> Does anyone know if Flink's periodic checkpoints are supposed to continue 
> when one or more sources of a job are in the "FINISHED" state?
> 
> Cheers,
> Niels
> 



Get nothing from TaskManager in UI

2018-10-17 Thread Joshua Fan
Hi,all

Frequently, for some clusters, there is no data from the Task Manager in the UI,
as the picture shows below.
[image: tm-hang.png]
But the cluster and the job are running well; it's just that no metrics can be
obtained. Is there anything we can do to improve this?

Thanks for your assistance.

Your sincerely
Joshua


Checkpointing when one of the sources has completed

2018-10-17 Thread Niels van Kaam
Hi All,

I am debugging an issue where the periodic checkpointing has halted. I
noticed that one of the sources of my job has completed (finished). The
other sources and operators would however still be able to produce output.

Does anyone know if Flink's periodic checkpoints are supposed to continue
when one or more sources of a job are in the "FINISHED" state?

Cheers,
Niels


Re: Need help to understand memory consumption

2018-10-17 Thread jpreisner
Hi all,

Thanks for the answers. I confirm I have streaming jobs.

To summarize:
- "When the job is cancelled, these managed memories will be released to the
MemoryManager but not recycled by gc, so you will see no changes in memory
consumption" does not apply here, because the MemoryManager functionality is
available only for batch jobs
- My issue could be resolved by storing state in an embedded RocksDB
instance on disk

Is that correct? If yes, does that mean that I have to purge old state in
RocksDB?

Thanks a lot !

Regards,
Julien.
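(A minimal sketch of what Fabian describes below — choosing the RocksDB state backend for a job so that state lives on disk rather than on the JVM heap. The path is a placeholder and the flink-statebackend-rocksdb dependency is assumed:)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keyed state is kept in an embedded RocksDB instance on local disk;
        // the URI is where completed checkpoints are written.
        env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoints"));
        // ... define and execute the streaming job as usual ...
    }
}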


Re: Need help to understand memory consumption

2018-10-17 Thread Fabian Hueske
Hi,

As was said before, managed memory (as described in the blog post [1]) is
only used for batch jobs.
By default, managed memory is only lazily allocated, i.e., when a batch job
is executed.

Streaming jobs maintain state in state backends. Flink provides state
backends that store the state on the JVM heap or in an embedded RocksDB
instance on disk.
The state backend can be chosen per job (the default backend stores state
on the JVM heap).

Best, Fabian

[1]
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html

On Wed, Oct 17, 2018 at 08:53, Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> The operators for stream jobs will not use memory management which is only
> for batch jobs as you said.
> I guess the initial feedback is for batch jobs from the description?
>
> --
> From: Paul Lam
> Sent: Wednesday, Oct 17, 2018, 14:35
> To: Zhijiang(wangzhijiang999)
> Cc: jpreisner; user
> Subject: Re: Need help to understand memory consumption
>
> Hi Zhijiang,
>
> Does the memory management apply to streaming jobs as well? A previous
> post[1] said that it can only be used in batch API, but I might miss some
> updates on that. Thank you!
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
>
> Best,
> Paul Lam
>
> On Oct 17, 2018, at 13:39, Zhijiang(wangzhijiang999) wrote:
>
> Hi Julien,
>
> Flink would manage the default 70% fraction of free memory in the TaskManager
> for caching data efficiently, just as you mentioned in this article:
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html.
> These managed memories are persistent residents referenced by the
> MemoryManager once allocated, so they will stay in the old region of the JVM
> and will not be recycled by gc. This way, we can avoid the costs of creating
> and recycling the objects repeatedly.
>
> The default parameter "taskmanager.memory.preallocate" is false, that
> means these managed memories will not be allocated during starting
> TaskManager. When the job is running, the related tasks would request these
> managed memories and then you will see the memory consumption is high. When
> the job is cancelled, these managed memories will be released to the
> MemoryManager but not recycled by gc, so you will see no changes in memory
> consumption. After you restart the TaskManager, the initial memory
> consumption is low because of lazy allocating
> via taskmanager.memory.preallocate=false.
>
> Best,
> Zhijiang
> --
> From: Paul Lam
> Sent: Wednesday, Oct 17, 2018, 12:31
> To: jpreisner
> Cc: user
> Subject: Re: Need help to understand memory consumption
>
>
> Hi Julien,
>
>
> AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM
> GC to release the memory.
>
> Best,
> Paul Lam
>
> > On Oct 12, 2018, at 14:29, jpreis...@free.fr wrote:
> >
> > Hi,
> >
> > My use case is :
>
> > - I use Flink 1.4.1 in standalone cluster with 5 VM (1 VM = 1 JobManager + 
> > 1 TaskManager)
>
> > - I run N jobs per days. N may vary (one day : N=20, another day : N=50, 
> > ...). All jobs are the same. They connect to Kafka topics and have two DB2 
> > connector.
>
> > - Depending on a special event, a job can self-restart via the command : 
> > bin/flink cancel 
> > - At the end of the day, I cancel all jobs
> > - Each VM is configured with 16Gb RAM
> > - Allocated memory configured for one taskmanager is 10Gb
> >
> > After several days, the memory saturates (we exceed 14Gb of used memory).
> >
>
> > I read the following posts but I did not succeed in understanding my 
> > problem :
> > -
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> > -
> http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> >
>
> > I did some tests on a machine (outside the cluster) with the top command 
> > and this is what I concluded (please see attached file - Flink_memory.PNG) :
> > - When a job is started and running, it consumes memory
> > - When a job is cancelled, a large part of the memory is still used
>
> > - When another job is started and running (after to have cancel the 
> > previous job), even more memory is consumed
> > - When I restart jobmanager and taskmanager, memory returns to normal
> >
> > Why when a job is canceled, the memory is not released?
> >
>
> > I added another attachment that represents the graph of a job - Graph.PNG.
>
> > If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> > triggers and windows, ...
> >
> > Thanks in advance,
> > Julien
>
>
>
>