What's the advantage of using BroadcastState?

2018-08-18 Thread Paul Lam
Hi,

AFAIK, the difference between a BroadcastStream and a normal DataStream is
that a BroadcastStream carries a BroadcastState. But it seems the
functionality of BroadcastState could also be achieved with a MapState in a
CoMapFunction or similar, since the control stream is still broadcast even
without being turned into a BroadcastStream. So I'm wondering: what's the
advantage of using BroadcastState? Thanks a lot!

Best Regards,
Paul Lam

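For context, a minimal sketch of the BroadcastState API in question
(Flink 1.5+; Rule, Event, and Alert are made-up types):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// The descriptor names the broadcast state. Flink checkpoints this state
// and restores it identically on every parallel instance.
MapStateDescriptor<String, Rule> ruleDesc =
        new MapStateDescriptor<>("rules", String.class, Rule.class);

BroadcastStream<Rule> ruleBroadcast = ruleStream.broadcast(ruleDesc);

eventStream
    .keyBy(e -> e.getKey())
    .connect(ruleBroadcast)
    .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Alert>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                   Collector<Alert> out) throws Exception {
            // The non-broadcast side only gets read-only access to the
            // broadcast state, so parallel instances cannot drift apart.
            Rule rule = ctx.getBroadcastState(ruleDesc).get(event.getRuleId());
            if (rule != null && rule.matches(event)) {
                out.collect(new Alert(event));
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx,
                                            Collector<Alert> out) throws Exception {
            ctx.getBroadcastState(ruleDesc).put(rule.getId(), rule);
        }
    });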

Re: processWindowFunction

2018-08-18 Thread antonio saldivar
Hi Vino,

Is it possible to use a global window and then set the trigger onElement,
comparing the element that has arrived against, for example, the last 10, 20,
and 60 minutes of data?

I have rules that evaluate the sum of amounts over 10, 20, or 60 minutes for
the same keyed element. If the same id sums to, say, $200 total within one of
those thresholds and the count is greater than or equal to 3, I need to set
some values on the object; if the object does not reach those thresholds, I
leave the values unset and keep sending the output either way.

In short: process the object on the fly and send the output.

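A rough sketch of one way to express this with ordinary sliding windows
instead of a global window plus custom trigger (Txn and its getters/setters
are made-up names; only the 10-minute rule is shown, and the other
thresholds would be additional windows):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Txn> flagged = txns
    .keyBy(t -> t.getId())
    .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    .process(new ProcessWindowFunction<Txn, Txn, String, TimeWindow>() {
        @Override
        public void process(String id, Context ctx, Iterable<Txn> elements,
                            Collector<Txn> out) {
            double sum = 0;
            int count = 0;
            for (Txn t : elements) {
                sum += t.getAmount();
                count++;
            }
            boolean hit = sum >= 200.0 && count >= 3;
            for (Txn t : elements) {
                if (hit) {
                    t.setFlag10m(true); // set values only when the rule fires
                }
                out.collect(t); // send the output with or without the values
            }
        }
    });

Note that overlapping windows re-emit their elements on every slide; the
duplication discussed further down in this thread comes from exactly that.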

On Fri, Aug 17, 2018 at 22:14, vino yang () wrote:

> Hi antonio,
>
> Yes, ProcessWindowFunction is a low-level window function.
> It gives you access to the elements in the window and lets you customize
> the output of the window.
> So while it gives you flexibility, you also need to take care of more
> things yourself, which may mean writing more processing logic.
>
> Generally speaking, sliding windows overlap, so some data appears in more
> than one window; a common pattern is to apply a reduce function on the
> window to get a single aggregated result.
> If you simply emit the raw elements, there will definitely be some
> duplication.
>
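> A rough sketch of that pattern (Txn is a made-up type; assumes event time
> is configured), emitting one aggregated result per window instead of
> every element:
>
> DataStream<Txn> sums = txns
>     .keyBy(t -> t.getId())
>     .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
>     .reduce((a, b) -> new Txn(a.getId(), a.getAmount() + b.getAmount()));
>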
> Thanks, vino.
>
> On Fri, Aug 17, 2018 at 12:01 PM, antonio saldivar wrote:
>
>> Hi Vino
>> Thank you for the information. I am using a trigger alert and a
>> ProcessWindowFunction to send my results, but when my window slides or
>> ends, it sends the objects again and I am getting duplicated data.
>>
>> On Thu, Aug 16, 2018 at 22:05, vino yang () wrote:
>>
>>> Hi Antonio,
>>>
>>> What results do you not want to get when each window is created?
>>> Examples of the use of ProcessWindowFunction are included in many test
>>> files in Flink's project, such as SideOutputITCase.scala or
>>> WindowTranslationTest.scala.
>>>
>>> For more information on ProcessWindowFunction, you can refer to the
>>> official website.[1]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>>>
>>> Thanks, vino.
>>>
>>> On Fri, Aug 17, 2018 at 6:24 AM, antonio saldivar wrote:
>>>
 Hello,

 I am implementing a data stream where I use sliding windows, but I am
 stuck: I need to set values on my object based on some if statements in my
 process function and send the object to the next step, but I don't want
 results emitted every time a window is created.

 If anyone has a good example of this, it would help me a lot.

>>>


Job Manager killed by Kubernetes during recovery

2018-08-18 Thread Bruno Aranda
Hi,

I am experiencing an issue when a job manager tries to recover using an HA
setup. When the job manager starts again and tries to resume from the
last checkpoints, it gets killed by Kubernetes (I guess), since I can see
the following in the logs while the jobs are being deployed:

INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

I am requesting enough memory for it, 3000Gi, and it is configured to use
2048Gb of memory. I have tried to increase the max perm size, but did not
see an improvement.

Any suggestions to help diagnose this?

I have the following:

Flink 1.6.0 (same with 1.5.1)
Azure AKS with Kubernetes 1.11
State management using RocksDB with checkpoints stored in Azure Data Lake

Thanks!

Bruno

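For reference, the Kubernetes settings that usually matter in this situation
are the container's memory request and limit; a sketch with placeholder
values:

resources:
  requests:
    memory: "3000Mi"   # what the scheduler reserves for the pod
  limits:
    memory: "3000Mi"   # hard cap enforced on the container

Note that exceeding the memory limit produces an OOM kill (SIGKILL), while
the SIGTERM above is what Kubernetes sends for pod deletion or node-pressure
eviction, so checking `kubectl describe pod` for the termination reason is a
good first step. The configured JVM heap also needs headroom below the
container limit for off-heap memory, metaspace, and network buffers.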

Re: Error in KryoSerializer

2018-08-18 Thread Rong Rong
This sounds very much related to FLINK-10160 [1].
Would you mind upgrading your Flink version to 1.4.3 and trying again?

Thanks,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-10160

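If upgrading is not immediately possible, here is a possible (untested)
mitigation sketch: the failing copy happens in CopyingChainingOutput, which
Flink uses between chained operators only when object reuse is disabled, so
enabling reuse skips that Kryo copy. This is only safe if no function in the
chain caches or mutates its input records.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Skips the defensive copy between chained operators; safe only if no
// function in the chain holds on to or mutates its inputs.
env.getConfig().enableObjectReuse();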
On Fri, Aug 17, 2018 at 4:20 PM Pankaj Chaudhary 
wrote:

> Hi,
>
> I am on Flink 1.4.2, and as part of my operator logic (i.e. a
> RichFlatMapFunction) I am collecting values into the Collector object.
>
> But I am getting an error stating “Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator”
>
> On debugging, it looks like the root cause of this exception is in the
> *KryoSerializer*, where it tries to do a copy operation. Can someone
> please let me know how I can get around this issue?
>
> *Below is the stack trace of the error*
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> ... 11 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -14
> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
> ... 17 more
>
> Regards,
> Pankaj.
>


Re: Flink not rolling log files

2018-08-18 Thread Gary Yao
Hi,

When using org.apache.log4j.rolling.RollingFileAppender, it is apparently
not allowed to set:

log4j.appender.file.file=${log.file}

It works for me if I remove this property from the log4j.properties file.

Moreover, you have configured:

log4j.appender.file.RollingPolicy.FileNamePattern =
logs/log.%d{yyyyMMdd-HHmm}.log

This will create the log files in the "logs" directory relative to where you
start the Flink cluster. You may want to change FileNamePattern to an
absolute
path. Also note that the Flink default logging directory is "log" and not
"logs".

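Putting both fixes together, a sketch of the adjusted appender section (the
absolute path is a placeholder):

log4j.rootLogger=INFO,file
log4j.appender.file=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.file.RollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
# absolute path; note there is no log4j.appender.file.file=${log.file} line
log4j.appender.file.RollingPolicy.FileNamePattern=/var/log/flink/log.%d{yyyyMMdd-HHmm}.log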
Best,
Gary


On Fri, Aug 17, 2018 at 8:28 PM, Navneet Kumar Pandey 
wrote:

> Hi Gary,
>
> Thanks for quick reply.
>
> Following is output of  "cat /usr/lib/flink/conf/log4j.properties"
>
> log4j.rootLogger=INFO,file
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.rolling.RollingFileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
> log4j.appender.file.RollingPolicy = org.apache.log4j.rolling.TimeBasedRollingPolicy
> log4j.appender.file.RollingPolicy.FileNamePattern = logs/log.%d{yyyyMMdd-HHmm}.log
> log4j.logger.no = DEBUG
>
>
> and I double-checked that the log4j libraries are in the lib directory:
>
> [hadoop@ip-XX lib]$ ls /usr/lib/flink/lib/
> apache-log4j-extras-1.2.17.jar  flink-metrics-datadog-1.4.2.jar
> flink-queryable-state-runtime_2.11-1.4.2.jar  log4j-1.2.17.jar
> flink-dist_2.11-1.4.2.jar   flink-python_2.11-1.4.2.jar
> flink-shaded-hadoop2-uber-1.4.2.jar   slf4j-log4j12-1.7.7.jar
>
> On Fri, Aug 17, 2018 at 5:15 PM, Gary Yao  wrote:
>
>> Hello Navneet Kumar Pandey,
>>
>> org.apache.log4j.rolling.RollingFileAppender is part of Apache Extras
>> Companion for Apache log4j [1]. Is that library in your classpath?
>>
>> Are there hints in taskmanager.err?
>>
>> Can you run:
>>
>> cat /usr/lib/flink/conf/log4j.properties
>>
>> on the EMR master node and show the output?
>>
>> For troubleshooting, you can also try org.apache.log4j.RollingFileAppender,
>> which can roll the file if a certain size is exceeded. An example
>> configuration can be found here (I have not tested it):
>>
>> https://github.com/apache/flink/pull/5371/files
>>
>> Best,
>> Gary
>>
>> [1] https://logging.apache.org/log4j/extras/
>>
>>
>> On Fri, Aug 17, 2018 at 4:09 PM, Navneet Kumar Pandey 
>> wrote:
>>
>>> I am using Flink in EMR with following configuration.
>>>
>>>  {
>>>   "Classification": "flink-log4j",
>>>   "Properties": {
>>> "log4j.logger.no":"DEBUG",
>>> "log4j.appender.file":"org.apache.log4j.rolling.RollingFileAppender",
>>> "log4j.appender.file.RollingPolicy.FileNamePattern":"logs/log.%d{yyyyMMdd-HHmm}.log",
>>> "log4j.appender.file.RollingPolicy":"org.apache.log4j.rolling.TimeBasedRollingPolicy",
>>> "log4j.appender.file.append":"false",
>>> "log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
>>> "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
>>>   }
>>> }
>>>
>>> FYI, this configuration gets written into Flink's log4j.properties. As you
>>> can see, even after this setting the taskmanager and jobmanager log files
>>> are not getting rolled.
>>>
>>> [hadoop@ip-XX ~]$ sudo ls -lh /mnt/var/log/hadoop-yarn/containers/application_DDD_0002/container__0002_01_02
>>> total 7.0G
>>> -rw-r--r-- 1 yarn yarn 770K Aug 17 14:02 taskmanager.err
>>> -rw-r--r-- 1 yarn yarn 6.0G Aug 17 14:02 taskmanager.log
>>> -rw-r--r-- 1 yarn yarn 526K Aug 17 13:54 taskmanager.out
>>>
>>> Can somebody give me a pointer on how to roll these log files?
>>> Note that these files are also being copied to S3.
>>>
>>>
>>
>>
>