What's the advantage of using BroadcastState?
Hi,

As far as I know, the difference between a BroadcastStream and a normal DataStream is that a BroadcastStream comes with a BroadcastState. But it seems the functionality of BroadcastState could also be achieved with a MapState in a CoMapFunction or similar, since the control stream is still broadcast even without being turned into a BroadcastStream. So I'm wondering: what is the advantage of using BroadcastState?

Thanks a lot!

Best Regards,
Paul Lam
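One practical difference (an editor's illustration, not an answer from the thread): with a keyed control stream and per-key MapState, a rule update is routed to exactly one parallel instance, whereas broadcast delivery guarantees every instance sees it; Flink additionally checkpoints BroadcastState so that all instances hold identical contents after recovery or rescaling. A minimal stdlib-only sketch of keyed vs. broadcast delivery (no Flink dependencies; `RuleInstance` and the rule names are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastVsKeyed {
    // Hypothetical stand-in for one parallel operator instance holding local rule state.
    static class RuleInstance {
        final Map<String, Integer> rules = new HashMap<>();
    }

    static List<RuleInstance> instances(int parallelism) {
        List<RuleInstance> list = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            list.add(new RuleInstance());
        }
        return list;
    }

    // Keyed delivery: the control event is routed to exactly one instance (by key hash),
    // so only that instance's MapState ever sees the rule.
    static long keyedHolderCount(int parallelism, String ruleId, int threshold) {
        List<RuleInstance> list = instances(parallelism);
        list.get(Math.abs(ruleId.hashCode()) % parallelism).rules.put(ruleId, threshold);
        return list.stream().filter(i -> i.rules.containsKey(ruleId)).count();
    }

    // Broadcast delivery: every instance receives the same update -- the guarantee
    // that Flink's BroadcastState formalizes and checkpoints consistently.
    static long broadcastHolderCount(int parallelism, String ruleId, int threshold) {
        List<RuleInstance> list = instances(parallelism);
        for (RuleInstance i : list) {
            i.rules.put(ruleId, threshold);
        }
        return list.stream().filter(i -> i.rules.containsKey(ruleId)).count();
    }

    public static void main(String[] args) {
        System.out.println(keyedHolderCount(3, "maxAmount", 200));     // 1: only one instance has the rule
        System.out.println(broadcastHolderCount(3, "maxAmount", 200)); // 3: all instances have the rule
    }
}
```

With the keyed variant, a task that processes an event for a key whose rules landed on a different instance would simply not see the update; that is the gap BroadcastState closes.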
Re: processWindowFunction
Hi Vino,

Is it possible to use a global window and then set the trigger onElement, comparing the element that has arrived against, for example, 10, 20, and 60 minutes of data? I have rules evaluating the sum of amounts over 10, 20, or 60 minutes for the same keyed element. If the same id sums to $200 or more in total within those thresholds, with a count greater than or equal to 3, I need to be able to set some values on the object. If the object does not reach those thresholds, I do not set the values and keep sending the output, with or without those values. I am just processing the object on the fly and sending the output.

On Fri, Aug 17, 2018 at 10:14 PM, vino yang () wrote:

> Hi Antonio,
>
> Yes, ProcessWindowFunction is a very low-level window function.
> It allows you to access the data in the window and to customize the output of the window.
> So while it gives you flexibility, you need to think about other things, which may require you to write more processing logic.
>
> Generally speaking, sliding windows usually contain some repeated data, but a common pattern is to apply a reduce function on them to get your calculation results.
> If you only send the data, there will definitely be some duplication.
>
> Thanks, vino.
>
> On Fri, Aug 17, 2018 at 12:01 PM, antonio saldivar wrote:
>
>> Hi Vino,
>>
>> Thank you for the information. I am actually using a trigger alert and a ProcessWindowFunction to send my results, but when my window slides or ends it sends the objects again and I am getting duplicated data.
>>
>> On Thu, Aug 16, 2018 at 10:05 PM, vino yang () wrote:
>>
>>> Hi Antonio,
>>>
>>> What results do you not want to get when each window is created?
>>> Examples of the use of ProcessWindowFunction are included in many test files in Flink's project, such as SideOutputITCase.scala or WindowTranslationTest.scala.
>>>
>>> For more information on ProcessWindowFunction, you can refer to the official website. [1]
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>>>
>>> Thanks, vino.
>>>
>>> On Fri, Aug 17, 2018 at 6:24 AM, antonio saldivar wrote:
>>>
>>> Hello,
>>>
>>> I am implementing a data stream where I use sliding windows, but I am stuck because I need to set values on my object based on some if statements in my process function and send the object to the next step. However, I don't want results every time a window is created. If anyone has a good example of this, that would help me.
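Whatever window/trigger combination is ultimately chosen, the threshold rule described above can be isolated as a pure function and unit-tested before wiring it into a ProcessWindowFunction. A stdlib-only sketch under that assumption (all names are hypothetical; timestamps are in epoch minutes and amounts in dollars):

```java
import java.util.List;

public class ThresholdRule {
    // Hypothetical keyed event: timestamp in epoch minutes, amount in dollars.
    record Txn(long minute, double amount) {}

    // Returns true when, within the last `windowMinutes` ending at `now`, the same key
    // has >= 3 events summing to >= $200 -- the rule described in the thread.
    static boolean breaches(List<Txn> events, long now, long windowMinutes) {
        double sum = 0;
        int count = 0;
        for (Txn t : events) {
            if (t.minute > now - windowMinutes && t.minute <= now) {
                sum += t.amount;
                count++;
            }
        }
        return sum >= 200.0 && count >= 3;
    }

    public static void main(String[] args) {
        List<Txn> txns = List.of(new Txn(1, 50), new Txn(5, 100), new Txn(9, 60), new Txn(45, 20));
        // 10-minute window ending at minute 10: 3 events totaling $210 -> breach.
        System.out.println(breaches(txns, 10, 10));  // true
        // 20-minute window ending at minute 46: only the $20 event -> no breach.
        System.out.println(breaches(txns, 46, 20));  // false
    }
}
```

Inside a ProcessWindowFunction you would run this check per key and only set the extra fields (and emit early) when it returns true, which also makes it easy to avoid re-emitting unchanged objects when a window merely slides.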
Job Manager killed by Kubernetes during recovery
Hi,

I am experiencing an issue when a job manager is trying to recover in an HA setup. When the job manager starts again and tries to resume from the last checkpoints, it gets killed by Kubernetes (I guess), since I can see the following in the logs while the jobs are being deployed:

INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

I am requesting enough memory for it (3000Gi), and it is configured to use 2048Gb of memory. I have tried to increase the max perm size, but did not see an improvement. Any suggestions to help diagnose this?

I have the following:
- Flink 1.6.0 (same with 1.5.1)
- Azure AKS with Kubernetes 1.11
- State management using RocksDB with checkpoints stored in Azure Data Lake

Thanks!
Bruno
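An editor's note, not from the thread: a cgroup OOM kill would show up as SIGKILL rather than SIGTERM, so SIGTERM usually points at the kubelet evicting the pod under node memory pressure, a failing liveness probe, or an explicit pod deletion. Either way, the container's memory limit should comfortably exceed the JVM heap configured for Flink, since RocksDB and the JVM itself allocate native memory outside the heap. A hedged sketch of the two pieces to compare, with placeholder values:

```yaml
# Kubernetes container resources (placeholder values):
resources:
  requests:
    memory: "3Gi"
  limits:
    memory: "3Gi"   # exceeding this limit gets the container killed
---
# flink-conf.yaml (option name as of Flink 1.6; leave headroom below the
# container limit for metaspace, threads, and RocksDB native allocations):
jobmanager.heap.mb: 2048
```

Checking `kubectl describe pod` for an `OOMKilled` status or an eviction event would distinguish the two cases.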
Re: Error in KryoSerializer
This sounds very much related to FLINK-10160 [1]. Would you mind upgrading your Flink version to 1.4.3 and trying again?

Thanks,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-10160

On Fri, Aug 17, 2018 at 4:20 PM, Pankaj Chaudhary wrote:

> Hi,
>
> I am on Flink 1.4.2, and as part of my operator logic (i.e. a RichFlatMapFunction) I am collecting the values in the Collector object.
>
> But I am getting an error stating "Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator".
>
> On debugging, it looks like the root cause of this exception is in KryoSerializer, where it tries to do a copy operation. Can someone please let me know how I can get around this issue?
>
> Below is the stack trace of the error:
>
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 11 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -14
>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>     ... 17 more
>
> Regards,
> Pankaj.
Re: Flink not rolling log files
Hi,

When using org.apache.log4j.rolling.RollingFileAppender, it is apparently not allowed to set:

log4j.appender.file.file=${log.file}

It works for me if I remove this property from the log4j.properties file. Moreover, you have configured:

log4j.appender.file.RollingPolicy.FileNamePattern = logs/log.%d{yyyyMMdd-HHmm}.log

This will create the log files in the "logs" directory relative to where you start the Flink cluster. You may want to change FileNamePattern to an absolute path. Also note that the Flink default logging directory is "log" and not "logs".

Best,
Gary

On Fri, Aug 17, 2018 at 8:28 PM, Navneet Kumar Pandey wrote:

> Hi Gary,
>
> Thanks for the quick reply.
>
> Following is the output of "cat /usr/lib/flink/conf/log4j.properties":
>
> log4j.rootLogger=INFO,file
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.rolling.RollingFileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
> log4j.appender.file.RollingPolicy = org.apache.log4j.rolling.TimeBasedRollingPolicy
> log4j.appender.file.RollingPolicy.FileNamePattern = logs/log.%d{yyyyMMdd-HHmm}.log
> log4j.logger.no = DEBUG
>
> and I double-checked that the log4j library is in the lib directory:
>
> [hadoop@ip-XX lib]$ ls /usr/lib/flink/lib/
> apache-log4j-extras-1.2.17.jar  flink-metrics-datadog-1.4.2.jar
> flink-queryable-state-runtime_2.11-1.4.2.jar  log4j-1.2.17.jar
> flink-dist_2.11-1.4.2.jar  flink-python_2.11-1.4.2.jar
> flink-shaded-hadoop2-uber-1.4.2.jar  slf4j-log4j12-1.7.7.jar
>
> On Fri, Aug 17, 2018 at 5:15 PM, Gary Yao wrote:
>
>> Hello Navneet Kumar Pandey,
>>
>> org.apache.log4j.rolling.RollingFileAppender is part of the Apache Extras Companion for Apache log4j [1]. Is that library in your classpath?
>>
>> Are there hints in taskmanager.err?
>>
>> Can you run:
>>
>> cat /usr/lib/flink/conf/log4j.properties
>>
>> on the EMR master node and show the output?
>>
>> For troubleshooting, you can also try org.apache.log4j.RollingFileAppender, which can roll the file if a certain size is exceeded. An example configuration can be found here (I have not tested it):
>>
>> https://github.com/apache/flink/pull/5371/files
>>
>> Best,
>> Gary
>>
>> [1] https://logging.apache.org/log4j/extras/
>>
>> On Fri, Aug 17, 2018 at 4:09 PM, Navneet Kumar Pandey wrote:
>>
>>> I am using Flink in EMR with the following configuration:
>>>
>>> {
>>>   "Classification": "flink-log4j",
>>>   "Properties": {
>>>     "log4j.logger.no": "DEBUG",
>>>     "log4j.appender.file": "org.apache.log4j.rolling.RollingFileAppender",
>>>     "log4j.appender.file.RollingPolicy.FileNamePattern": "logs/log.%d{yyyyMMdd-HHmm}.log",
>>>     "log4j.appender.file.RollingPolicy": "org.apache.log4j.rolling.TimeBasedRollingPolicy",
>>>     "log4j.appender.file.append": "false",
>>>     "log4j.appender.file.layout": "org.apache.log4j.PatternLayout",
>>>     "log4j.appender.file.layout.ConversionPattern": "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
>>>   }
>>> }
>>>
>>> FYI, this configuration gets written into Flink's log4j.properties. As you can see, even after this setting the taskmanager and jobmanager log files are not getting rolled:
>>>
>>> [hadoop@ip-XX ~]$ sudo ls -lh /mnt/var/log/hadoop-yarn/containers/application_DDD_0002/container__0002_01_02
>>> total 7.0G
>>> -rw-r--r-- 1 yarn yarn 770K Aug 17 14:02 taskmanager.err
>>> -rw-r--r-- 1 yarn yarn 6.0G Aug 17 14:02 taskmanager.log
>>> -rw-r--r-- 1 yarn yarn 526K Aug 17 13:54 taskmanager.out
>>>
>>> Can somebody give me a pointer on how to roll these log files? Note that these files are also being copied into S3.
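Putting Gary's suggestions together, a corrected log4j.properties might look like the following. This is a sketch, not a tested configuration: the `log4j.appender.file.file` line is removed as suggested, and the FileNamePattern is made absolute (the `/var/log/flink` path is a placeholder):

```properties
log4j.rootLogger=INFO,file

# Time-based rolling appender from the Apache Extras companion for log4j.
# Note: do NOT set log4j.appender.file.file=${log.file} -- per the thread, it prevents rolling.
log4j.appender.file=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.file.RollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
# Absolute path (placeholder): relative paths resolve against the cluster's working directory.
log4j.appender.file.RollingPolicy.FileNamePattern=/var/log/flink/log.%d{yyyyMMdd-HHmm}.log
```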