Re: Need help to understand memory consumption

2018-10-16 Thread Zhijiang(wangzhijiang999)
The operators for streaming jobs will not use the managed memory, which, as you
said, is only for batch jobs. I guess the initial report is about batch jobs,
judging from the description?
--
From: Paul Lam
Sent: Wednesday, October 17, 2018 14:35
To: Zhijiang(wangzhijiang999)
Cc: jpreisner; user
Subject: Re: Need help to understand memory consumption

Hi Zhijiang,

Does the memory management apply to streaming jobs as well? A previous post[1]
said that it can only be used in the batch API, but I might have missed some
updates on that. Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam

On Oct 17, 2018, at 13:39, Zhijiang(wangzhijiang999) wrote:
Hi Julien,

Flink manages a fraction (70% by default) of the free memory in the TaskManager
for caching data efficiently, just as you mentioned in the article
"https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html".
Once allocated, these managed memory segments stay referenced by the
MemoryManager, so they remain resident in the old generation of the JVM and are
not recycled by GC. This way we avoid the cost of repeatedly creating and
recycling objects.

The parameter "taskmanager.memory.preallocate" defaults to false, which means
the managed memory is not allocated while the TaskManager starts up. When a job
is running, its tasks request this managed memory, and you will see the memory
consumption go up. When the job is cancelled, the managed memory is released
back to the MemoryManager but not recycled by GC, so you will see no change in
memory consumption. After you restart the TaskManager, the initial memory
consumption is low because of the lazy allocation with
taskmanager.memory.preallocate=false.
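
For reference, the settings involved here live in flink-conf.yaml; a minimal
sketch (the values are only illustrative, not a recommendation):

# fraction of free memory the MemoryManager may use as managed memory (default 0.7)
taskmanager.memory.fraction: 0.7
# allocate managed memory eagerly at TaskManager start-up instead of lazily on first use
taskmanager.memory.preallocate: false
# TaskManager JVM heap in MB (Flink 1.4 key); 10240 roughly matches the 10 GB mentioned in this thread
taskmanager.heap.mb: 10240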

Best,
Zhijiang
--
From: Paul Lam
Sent: Wednesday, October 17, 2018 12:31
To: jpreisner
Cc: user
Subject: Re: Need help to understand memory consumption


Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC
to release the memory.

Best,
Paul Lam

> On Oct 12, 2018, at 14:29, jpreis...@free.fr wrote:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1
> TaskManager)
> - I run N jobs per day. N may vary (one day: N=20, another day: N=50,
> ...). All jobs are the same. They connect to Kafka topics and have two DB2
> connectors.
> - Depending on a special event, a job can self-restart via the command : 
> bin/flink cancel 
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and 
> this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after cancelling the previous
> job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why is the memory not released when a job is cancelled?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> triggers and windows, ...
> 
> Thanks in advance,
> Julien





Re: Need help to understand memory consumption

2018-10-16 Thread Paul Lam
Hi Zhijiang,

Does the memory management apply to streaming jobs as well? A previous post[1]
said that it can only be used in the batch API, but I might have missed some
updates on that. Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam

> On Oct 17, 2018, at 13:39, Zhijiang(wangzhijiang999) wrote:
> 
> Hi Julien,
> 
> Flink manages a fraction (70% by default) of the free memory in the TaskManager
> for caching data efficiently, just as you mentioned in the article
> "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html".
> Once allocated, these managed memory segments stay referenced by the
> MemoryManager, so they remain resident in the old generation of the JVM and
> are not recycled by GC. This way we avoid the cost of repeatedly creating and
> recycling objects.
> 
> The parameter "taskmanager.memory.preallocate" defaults to false, which means
> the managed memory is not allocated while the TaskManager starts up. When a
> job is running, its tasks request this managed memory, and you will see the
> memory consumption go up. When the job is cancelled, the managed memory is
> released back to the MemoryManager but not recycled by GC, so you will see no
> change in memory consumption. After you restart the TaskManager, the initial
> memory consumption is low because of the lazy allocation with
> taskmanager.memory.preallocate=false.
> 
> Best,
> Zhijiang
> --
> From: Paul Lam
> Sent: Wednesday, October 17, 2018 12:31
> To: jpreisner
> Cc: user
> Subject: Re: Need help to understand memory consumption
> 
> 
> Hi Julien,
> 
> AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM
> GC to release the memory.
> 
> Best,
> Paul Lam
> 
> > On Oct 12, 2018, at 14:29, jpreis...@free.fr wrote:
> > 
> > Hi,
> > 
> > My use case is : 
> > - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager +
> > 1 TaskManager)
> > - I run N jobs per day. N may vary (one day: N=20, another day: N=50,
> > ...). All jobs are the same. They connect to Kafka topics and have two DB2
> > connectors.
> > - Depending on a special event, a job can self-restart via the command : 
> > bin/flink cancel 
> > - At the end of the day, I cancel all jobs
> > - Each VM is configured with 16Gb RAM
> > - Allocated memory configured for one taskmanager is 10Gb
> > 
> > After several days, the memory saturates (we exceed 14Gb of used memory).
> > 
> > I read the following posts but I did not succeed in understanding my 
> > problem :
> > - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> > - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> > 
> > I did some tests on a machine (outside the cluster) with the top command 
> > and this is what I concluded (please see attached file - Flink_memory.PNG) :
> > - When a job is started and running, it consumes memory
> > - When a job is cancelled, a large part of the memory is still used
> > - When another job is started and running (after cancelling the
> > previous job), even more memory is consumed
> > - When I restart jobmanager and taskmanager, memory returns to normal
> > 
> > Why is the memory not released when a job is cancelled?
> > 
> > I added another attachment that represents the graph of a job - Graph.PNG.
> > If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> > triggers and windows, ...
> > 
> > Thanks in advance,
> > Julien
> 



Re: Need help to understand memory consumption

2018-10-16 Thread Zhijiang(wangzhijiang999)
Hi Julien,

Flink manages a fraction (70% by default) of the free memory in the TaskManager
for caching data efficiently, just as you mentioned in the article
"https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html".
Once allocated, these managed memory segments stay referenced by the
MemoryManager, so they remain resident in the old generation of the JVM and are
not recycled by GC. This way we avoid the cost of repeatedly creating and
recycling objects.

The parameter "taskmanager.memory.preallocate" defaults to false, which means
the managed memory is not allocated while the TaskManager starts up. When a job
is running, its tasks request this managed memory, and you will see the memory
consumption go up. When the job is cancelled, the managed memory is released
back to the MemoryManager but not recycled by GC, so you will see no change in
memory consumption. After you restart the TaskManager, the initial memory
consumption is low because of the lazy allocation with
taskmanager.memory.preallocate=false.

Best,
Zhijiang
--
From: Paul Lam
Sent: Wednesday, October 17, 2018 12:31
To: jpreisner
Cc: user
Subject: Re: Need help to understand memory consumption


Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC
to release the memory.

Best,
Paul Lam

> On Oct 12, 2018, at 14:29, jpreis...@free.fr wrote:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1
> TaskManager)
> - I run N jobs per day. N may vary (one day: N=20, another day: N=50,
> ...). All jobs are the same. They connect to Kafka topics and have two DB2
> connectors.
> - Depending on a special event, a job can self-restart via the command : 
> bin/flink cancel 
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and 
> this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after cancelling the previous
> job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why is the memory not released when a job is cancelled?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> triggers and windows, ...
> 
> Thanks in advance,
> Julien



Re: Need help to understand memory consumption

2018-10-16 Thread Paul Lam


Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC
to release the memory.

Best,
Paul Lam

> On Oct 12, 2018, at 14:29, jpreis...@free.fr wrote:
> 
> Hi,
> 
> My use case is : 
> - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1
> TaskManager)
> - I run N jobs per day. N may vary (one day: N=20, another day: N=50,
> ...). All jobs are the same. They connect to Kafka topics and have two DB2
> connectors.
> - Depending on a special event, a job can self-restart via the command : 
> bin/flink cancel 
> - At the end of the day, I cancel all jobs
> - Each VM is configured with 16Gb RAM
> - Allocated memory configured for one taskmanager is 10Gb
> 
> After several days, the memory saturates (we exceed 14Gb of used memory).
> 
> I read the following posts but I did not succeed in understanding my problem :
> - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> 
> I did some tests on a machine (outside the cluster) with the top command and 
> this is what I concluded (please see attached file - Flink_memory.PNG) :
> - When a job is started and running, it consumes memory
> - When a job is cancelled, a large part of the memory is still used
> - When another job is started and running (after cancelling the previous
> job), even more memory is consumed
> - When I restart jobmanager and taskmanager, memory returns to normal
> 
> Why is the memory not released when a job is cancelled?
> 
> I added another attachment that represents the graph of a job - Graph.PNG.
> If it can be useful we use MapFunction, FlatMapFunction, FilterFunction, 
> triggers and windows, ...
> 
> Thanks in advance,
> Julien



Re: Window State is not being stored on check-pointing

2018-10-16 Thread sohimankotia
Hi Hequn,

I tried with the following:

Configuration conf = new Configuration();
conf.setString("state.checkpoints.dir", "file:///home/sohanvir/Desktop/flink/checkpoints2");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setParallelism(1);
env.enableCheckpointing(20 * SECOND);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("file:///home/sohanvir/Desktop/flink/checkpoints"));

The issue still persists.

Any idea?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Window State is not being stored on check-pointing

2018-10-16 Thread Hequn Cheng
Hi sohimankotia,

Have you enabled externalized checkpoints[1]?

> // enable externalized checkpoints which are retained after job
> cancellation
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
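
Note that a retained checkpoint is not picked up automatically when the job is
started again; when running against a cluster it has to be resumed explicitly,
roughly like this (the path and jar name are placeholders):

bin/flink run -s <path-to-retained-checkpoint-metadata> your-job.jar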


Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing

On Tue, Oct 16, 2018 at 11:47 PM sohimankotia 
wrote:

> Hi,
>
> I am using following in code :
>
> 1. flink 1.4
> 2. running example on IDE
> 3. Enabled Exactly once semantics
> 4. Window Aggregation
> 5. Checkpoint is enabled at 20 Sec
> 6. RocksDB as state backend
>
>
> Workflow :
>
> Kafka Source -> map -> keyBy -> Window(60 Sec) -> ApplyFunction ->
> Aggregated Record to Kafka
>
> Issues :
>
> I am having issues with checkpointing. If the job reads a few records from Kafka
> and the window still needs to be evaluated, even then the checkpoint is triggered
> and completes successfully.
> If I stop the job after 30 seconds (by which time the Kafka checkpoint is completed) and
> restart my job, all in-flight messages for the window are lost. Flink
> is not restoring them from the state backend.
>
> Attaching code .
>
>
> CheckpointTest1.java
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/CheckpointTest1.java>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Custom Trigger + SQL Pattern

2018-10-16 Thread Hequn Cheng
Hi Shahar,

A table function takes a single row but can output multiple rows. You can
split the row based on the "last" event. The code looks like:

> val sessionResult =
>   "SELECT " +
>   "  lastUDAF(line) AS lastEvents " +
>   "FROM MyTable " +
>   "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
> val result =
>   s"SELECT lastEvent FROM ($sessionResult), LATERAL TABLE(splitUDTF(lastEvents)) as T(lastEvent)"


The lastUDAF is used to process data in a session window. As your last event is
based on either the window end or a special "last" event, the lastUDAF outputs
multiple last events.
After the window, we apply splitUDTF to split the lastEvents into single
events.
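
For illustration, a minimal sketch of what such a splitUDTF could look like in
Java, assuming lastUDAF emits its collected last events as one comma-separated
string (the element type and the split logic depend on your actual aggregate
output):

import org.apache.flink.table.functions.TableFunction;

// Emits one output row per "last" event contained in the aggregated value.
public class SplitUDTF extends TableFunction<String> {
    public void eval(String lastEvents) {
        if (lastEvents == null) {
            return;
        }
        for (String event : lastEvents.split(",")) {
            collect(event);
        }
    }
}

// registered once on the table environment:
// tableEnv.registerFunction("splitUDTF", new SplitUDTF());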

Best, Hequn


On Wed, Oct 17, 2018 at 12:38 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> I'm wondering how that works; it seems that a table function still
> takes a single row's values as input, am I wrong (or at least that is
> what the examples show)?
> What would the SQL look like?
>
> On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng  wrote:
>
>> Hi shkob1,
>>
>> > while one is time(session inactivity) the other is based on a specific
>> event marked as a "last" event.
>> How about using a session window and a udtf[1] to solve the problem? The
>> session window may output multiple `last` elements. However, we can use a udtf
>> to split them into single ones. Thus, we can use SQL for the whole job.
>>
>> Best, Hequn.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
>>
>> On Sat, Oct 13, 2018 at 2:28 AM shkob1 
>> wrote:
>>
>>> Hey!
>>>
>>> I have a use case in which im grouping a stream by session id - so far
>>> pretty standard, note that i need to do it through SQL and not by the
>>> table
>>> api.
>>> In my use case i have 2 trigger conditions though - while one is time
>>> (session inactivity) the other is based on a specific event marked as a
>>> "last" event.
>>> AFAIK SQL does not support custom triggers - so what i end up doing is
>>> doing
>>> group by in the SQL - then converting the result to a stream along with a
>>> boolean field that marks whether at least one of the events was the end
>>> event - then adding my custom trigger on top of it.
>>> It looks something like this:
>>>
>>>  Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent),
>>> sessionId, count(*) FROM source Group By sessionId");
>>> tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
>>> .filter(tuple -> tuple.f0)
>>> .map(...)
>>> .returns(...)
>>> .keyBy("sessionId")
>>> .window(EventTimeSessionWindows.withGap(Time.hours(4)))
>>> .trigger(new SessionEndedByTimeOrEndTrigger())
>>> .process(...take last element from the group by result..)
>>>
>>> This seems like a weird workaround, doesn't it? My window is basically on
>>> the SQL result rather than on the source stream. Ideally I would keyBy the
>>> sessionId before running the SQL but then a) would I need to register a
>>> table per key? b) would i be able to use the custom trigger per window?
>>>
>>> basically i want to group by session id and have a window for every
>>> session
>>> that supports both time and custom trigger. Assuming i need to use SQL
>>> (reason is the query is dynamically loaded), is there a better solution
>>> for
>>> it?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: When does Trigger.clear() get called?

2018-10-16 Thread Hequn Cheng
Hi Andrew,

You should call it manually, as the global window does not have a natural
end.
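
For illustration, a rough sketch of a trigger on a GlobalWindow that fires and
purges either on inactivity or on an element flagged as the last one, and that
cleans up its own timer state via clear(); the MyEvent type and its isLast()
method are placeholders for your own event class:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class LastEventOrTimeoutTrigger extends Trigger<MyEvent, GlobalWindow> {

    private final long timeoutMillis;
    private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("inactivity-timer", Long.class);

    public LastEventOrTimeoutTrigger(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(MyEvent element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        if (element.isLast()) {
            // GlobalWindow never ends, so clean up the trigger state ourselves.
            clear(window, ctx);
            return TriggerResult.FIRE_AND_PURGE;
        }
        // (Re)register the inactivity timer for this key.
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() != null) {
            ctx.deleteProcessingTimeTimer(timer.value());
        }
        long nextTimeout = ctx.getCurrentProcessingTime() + timeoutMillis;
        ctx.registerProcessingTimeTimer(nextTimeout);
        timer.update(nextTimeout);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        // Inactivity timeout reached: emit the window content and drop it.
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() != null) {
            ctx.deleteProcessingTimeTimer(timer.value());
        }
        timer.clear();
    }
}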

Best, Hequn

On Wed, Oct 17, 2018 at 2:47 AM Andrew Danks  wrote:

> Hi Fabian & Hequn,
>
> Thank you for your responses. I am just responding now as I was out of
> office for the last few days
>
> You mentioned that clear() is called when the time exceeds the window’s
> end timestamp. For my application I am using a GlobalWindow on a keyed
> stream -- would clear() get called at all in this case or should I be
> calling it manually?
>
>
> Andrew
>
> On Oct 12, 2018, at 12:48 AM, Fabian Hueske  wrote:
>
> Hi Andrew,
>
> The PURGE action of a window removes the window state (i.e., the collected
> events or computed aggregate), but the window meta data including the
> Trigger remains.
> The Trigger.clear() method is called when the window is completely (i.e.,
> all meta data) discarded. This happens when the time (wall-clock time for
> processing time or watermark for event time windows) exceeds the window's
> end timestamp.
>
> Best, Fabian
>
> On Fri, Oct 12, 2018 at 05:25, Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Andrew,
>>
>> Do you use CountWindow? You can switch to TimeWindow to have a test.
>> I'm not quite familiar with windows. I checked the code and found that
>> clear() is called only when the timer is triggered, i.e., at the end of the
>> time window.
>> Hope this helps.
>>
>> Best, Hequn
>>
>> On Fri, Oct 12, 2018 at 6:23 AM Andrew Danks  wrote:
>>
>>> Hello,
>>>
>>> I see that the clear() function is implemented for various types of
>>> Triggers in the Flink API. For example:
>>>
>>> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
>>>
>>> I am working on a custom Trigger for my application and have implemented
>>> clear() in a similar way.
>>>
>>> However, having put a breakpoint in this function it doesn’t seem to get
>>> called when I expect. The source code says that it is called "when a window is
>>> purged"[1], but when my Trigger emits a PURGE this function never seems to
>>> get called. I am on Flink 1.3.
>>>
>>> Hoping someone can shed more light on the purpose of clear() and
>>> how/when it gets called
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
>>>
>>>
>


Re: When does Trigger.clear() get called?

2018-10-16 Thread Andrew Danks
Hi Fabian & Hequn,

Thank you for your responses. I am just responding now as I was out of office 
for the last few days

You mentioned that clear() is called when the time exceeds the window’s end 
timestamp. For my application I am using a GlobalWindow on a keyed stream -- 
would clear() get called at all in this case or should I be calling it manually?


Andrew

> On Oct 12, 2018, at 12:48 AM, Fabian Hueske  wrote:
> 
> Hi Andrew,
> 
> The PURGE action of a window removes the window state (i.e., the collected
> events or computed aggregate), but the window meta data including the Trigger
> remains.
> The Trigger.clear() method is called when the window is completely (i.e.,
> all meta data) discarded. This happens when the time (wall-clock time for
> processing time or watermark for event time windows) exceeds the window's end
> timestamp.
> 
> Best, Fabian
> 
> On Fri, Oct 12, 2018 at 05:25, Hequn Cheng wrote:
> Hi Andrew,
> 
> Do you use CountWindow? You can switch to TimeWindow to have a test.
> I'm not quite familiar with windows. I checked the code and found that clear()
> is called only when the timer is triggered, i.e., at the end of the time window.
> Hope this helps.
> 
> Best, Hequn
> 
> On Fri, Oct 12, 2018 at 6:23 AM Andrew Danks wrote:
> Hello,
> 
> I see that the clear() function is implemented for various types of Triggers 
> in the Flink API. For example:
> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
>  
> 
> 
> I am working on a custom Trigger for my application and have implemented 
> clear() in a similar way.
> 
> However, having put a breakpoint in this function it doesn’t seem to get 
> called when I expect. The source code says that it is called "when a window is
> purged"[1], but when my Trigger emits a PURGE this function never seems to get
> called. I am on Flink 1.3.
> 
> Hoping someone can shed more light on the purpose of clear() and how/when it 
> gets called
> 
> Thanks!
> Andrew
> 
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
>  
> 
> 



Re: Custom Trigger + SQL Pattern

2018-10-16 Thread Shahar Cizer Kobrinsky
I'm wondering how that works; it seems that a table function still takes
a single row's values as input, am I wrong (or at least that is what the
examples show)?
What would the SQL look like?

On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng  wrote:

> Hi shkob1,
>
> > while one is time(session inactivity) the other is based on a specific
> event marked as a "last" event.
> How about using a session window and a udtf[1] to solve the problem? The
> session window may output multiple `last` elements. However, we can use a udtf
> to split them into single ones. Thus, we can use SQL for the whole job.
>
> Best, Hequn.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
>
> On Sat, Oct 13, 2018 at 2:28 AM shkob1  wrote:
>
>> Hey!
>>
>> I have a use case in which im grouping a stream by session id - so far
>> pretty standard, note that i need to do it through SQL and not by the
>> table
>> api.
>> In my use case i have 2 trigger conditions though - while one is time
>> (session inactivity) the other is based on a specific event marked as a
>> "last" event.
>> AFAIK SQL does not support custom triggers - so what i end up doing is
>> doing
>> group by in the SQL - then converting the result to a stream along with a
>> boolean field that marks whether at least one of the events was the end
>> event - then adding my custom trigger on top of it.
>> It looks something like this:
>>
>>  Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent),
>> sessionId, count(*) FROM source Group By sessionId");
>> tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
>> .filter(tuple -> tuple.f0)
>> .map(...)
>> .returns(...)
>> .keyBy("sessionId")
>> .window(EventTimeSessionWindows.withGap(Time.hours(4)))
>> .trigger(new SessionEndedByTimeOrEndTrigger())
>> .process(...take last element from the group by result..)
>>
>> This seems like a weird workaround, doesn't it? My window is basically on
>> the SQL result rather than on the source stream. Ideally I would keyBy the
>> sessionId before running the SQL but then a) would I need to register a
>> table per key? b) would i be able to use the custom trigger per window?
>>
>> basically i want to group by session id and have a window for every
>> session
>> that supports both time and custom trigger. Assuming i need to use SQL
>> (reason is the query is dynamically loaded), is there a better solution
>> for
>> it?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Fire and Purge with Idle State

2018-10-16 Thread Shahar Cizer Kobrinsky
Thanks!

On Fri, Oct 12, 2018 at 9:29 PM Hequn Cheng  wrote:

> Hi shkob1,
>
> Currently, the idle state retention time is only used for unbounded
> operators in sql/table-api. The unbounded operators include non-window
> group by, non-window join, unbounded over, etc. The retention time affects
> neither sql/table-api window operators nor DataStream operators.
>
> Best, Hequn
>
> On Sat, Oct 13, 2018 at 2:40 AM shkob1  wrote:
>
>> Hey
>>
>> Say I'm aggregating an event stream by sessionId in SQL and I'm emitting the
>> results once the session is "over"; I guess I should be using Fire and Purge
>> - I don't expect to need the session data once it's over. How should I treat the
>> idle state retention time - is it needed at all if I'm using purge? Will it
>> become relevant only if a session is both never-ending AND never has more
>> records?
>>
>> Thanks!
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
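
As a side note on the retention setting discussed above: for unbounded queries
it can be set on the query config that is already passed to toRetractStream; a
minimal sketch (the retention values are only illustrative):

// org.apache.flink.table.api.StreamQueryConfig, org.apache.flink.api.common.time.Time
StreamQueryConfig qConfig = tableEnv.queryConfig();
// state for a key is cleaned up after being idle for between 12 and 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
tableEnv.toRetractStream(result, Row.class, qConfig);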


Window State is not being stored on check-pointing

2018-10-16 Thread sohimankotia
Hi,

I am using following in code :

1. flink 1.4 
2. running example on IDE
3. Enabled Exactly once semantics
4. Window Aggregation
5. Checkpoint is enabled at 20 Sec
6. RocksDB as state backend


Workflow :

Kafka Source -> map -> keyBy -> Window(60 Sec) -> ApplyFunction ->
Aggregated Record to Kafka 

Issues :

I am having issues with checkpointing. If the job reads a few records from Kafka
and the window still needs to be evaluated, even then the checkpoint is triggered
and completes successfully.
If I stop the job after 30 seconds (by which time the Kafka checkpoint is completed) and
restart my job, all in-flight messages for the window are lost. Flink
is not restoring them from the state backend.

Attaching code .


CheckpointTest1.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/CheckpointTest1.java>



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Report failed job submission

2018-10-16 Thread Flavio Pompermaier
Hi to all,
what is the correct way to report a job submission failure back to the user
in Flink?
If everything is OK the job run API returns the job id, but what if there
are errors in parameter validation or some other problem?
What is the right way to report the job error details back to the user
(apart from throwing an Exception)?

Best,
Flavio


Re: Flink Table API and table name

2018-10-16 Thread Flavio Pompermaier
Done: https://issues.apache.org/jira/browse/FLINK-10562

On Tue, Oct 16, 2018 at 11:12 AM Timo Walther  wrote:

> Hi Flavio,
>
> yes you are right, I don't see a reason why we should not support such
> table names. Feel free to open an issue for it.
>
> Regards,
> Timo
>
>
> On 16.10.2018 at 10:56, miki haiat wrote:
>
> I'm not sure if it will solve this issue, but can you try to register
> your catalog [1]?
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html#register-an-external-catalog
>
>
> On Tue, Oct 16, 2018 at 11:40 AM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> in my job I'm trying to read a dataset whose name/id starts with a number.
>> It seems that when using the Table API to read that dataset, if the name
>> starts with a number it is a problem..am I wrong?  I can't find anything
>> about table id constraints on the documentation and it seems that it's not
>> possible to escape the name..for the moment I've added a 'T' in front of
>> the name in order to have something  like T1 or T2 but it looks like a
>> workaround to me..
>>
>> Best,
>> Flavio
>>
>
>


Re: Flink Table API and table name

2018-10-16 Thread Timo Walther

Hi Flavio,

yes you are right, I don't see a reason why we should not support such 
table names. Feel free to open an issue for it.


Regards,
Timo


On 16.10.2018 at 10:56, miki haiat wrote:
I'm not sure if it will solve this issue, but can you try to register
your catalog [1]?


1. https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html#register-an-external-catalog


On Tue, Oct 16, 2018 at 11:40 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:


Hi to all,
in my job I'm trying to read a dataset whose name/id starts with a
number.
It seems that when using the Table API to read that dataset, if
the name starts with a number it is a problem..am I wrong?  I
can't find anything about table id constraints on the
documentation and it seems that it's not possible to escape the
name..for the moment I've added a 'T' in front of the name in
order to have something like T1 or T2 but it looks like a
workaround to me..

Best,
Flavio





Re: Flink Table API and table name

2018-10-16 Thread miki haiat
I'm not sure if it will solve this issue, but can you try to register
your catalog [1]?

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html#register-an-external-catalog


On Tue, Oct 16, 2018 at 11:40 AM Flavio Pompermaier 
wrote:

> Hi to all,
> in my job I'm trying to read a dataset whose name/id starts with a number.
> It seems that when using the Table API to read that dataset, if the name
> starts with a number it is a problem..am I wrong?  I can't find anything
> about table id constraints on the documentation and it seems that it's not
> possible to escape the name..for the moment I've added a 'T' in front of
> the name in order to have something  like T1 or T2 but it looks like a
> workaround to me..
>
> Best,
> Flavio
>


Flink Table API and table name

2018-10-16 Thread Flavio Pompermaier
Hi to all,
in my job I'm trying to read a dataset whose name/id starts with a number.
It seems that when using the Table API to read that dataset, if the name
starts with a number it is a problem..am I wrong?  I can't find anything
about table id constraints on the documentation and it seems that it's not
possible to escape the name..for the moment I've added a 'T' in front of
the name in order to have something  like T1 or T2 but it looks like a
workaround to me..

Best,
Flavio


Re: Can't start taskmanager in Minikube

2018-10-16 Thread miki haiat
Did you execute this command?

> Note: If using MiniKube please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on'
> before deploying a Flink cluster. Otherwise Flink components are not able to
> self reference themselves through a Kubernetes service.


On Tue, Oct 16, 2018 at 10:01 AM zpp  wrote:

> I followed the Doc(
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html#session-cluster-resource-definitions)
>  to
> run flink on kubernetes,
> but there is an exception(java.net.UnknownHostException: flink-jobmanager:
> Temporary failure in name resolution).
> I use a newly installed Minikube on Windows. How can I solve this problem?
> Thanks a lot.
>
>
>
>
>


Can't start taskmanager in Minikube

2018-10-16 Thread zpp
I followed the Doc( 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html#session-cluster-resource-definitions)
 to run flink on kubernetes,
but there is an exception(java.net.UnknownHostException: flink-jobmanager: 
Temporary failure in name resolution).
I use a newly installed Minikube on Windows. How can I solve this problem?
Thanks a lot.