Re: [flink-cep] Flink CEP support for the group by operator

2018-11-25 Thread Piotr Nowojski
Hey,

As a matter of fact, you do not need Flink's CEP library to run the same 
query. The same functionality can be achieved by a simple tumbling window with a 
"median" aggregate ("median" you would have to implement yourself).

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
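
For illustration, a minimal sketch of that approach could look like the following. It assumes an event-time stream `ticks` of a hypothetical StockTick POJO with public `symbol`, `tickDataFeed` and `volume` fields, with timestamps and watermarks already assigned, and the usual DataStream/window imports; none of this is from the thread itself:

DataStream<Double> medianVolumes = ticks
        .keyBy("symbol", "tickDataFeed")
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .aggregate(new AggregateFunction<StockTick, List<Long>, Double>() {
            @Override
            public List<Long> createAccumulator() {
                return new ArrayList<>();
            }

            @Override
            public List<Long> add(StockTick tick, List<Long> acc) {
                acc.add(tick.volume);   // collect all volumes seen in the window
                return acc;
            }

            @Override
            public Double getResult(List<Long> acc) {
                Collections.sort(acc);
                int n = acc.size();
                // median of the collected volumes
                return n % 2 == 1
                        ? (double) acc.get(n / 2)
                        : (acc.get(n / 2 - 1) + acc.get(n / 2)) / 2.0;
            }

            @Override
            public List<Long> merge(List<Long> a, List<Long> b) {
                a.addAll(b);
                return a;
            }
        });

If you also want the grouping keys (symbol, tickDataFeed) in the output, the variant of aggregate(...) that additionally takes a window function gives you access to the key alongside the aggregated value.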

Piotrek

> On 23 Nov 2018, at 16:32, Piotr Nowojski  wrote:
> 
> Hi,
> 
> Yes, sure. Just use CEP on top of KeyedStream. Take a look at `keyBy`:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
> Piotrek 
> 
>> On 23 Nov 2018, at 16:04, Spico Florin  wrote:
>> 
>> Hello!
>> 
>> I'm using Flink 1.4.2 and I would like to use a group by operator based on a 
>> value of my event stream. The functionality that I would like to achieve is 
>> similar to the following Esper EPL
>> (excerpt: http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating)
>> 
>> 
>> select symbol, tickDataFeed, median(volume)
>> from StockTickEvent.win:time(30 sec)
>> group by symbol, tickDataFeed
>> 
>> So, does Flink CEP support such group-by functionality?
>> 
>> If yes, what is the syntax?
>> 
>> 
>> 
>> I look forward to your answers.
>> 
>> 
>> 
>> Best regards,
>> 
>>  Florin 
>> 
>> 
>> 
>> 
>> 
>> (excerpt: http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating)
>> "You can list more then one expression in the group by clause to nest 
>> groups. Once the sets are established with group by the aggregation 
>> functions are applied. This statement posts the median volume for all stock 
>> tick events in the last 30 seconds per symbol and tick data feed. Esper 
>> posts one event for each group to statement listeners:
>> In the statement above the event properties in the select list (symbol, 
>> tickDataFeed) are also listed in the group by clause. The statement thus 
>> follows the SQL standard which prescribes that non-aggregated event 
>> properties in the select list must match the group by columns."
>> 
> 



Window is not working in streaming

2018-11-25 Thread Abhijeet Kumar
Hello Team,

I'm new to Flink and I don't know why the window is not working:

DataStream<Tuple7<...>> window2 = stream2
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple7<...>>() {
            public long extractAscendingTimestamp(Tuple7<...> t) {
                return t.f6;
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
        .reduce(new Reducer2());

window2.print();

So, this is the code that I've written for a 2-second window, and I'm using event 
time for processing the window.

My data format is like:

13,234234,34,32445,3423fsdf,201919

The last value in the CSV is the time (MMDDHHmmss).

Definition of Reducer2:

public static final class Reducer2
        implements ReduceFunction<Tuple7<...>> {
    public Tuple7<...> reduce(
            Tuple7<...> t_new,
            Tuple7<...> t_old) {
        return new Tuple7<>(t_new.f0, t_new.f1, t_new.f2,
                t_new.f3, t_new.f4, t_new.f5, t_new.f6);
    }
}

My understanding is that when data like the above sample comes in, the first 
window is created. When the timestamp is 201920, this will also be 
added to the window. Finally, if something like 201922 comes, the old 
window should be closed and this code should print the result on the console. 
The problem is that it's not working that way. Maybe my understanding is not 
correct; please correct me if I'm wrong.

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !
abhijeet.ku...@sentienz.com | www.sentienz.com | Bengaluru




Re: understanding kafka connector - rebalance

2018-11-25 Thread Taher Koitawala
Hi Avi,
  No, rebalance does not change the number of Kafka partitions.
Let's say you have 6 Kafka partitions and your Flink parallelism is 8; in
this case, using rebalance will send records to all downstream operators in
a round-robin fashion.
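
A minimal sketch of that setup is below. The topic name, group id, the 0.11 connector and the placeholder map body are assumptions, not from the thread; `env` is the StreamExecutionEnvironment and the usual Kafka connector imports are omitted:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // assumed address
props.setProperty("group.id", "example-group");              // assumed group id

DataStream<String> messageStream = env
        .addSource(new FlinkKafkaConsumer011<>("example-topic",
                new SimpleStringSchema(), props))
        .setParallelism(6);                  // e.g. one source subtask per Kafka partition

messageStream
        .rebalance()                         // round-robin records to every downstream subtask
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;                // heavy processing would go here
            }
        })
        .setParallelism(8);                  // all 8 map subtasks now receive data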

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


On Mon, Nov 26, 2018 at 11:33 AM Avi Levi  wrote:

> Hi,
> Looking at this example,
> wouldn't doing the "rebalance" operation (e.g. messageStream.rebalance().map(...))
> on a heavily loaded stream slow the stream down? Does the rebalancing
> occur only when there is a partition change?
> It says that "the rebalance call is causing a repartitioning of the data
> so that all machines". Is it actually changing the number of partitions of
> the topic to match the number of Flink operators?
>
> Avi
>


understanding kafka connector - rebalance

2018-11-25 Thread Avi Levi
Hi,
Looking at this example,
wouldn't doing the "rebalance" operation (e.g. messageStream.rebalance().map(...))
on a heavily loaded stream slow the stream down? Does the rebalancing
occur only when there is a partition change?
It says that "the rebalance call is causing a repartitioning of the data so
that all machines". Is it actually changing the number of partitions of the
topic to match the number of Flink operators?

Avi


Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-25 Thread zhijiang
I think it is probably related to RocksDB memory usage if you have not seen an 
OutOfMemory issue before.

There is already a JIRA ticket [1] for fixing this issue, and you can 
watch it for updates. :)

[1] https://issues.apache.org/jira/browse/FLINK-10884

Best,
Zhijiang
--
From: Gagan Agrawal 
Sent: Saturday, November 24, 2018, 14:14
To: user 
Subject: Flink job failing due to "Container is running beyond physical memory 
limits" error.

Hi,
I am running a Flink job on YARN which ran fine so far (4-5 days) and has now 
started failing with the following errors.

2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_1542008917197_0038_01_06 because: Container 
[pid=18380,containerID=container_1542008917197_0038_01_06] is running 
beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory 
used; 5.0 GB of 15 GB virtual memory used. Killing container.

This is a simple job where we read 2 Avro streams from Kafka, apply a custom 
UDF after creating a keyed stream from the union of those 2 streams, and write 
the output back to Kafka. The UDF internally uses MapState with the RocksDB 
backend. Currently the checkpoint size is around 300 GB and we are running this 
with 10 task managers with 3 GB memory each. I have also set 
"containerized.heap-cutoff-ratio: 0.5" but am still facing the same issue. The 
Flink version is 1.6.2.

Here is the flink command
./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar

I want to understand what the typical reasons for this issue are. Also, why would 
Flink consume more memory than allocated, given that JVM memory is fixed and will not 
grow beyond the max heap? Can this be something related to RocksDB, where it may be 
consuming memory outside the heap and hence exceeding the defined limits? I didn't 
see this issue when the checkpoint size was small (<50 GB), but ever since we 
reached 300 GB, this issue occurs frequently. I can try increasing 
memory, but I am still interested in knowing the typical reasons for this 
error if JVM heap memory cannot grow beyond the defined limit.

Gagan





Re: Flink Exception - assigned slot container was removed

2018-11-25 Thread 罗齐
Hi,

It looks like some of your slots were freed during the job execution (possibly 
due to being idle for too long). AFAIK the exception is thrown when a pending slot 
request is removed. You can try increasing "slot.idle.timeout" to mitigate 
this issue (default is 5, try 360 or higher).
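
(For reference, these keys go into flink-conf.yaml. The values below are only illustrative assumptions, not recommendations:)

slot.idle.timeout: 3600000       # milliseconds, illustrative value only
slot.request.timeout: 600000     # milliseconds, illustrative value only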

Regards,
Qi

> On Nov 26, 2018, at 7:36 AM, Flink Developer  
> wrote:
> 
> Hi, I have a Flink application sourcing from a topic in Kafka (400 
> partitions) and sinking to S3 using BucketingSink and using RocksDB for 
> checkpointing every 2 mins. The Flink app runs with parallelism 400 so that 
> each worker handles a partition. This is using Flink 1.5.2. The Flink cluster 
> uses 10 task managers with 40 slots each.
> 
> After running for a few days straight, it encounters a Flink exception:
> org.apache.flink.util.FlinkException: The assigned slot 
> container_1234567_0003_01_09_1 was removed.
> 
> This causes the Flink job to fail. It is odd to me. I am unsure what causes 
> this. Also, during this time, I see some checkpoints stating "checkpoint was 
> declined (tasks not ready)". At this point, the job is unable to recover and 
> fails. Does this happen if a slot or worker is not doing processing for X 
> amount of time? Would I need to increase the Flink config properties for the 
> following when creating the Flink cluster in YARN?
> 
> slot.idle.timeout
> slot.request.timeout
> web.timeout
> heartbeat.interval
> heartbeat.timeout
> 
> Any help would be greatly appreciated.
> 



Re: Reset kafka offsets to latest on restart

2018-11-25 Thread Tony Wei
Hi Vishal,

Sorry, I didn't notice this requirement, but I can't come up with another
solution unless you disable checkpointing or customize your own
Kafka source function. For the first case, you may have to give up storing
state in Flink's state backend. For the second one, write
your own implementation with the Kafka client and always seek to the latest
position when the job begins to run.
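
(For reference, the start-position configuration that Rong pointed to further down the thread looks roughly like the sketch below; as noted above, offsets restored from a checkpoint still take precedence, so it does not solve this case on its own. The topic name, group id and 0.11 connector are assumptions, and `env` is the StreamExecutionEnvironment.)

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // assumed
props.setProperty("group.id", "example-group");              // assumed

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), props);
consumer.setStartFromLatest();   // only applies when NOT restoring from a checkpoint/savepoint

DataStream<String> stream = env.addSource(consumer);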

Best,
Tony Wei

On Sun, Nov 25, 2018 at 4:51 AM Vishal Santoshi  wrote:

> I think I can set a new uuid, but it seems `allowNonRestoredState` is a
> CLI hint. I need the "automatic" restart on failure to use the new uuid.
> Our use case has no use of data on Kafka that is not current.
>
> On Thu, Nov 22, 2018 at 11:16 PM Tony Wei  wrote:
>
>> Hi Vishal,
>>
>> AFAIK, the current behavior of the Kafka source is to always use the checkpoint
>> state as the start position, ignoring other configuration.
>> A workaround solution I can come up with is to set a new uuid on your
>> Kafka source and restore your job with `allowNonRestoredState`.
>> Therefore, you can use the way that Rong provided to set the initial
>> start position.
>>
>> cc. Gordon who know more about the details of kafka source.
>>
>> Best,
>> Tony Wei
>>
>> On Thu, Nov 22, 2018 at 8:23 AM Rong Rong  wrote:
>>
>>> Hi Vishal,
>>>
>>> You can probably try using similar offset configuration as a service
>>> consumer.
>>> Maybe this will be useful to look at [1]
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> On Wed, Nov 21, 2018 at 1:32 PM Jamie Grier  wrote:
>>>
 Hi Vishal,

 No, there is no way to do this currently.


 On Wed, Nov 21, 2018 at 10:22 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Any one ?
>
> On Tue, Nov 20, 2018 at 12:48 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Is it possible to have checkpointing but reset the kafka offsets to
>> latest on restart on failure ?
>>
>


Flink Exception - assigned slot container was removed

2018-11-25 Thread Flink Developer
Hi, I have a Flink application sourcing from a topic in Kafka (400 partitions) 
and sinking to S3 using BucketingSink and using RocksDB for checkpointing every 
2 mins. The Flink app runs with parallelism 400 so that each worker handles a 
partition. This is using Flink 1.5.2. The Flink cluster uses 10 task managers 
with 40 slots each.

After running for a few days straight, it encounters a Flink exception:
org.apache.flink.util.FlinkException: The assigned slot 
container_1234567_0003_01_09_1 was removed.

This causes the Flink job to fail. It is odd to me. I am unsure what causes 
this. Also, during this time, I see some checkpoints stating "checkpoint was 
declined (tasks not ready)". At this point, the job is unable to recover and 
fails. Does this happen if a slot or worker is not doing processing for X 
amount of time? Would I need to increase the Flink config properties for the 
following when creating the Flink cluster in YARN?

slot.idle.timeout
slot.request.timeout
web.timeout
heartbeat.interval
heartbeat.timeout

Any help would be greatly appreciated.

Re: where can I see logs from code

2018-11-25 Thread Avi Levi
Hi Miki,
Thanks for your reply. However, I do not see the logs written from the code
(I do use Logback).
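
(For context, the logging written from the code follows the usual SLF4J pattern inside a user function; a minimal sketch, assuming SLF4J with Logback on the classpath and a hypothetical MyMapper function:)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMapper extends RichMapFunction<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(MyMapper.class);

    @Override
    public String map(String value) {
        LOG.info("Processing value: {}", value);   // expected to show up in the TaskManager log
        return value;
    }
}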

On Sun, Nov 25, 2018 at 12:30 PM miki haiat  wrote:

>
> You can see the logs in the web UI.
> If you click on the Task Managers tab, you can find the logs:
>
> http://SERVERADD/#/taskmanager/TM_ID/log
>
>
>
>
>
> On Sun, Nov 25, 2018 at 12:11 PM Avi Levi  wrote:
>
>> Hi,
>> Where can I see the logs written by the app code (i.e. by the app
>> developer)?
>>
>> BR
>> Avi
>>
>


Flink with YARN: Error while calling YARN Node Manager to stop container

2018-11-25 Thread Marke Builder
Hi,

I get the following WARN and exception in the JobManager logs (the job
continues).
Why do I get this exception and what do I have to consider?
I have a Flink streaming job which writes data via OutputFormat to HBase.

2018-11-25 12:08:34,721 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-11-25 12:09:28,457 WARN  org.apache.flink.yarn.YarnResourceManager - Error while calling YARN Node Manager to stop container
org.apache.hadoop.yarn.exceptions.YarnException: Container container_1541828054499_0441_01_06 is not handled by this NodeManager
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
    at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
    at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainerInternal(NMClientImpl.java:297)
    at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.stopContainer(NMClientImpl.java:247)
    at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:304)
    at org.apache.flink.yarn.YarnResourceManager.stopWorker(YarnResourceManager.java:73)
    at org.apache.flink.runtime.resourcemanager.ResourceManager.releaseResource(ResourceManager.java:852)
    at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.releaseResource(ResourceManager.java:1057)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.checkTaskManagerTimeouts(SlotManager.java:911)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.lambda$null$0(SlotManager.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Thanks!
Marke


Re: where can I see logs from code

2018-11-25 Thread miki haiat
You can see the logs in the web UI.
If you click on the Task Managers tab, you can find the logs:

http://SERVERADD/#/taskmanager/TM_ID/log





On Sun, Nov 25, 2018 at 12:11 PM Avi Levi  wrote:

> Hi,
> Where can I see the logs written by the app code (i.e. by the app
> developer)?
>
> BR
> Avi
>


where can I see logs from code

2018-11-25 Thread Avi Levi
Hi,
Where can I see the logs written by the app code (i.e. by the app developer)?

BR
Avi