Re: Timers and Checkpoints
iggerCheckpointOnBarrier(StreamTask.java:543)
>> ... 8 more
>> Caused by: java.lang.Exception: Could not write timer service of Aggregator
>> -> Sink: HBase (1/1) to checkpoint state stream.
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:438)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:98)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:385)
>> ... 13 more
>> Caused by: java.lang.NullPointerException
>> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:304)
>> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:121)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>> ... 15 more
>>
>> On Fri, May 25, 2018 at 3:11 PM Timo Walther wrote:
>>
>>> Hi Alberto,
>>>
>>> do you get exactly the same exception? Maybe you can share some logs
>>> with us?
>>>
>>> Regards,
>>> Timo
>>>
>>> On 25.05.18 at 13:41, Alberto Mancini wrote:
>>> > Hello,
>>> > I think we are experiencing this issue:
>>> > https://issues.apache.org/jira/browse/FLINK-6291
>>> >
>>> > In fact we have a long-running job that is unable to complete a
>>> > checkpoint, so we are unable to create a savepoint.
>>> >
>>> > I do not really understand from FLINK-6291 how the timer service was
>>> > removed in my job, and above all I cannot find a way to let my job
>>> > create a savepoint.
>>> > We are using Flink 1.3.2.
>>> >
>>> > Thanks,
>>> > Alberto.

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Providing Custom Serializer for Generic Type
Hi Gordon, thank you. The data structure involved is a complex abstraction owning a schema and values; it declares private fields which should not be edited directly by users. I'd say it's really akin to an Avro GenericRecord. How would you approach the problem if you had to serialize/deserialize an Avro GenericRecord efficiently? I think it cannot be a POJO, and ser/de through Avro brings considerable overhead, as also described at [1].

Thank you very much for your help.

Andrea

[1] - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html

On Thu, Jul 4, 2019 at 11:23 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Andrea,
>
> Is there a specific reason you want to use a custom TypeInformation /
> TypeSerializer for your type?
> From the description in the original post, this part wasn't clear to me.
>
> If the only reason is that it is generally suggested to avoid generic
> type serialization via Kryo, both for performance reasons and for
> evolvability in the future, then updating your type to be recognized by
> Flink as one of the supported types [1] would be enough.
> Otherwise, implementing your own type information and serializer is
> usually only something users with very specific use cases might be
> required to do.
> Since you are also using that type as managed state, for a safer schema
> evolvability story in the future, I would recommend either Avro or POJO,
> as Jingsong Lee already mentioned.
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#flinks-typeinformation-class
>
> On Thu, Jul 4, 2019 at 5:08 PM Andrea Spina wrote:
>
>> Hi JingsongLee, thank you for your answer.
>> I wanted to explore that as the last chance, honestly. Anyway, if
>> defining custom serializers and type information involves quite a big
>> effort, I will reconsider my approach.
>>
>> Cheers,
>>
>> On Thu, Jul 4, 2019 at 8:46 AM JingsongLee <lzljs3620...@aliyun.com> wrote:
>>
>>> Hi Andrea:
>>> Why not make your *MyClass* a POJO? [1] If it is a POJO, then Flink
>>> will use PojoTypeInfo and PojoSerializer, which already have a good
>>> implementation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>>>
>>> Best, JingsongLee
>>>
>>> --
>>> From: Andrea Spina
>>> Send Time: Thursday, July 4, 2019, 14:37
>>> To: user
>>> Subject: Providing Custom Serializer for Generic Type
>>>
>>> Dear community,
>>> in my job I use a custom event type *MyClass*, which is a sort of
>>> "generic event" that I handle all along my streaming flow, both as an
>>> event (DataStream[MyClass]) and as managed state.
>>>
>>> I see that Flink warns me about generic serialization of *MyClass*:
>>>
>>> INFO [run-main-0] (TypeExtractor.java:1818) - class
>>> io.radicalbit.MyClass does not contain a setter for field
>>> io$radicalbit$MyClass$$schema
>>> INFO [run-main-0] (TypeExtractor.java:1857) - Class class
>>> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
>>> are valid POJO fields, and must be processed as GenericType. Please read
>>> the Flink documentation on "Data Types & Serialization" for details of the
>>> effect on performance.
>>> INFO [run-main-0] (TypeExtractor.java:1818) - class
>>> io.radicalbit.MyClass does not contain a setter for field
>>> io$radicalbit$MyClass$schema
>>>
>>> Therefore I wanted to provide my custom serializer for MyClass, trying
>>> first to register the Java one to check whether the system recognizes it,
>>> so I followed [1], but it seems that it is not considered.
>>>
>>> I then read [2] (the case is quite similar to mine) and, AFAIU, I need to
>>> implement a custom TypeInformation and TypeSerializer for my class, as
>>> suggested in [3], because Flink will ignore my registered serializer as
>>> long as it considers my type *generic*.
>>>
>>> config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])
>>>
>>> My question, finally, is: do I need to provide these custom classes? Is
>>> there any practical example of creating custom type information like the
>>> above mentioned? I have had a quick preliminary look at it, but it seems
>>> that I need
Re: Providing Custom Serializer for Generic Type
Hi JingsongLee, thank you for your answer.
I wanted to explore that as the last chance, honestly. Anyway, if defining custom serializers and type information involves quite a big effort, I will reconsider my approach.

Cheers,

On Thu, Jul 4, 2019 at 8:46 AM JingsongLee wrote:

> Hi Andrea:
> Why not make your *MyClass* a POJO? [1] If it is a POJO, then Flink
> will use PojoTypeInfo and PojoSerializer, which already have a good
> implementation.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>
> Best, JingsongLee
>
> --
> From: Andrea Spina
> Send Time: Thursday, July 4, 2019, 14:37
> To: user
> Subject: Providing Custom Serializer for Generic Type
>
> Dear community,
> in my job I use a custom event type *MyClass*, which is a sort of
> "generic event" that I handle all along my streaming flow, both as an
> event (DataStream[MyClass]) and as managed state.
>
> I see that Flink warns me about generic serialization of *MyClass*:
>
> INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
> does not contain a setter for field io$radicalbit$MyClass$$schema
> INFO [run-main-0] (TypeExtractor.java:1857) - Class class
> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
> are valid POJO fields, and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance.
> INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
> does not contain a setter for field io$radicalbit$MyClass$schema
>
> Therefore I wanted to provide my custom serializer for MyClass, trying
> first to register the Java one to check whether the system recognizes it,
> so I followed [1], but it seems that it is not considered.
>
> I then read [2] (the case is quite similar to mine) and, AFAIU, I need to
> implement a custom TypeInformation and TypeSerializer for my class, as
> suggested in [3], because Flink will ignore my registered serializer as
> long as it considers my type *generic*.
>
> config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])
>
> My question, finally, is: do I need to provide these custom classes? Is
> there any practical example of creating custom type information like the
> above mentioned? I have had a quick preliminary look at it, but it seems
> that I need to provide a non-trivial amount of information to the
> TypeInformation and TypeSerializer interfaces.
>
> Thank you for your excellent work and help.
>
> Cheers.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
> [2] -
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
> [3] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
> --
> Andrea Spina
> Head of R @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
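To give a feel for what implementing a serializer entails, here is a minimal, self-contained sketch of the round-trip logic a custom serializer must provide for a record made of a schema plus values. It is plain Java and deliberately does NOT use Flink's actual TypeSerializer interface; the names MyClassModel and MyClassSerde are hypothetical stand-ins for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event shape: a schema (field names) plus positional values.
final class MyClassModel {
    final List<String> schema;
    final List<String> values;
    MyClassModel(List<String> schema, List<String> values) {
        this.schema = schema;
        this.values = values;
    }
}

// Core duties of a hand-written serializer: a deterministic binary format
// with explicit length prefixes, so deserialization can reverse it exactly.
final class MyClassSerde {
    static byte[] serialize(MyClassModel record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            writeList(out, record.schema);
            writeList(out, record.values);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static MyClassModel deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return new MyClassModel(readList(in), readList(in));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeList(DataOutputStream out, List<String> list) throws IOException {
        out.writeInt(list.size());              // length prefix
        for (String s : list) out.writeUTF(s);  // each entry length-prefixed by writeUTF
    }

    private static List<String> readList(DataInputStream in) throws IOException {
        int n = in.readInt();
        List<String> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) list.add(in.readUTF());
        return list;
    }
}
```

A real Flink TypeSerializer additionally needs copy, duplicate, and snapshot/compatibility methods on top of this read/write core, which is where the "non-trivial amount of information" mentioned above comes from.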
Providing Custom Serializer for Generic Type
Dear community,
in my job I use a custom event type *MyClass*, which is a sort of "generic event" that I handle all along my streaming flow, both as an event (DataStream[MyClass]) and as managed state.

I see that Flink warns me about generic serialization of *MyClass*:

INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass does not contain a setter for field io$radicalbit$MyClass$$schema
INFO [run-main-0] (TypeExtractor.java:1857) - Class class io.radicalbit.MyClass cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass does not contain a setter for field io$radicalbit$MyClass$schema

Therefore I wanted to provide my custom serializer for MyClass, trying first to register the Java one to check whether the system recognizes it, so I followed [1], but it seems that it is not considered.

I then read [2] (the case is quite similar to mine) and, AFAIU, I need to implement a custom TypeInformation and TypeSerializer for my class, as suggested in [3], because Flink will ignore my registered serializer as long as it considers my type *generic*.

config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])

My question, finally, is: do I need to provide these custom classes? Is there any practical example of creating custom type information like the above mentioned? I have had a quick preliminary look at it, but it seems that I need to provide a non-trivial amount of information to the TypeInformation and TypeSerializer interfaces.

Thank you for your excellent work and help.

Cheers.
[1] - https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
[2] - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
[3] - https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
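For reference, a shape that satisfies Flink's documented POJO rules (public class, public no-argument constructor, every field either public or reachable through a public getter/setter pair) looks like the sketch below. MyEvent is a hypothetical simplification of MyClass, not the real type; no Flink dependency is needed to illustrate the rules. Note that in a real job the class must also be declared public in its own file.

```java
// A hypothetical, POJO-compliant reshaping of MyClass. Flink's analyzer
// requires: a public class, a public no-arg constructor, and fields that
// are public or have public getters and setters (the missing setter was
// exactly what the TypeExtractor warnings above complained about).
class MyEvent {
    private String schema;
    private long value;

    public MyEvent() {}  // required public no-arg constructor

    public String getSchema() { return schema; }
    public void setSchema(String schema) { this.schema = schema; }

    public long getValue() { return value; }
    public void setValue(long value) { this.value = value; }
}
```

With this shape, Flink recognizes the type as a POJO and uses PojoTypeInfo/PojoSerializer instead of falling back to Kryo's GenericType path.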
Re: HDFS checkpoints for rocksDB state backend:
Hi Qiu,
my jar does not contain the class `org.apache.hadoop.hdfs.protocol.HdfsConstants`, but I do expect it to be contained within `flink-shaded-hadoop2-uber-1.6.4.jar`, which is located in the Flink cluster lib directory.

On Thu, Jun 27, 2019 at 4:03 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Andrea
>
> Given the NoClassDefFoundError, could you please verify that
> `org.apache.hadoop.hdfs.protocol.HdfsConstants` exists in your jar?
> Or could you use Arthas [1] to check whether the class exists while the
> job is running?
>
> [1] https://github.com/alibaba/arthas
> Best,
> Congxian
>
> On Thursday, June 27, 2019, at 1:57 AM, Andrea Spina wrote:
>
>> Dear community,
>> I'm trying to use HDFS checkpoints in flink-1.6.4 with the following
>> configuration:
>>
>> state.backend: rocksdb
>> state.checkpoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/checkpoint
>> state.savepoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/savepoints
>>
>> and I record the following exceptions:
>>
>> Caused by: java.io.IOException: Could not flush and close the file
>> system output stream to
>> hdfs://my.rb.biz:8020/flink/checkpoint/fd35c7145e6911e1721cd0f03656b0a8/chk-2/48502e63-cb69-4944-8561-308da2f9f26a
>> in order to obtain the stream state handle
>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>> at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
>> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
>> at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>> ... 7 more
>> Caused by: java.io.IOException: DataStreamer Exception:
>> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
>> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
>> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
>> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
>>
>> or
>>
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>> ... 7 more
>> Caused by: java.io.IOException: DataStreamer Exception:
>> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
>> Caused by: javax.xml.parsers.FactoryConfigurationError: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
>> at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>> at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>> at javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>> at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2515)
>> at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>> at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>> at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>> at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>> at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>> at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
>>
HDFS checkpoints for rocksDB state backend:
Dear community,
I'm trying to use HDFS checkpoints in flink-1.6.4 with the following configuration:

state.backend: rocksdb
state.checkpoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/checkpoint
state.savepoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/savepoints

and I record the following exceptions:

Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://my.rb.biz:8020/flink/checkpoint/fd35c7145e6911e1721cd0f03656b0a8/chk-2/48502e63-cb69-4944-8561-308da2f9f26a in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: DataStreamer Exception:
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

or

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: DataStreamer Exception:
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
Caused by: javax.xml.parsers.FactoryConfigurationError: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
at javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2515)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

In my lib folder I have the usual HDFS uber jar, but I am not able to get the job to checkpoint its state correctly. I also read [1], but it is not helping.

Thank you for the precious help

[1] - https://www.cnblogs.com/chendapao/p/9170566.html

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Process Function's timers "postponing"
Hi Yun, thank you so much. That was an idea, but I wanted to avoid storing additional state for it. In the end, I went for coalescing, as the documentation suggests, so that I will have just one timer per interval. What I didn't catch initially from the documentation is that *for a given key and a given timestamp, Flink retains just one timer; i.e., if I set two timers to trigger at the same time T, Flink will trigger the timer once.* I accept then to have at least one coalesced timer per interval.

Thank you again for your support!

On Tue, Jun 25, 2019 at 7:14 PM Yun Tang wrote:

> If you are using processing time, one possible way is to track the last
> registered timer in another ValueState. When you register a new timer and
> find that the previous timer stored in the ValueState has a smaller
> timestamp (T1) than the current one (T2), you could call
> #deleteProcessingTimeTimer(T1). After deleting that processing-time timer,
> T1 will not trigger any action. You could refer to [1] and its usage for
> similar ideas.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java
>
> ------
> *From:* Andrea Spina
> *Sent:* Tuesday, June 25, 2019 23:40
> *To:* Yun Tang
> *Cc:* user
> *Subject:* Re: Process Function's timers "postponing"
>
> Hi Yun, thank you for your answer. I'm not sure I got your point. My
> question is:
> for the same key K, I process two records, R1 at t1 and R2 at t2.
> When I process R1, I set a timer to be triggered at T1, which is > t2.
> When I process R2, I set a timer to be triggered at T2, which is > T1; but
> in order to do that, I want to remove the previous timer T1, in order to
> "postpone" the triggering.
>
> In other words, I would like a single key to have just one active timer,
> and if a new timer is requested the old one should be deleted.
>
> Thank you,
>
> On Tue, Jun 25, 2019 at 5:31 PM Yun Tang wrote:
>
> Hi Andrea
>
> If my understanding is correct, you just want to know when the eventual
> timer would be deleted. When you register your timer into
> 'processingTimeTimersQueue' (where your timer is stored) at [1], the
> 'SystemProcessingTimeService' will then schedule a runnable TriggerTask
> after the "postpone" delay at [2]. When the scheduled runnable is
> triggered, it will poll from the 'processingTimeTimersQueue' [3], which
> means the timer is finally removed. Hope this could help you.
>
> Best
> Yun Tang
>
> [1]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
> [2]
> https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
> [3]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
>
> --
> *From:* Andrea Spina
> *Sent:* Tuesday, June 25, 2019 2:06
> *To:* user
> *Subject:* Process Function's timers "postponing"
>
> Dear Community,
> I am using Flink (processing-time) timers along with a Process Function.
> What I would like to do is to "postpone" any previously registered timers
> for the given key: I would like to do this since I might process plenty of
> events in a row (think about it as a session), so that I will be able to
> trigger the computation "just after" this session somehow stops.
>
> I wondered about deleting any existing timers, but AFAIU I need to know
> the previous timer's triggering time, which I guess is not possible for me
> since I use processing-time timers.
>
> I also read [1], but I am not really able to understand whether it comes
> in handy for me; for instance, I don't understand what "Since Flink
> maintains only one timer per key and timestamp..." means. Does this imply
> that a new processing-time timer will automatically overwrite a previously
> existing one?
>
> Thank you for your precious help,
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
> --
> *Andrea Spina*
> Head of R @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>
> --
> *Andrea Spina*
> Head of R @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Process Function's timers "postponing"
Hi Yun, thank you for your answer. I'm not sure I got your point. My question is:
for the same key K, I process two records, R1 at t1 and R2 at t2.
When I process R1, I set a timer to be triggered at T1, which is > t2.
When I process R2, I set a timer to be triggered at T2, which is > T1; but in order to do that, I want to remove the previous timer T1, in order to "postpone" the triggering.

In other words, I would like a single key to have just one active timer, and if a new timer is requested the old one should be deleted.

Thank you,

On Tue, Jun 25, 2019 at 5:31 PM Yun Tang wrote:

> Hi Andrea
>
> If my understanding is correct, you just want to know when the eventual
> timer would be deleted. When you register your timer into
> 'processingTimeTimersQueue' (where your timer is stored) at [1], the
> 'SystemProcessingTimeService' will then schedule a runnable TriggerTask
> after the "postpone" delay at [2]. When the scheduled runnable is
> triggered, it will poll from the 'processingTimeTimersQueue' [3], which
> means the timer is finally removed. Hope this could help you.
>
> Best
> Yun Tang
>
> [1]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
> [2]
> https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
> [3]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
>
> --
> *From:* Andrea Spina
> *Sent:* Tuesday, June 25, 2019 2:06
> *To:* user
> *Subject:* Process Function's timers "postponing"
>
> Dear Community,
> I am using Flink (processing-time) timers along with a Process Function.
> What I would like to do is to "postpone" any previously registered timers
> for the given key: I would like to do this since I might process plenty of
> events in a row (think about it as a session), so that I will be able to
> trigger the computation "just after" this session somehow stops.
>
> I wondered about deleting any existing timers, but AFAIU I need to know
> the previous timer's triggering time, which I guess is not possible for me
> since I use processing-time timers.
>
> I also read [1], but I am not really able to understand whether it comes
> in handy for me; for instance, I don't understand what "Since Flink
> maintains only one timer per key and timestamp..." means. Does this imply
> that a new processing-time timer will automatically overwrite a previously
> existing one?
>
> Thank you for your precious help,
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
> --
> *Andrea Spina*
> Head of R @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Process Function's timers "postponing"
Dear Community,
I am using Flink (processing-time) timers along with a Process Function. What I would like to do is to "postpone" any previously registered timers for the given key: I would like to do this since I might process plenty of events in a row (think about it as a session), so that I will be able to trigger the computation "just after" this session somehow stops.

I wondered about deleting any existing timers, but AFAIU I need to know the previous timer's triggering time, which I guess is not possible for me since I use processing-time timers.

I also read [1], but I am not really able to understand whether it comes in handy for me; for instance, I don't understand what "Since Flink maintains only one timer per key and timestamp..." means. Does this imply that a new processing-time timer will automatically overwrite a previously existing one?

Thank you for your precious help,

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing

--
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
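The timer-coalescing idea from the documentation linked above can be illustrated without any Flink dependency: round every requested trigger time up to the next interval boundary, so that many registrations within the same interval collapse onto a single key-and-timestamp pair, which Flink deduplicates (it maintains only one timer per key and timestamp). This is a minimal sketch; the interval granularity is the caller's choice.

```java
// Timer-coalescing sketch: round each requested trigger time up to the
// next multiple of `intervalMs`. Every request falling inside the same
// interval maps to the same timestamp, and since Flink keeps only one
// timer per key and timestamp, at most one timer fires per key per
// interval.
final class TimerCoalescing {
    static long coalesce(long timestampMs, long intervalMs) {
        return ((timestampMs / intervalMs) + 1) * intervalMs;
    }
}
```

In a ProcessFunction you would register `coalesce(ctx.timerService().currentProcessingTime(), interval)` instead of the raw timestamp; repeated registrations within an interval then cost nothing extra.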
Re: Linkage Error RocksDB and flink-1.6.4
Hi Shu Su, the first point exactly pinpointed the issue I bumped into: I forgot to set that dependency to "provided". Thank you!

On Mon, Jun 24, 2019 at 5:35 AM Shu Su wrote:

> Hi Andrea
>
> Actually it's caused by Flink's classloading: Flink's parent classloader
> loads the jar first, and then the user-code classloader loads it again
> when it is referenced from user code, which raises the error.
> There are two solutions.
> 1. Add scope "provided" to the Maven pom.xml:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>     <version>${flink_version}</version>
>     <scope>provided</scope>
> </dependency>
>
> 2. Set classloader.resolve-order: parent-first in flink-conf.yaml.
>
> Hope this will help you.
>
> Thanks,
> Simon
>
> On 06/24/2019 11:27, Yun Tang wrote:
>
> Hi Andrea
>
> Since I have not written Scala for a while, I wonder why you need to
> instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions
> on the JM side. As far as I can see, you could instantiate yours on the TM
> side with code like:
>
> val rocksdbConfig = new OptionsFactory() {
>   override def createDBOptions(currentOptions: DBOptions): DBOptions =
>     currentOptions.setIncreaseParallelism(properties.threadNo)
>
>   override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
>     currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
> }
>
> You just need to serialize the properties via the closure to the TMs.
> Hope this could help you.
>
> Best
> Yun Tang
> --
> *From:* Andrea Spina
> *Sent:* Monday, June 24, 2019 2:20
> *To:* user
> *Subject:* Linkage Error RocksDB and flink-1.6.4
>
> Dear community,
> I am running a Flink job backed by RocksDB, version 1.6.4 and Scala 2.11.
> At job startup the following exception happens (it is recorded by the Job
> Manager):
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
> previously initiated loading for a different type with name "org/rocksdb/DBOptions"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)
>
> For this job, I programmatically set some RocksDB options by using the
> code appended below. Can anybody help with this?
> Thank you so much,
> Andrea
>
> import org.apache.flink.configuration.MemorySize
> import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
> import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}
>
> object ConfigurableRocksDB {
>
>   lazy val columnOptions = new ColumnFamilyOptions() with Serializable
>   lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
>   lazy val dbOptions     = new DBOptions() with Serializable
>
>   def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
>     properties.threadNo.foreach(dbOptions.setIncreaseParallelism)
>
>     properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
>     properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
>     properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
>     properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
>
>     columnOptions.setTableFormatConfig(tableConfig)
>     properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
>     properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
>     properties.optimizeFilterForHits.foreach(op => if (op)
Linkage Error RocksDB and flink-1.6.4
Dear community, I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11. At job startup the following exception happens (it's recorded by the Job Manager). *Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions" at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)* For this job, I programmatically set some RocksDB options by using the code appended below. Can anybody help with this? 
Thank you so much, Andrea

import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions     = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE = new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }
}

-- *Andrea Spina* Head of R @ Radicalbit Srl Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Flink sent/received bytes related metrics: how are they calculated?
Hi Chesnay, just one downstream: the output of Sin (Source: Enriched Code) feeds the operator shown in the figure, and that operator is the exclusive downstream of Sin. Thanks, [image: Screenshot 2019-06-14 at 13.52.52.png] Il giorno ven 14 giu 2019 alle ore 12:23 Chesnay Schepler < ches...@apache.org> ha scritto: > What does the *P1 *pipeline look like? Are there 2 downstream operators > reading from *Sin* (in this case the number of bytes would be measured > twice)? > > On 14/06/2019 12:09, Andrea Spina wrote: > > Sorry, I totally missed the version: flink-1.6.4, Streaming API > > Il giorno ven 14 giu 2019 alle ore 11:08 Chesnay Schepler < > ches...@apache.org> ha scritto: > >> Which version of Flink are you using? There were some issues at some >> point about double-counting. >> >> On 14/06/2019 09:49, Andrea Spina wrote: >> >> Dear Community, I'd like to ask for some details about bytes related >> metrics in Flink. Precisely, I'm looking at *bytes sent* and *bytes >> received *metrics: what I am recording is the following: >> >> I am reading from a Kafka topic *A* records with schema *K* using a >> source *Sin *belonging to a pipeline *P1*. The topic *A* is filled with >> *K* data using a separated pipeline *p2* and a sink *Sout*. >> >> Basically what I register is that the *bytes sent* metric of *Sin* - >> available through the Flink UI -measure twice or more respect of the *bytes >> received* of *Sout. *For instance, if my sink Sout records 10GB of *bytes >> received* inbound, then my source Sin emits between 20-25GB *bytes sent*. >> >> [image: Screenshot 2019-06-14 at 09.40.08.png] >> >> Is someone able to detail how these two metrics are calculated? 
>> >> Thank you, >> >> -- >> *Andrea Spina* >> Software Engineer @ Radicalbit Srl >> Via Borsieri 41, 20159, Milano - IT >> >> >> > > -- > *Andrea Spina* > Head of R @ Radicalbit Srl > Via Giovanni Battista Pirelli 11, 20124, Milano - IT > > > -- *Andrea Spina* Head of R @ Radicalbit Srl Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Re: Flink sent/received bytes related metrics: how are they calculated?
Sorry, I totally missed the version: flink-1.6.4, Streaming API Il giorno ven 14 giu 2019 alle ore 11:08 Chesnay Schepler < ches...@apache.org> ha scritto: > Which version of Flink are you using? There were some issues at some point > about double-counting. > > On 14/06/2019 09:49, Andrea Spina wrote: > > Dear Community, I'd like to ask for some details about bytes related > metrics in Flink. Precisely, I'm looking at *bytes sent* and *bytes > received *metrics: what I am recording is the following: > > I am reading from a Kafka topic *A* records with schema *K* using a > source *Sin *belonging to a pipeline *P1*. The topic *A* is filled with > *K* data using a separated pipeline *p2* and a sink *Sout*. > > Basically what I register is that the *bytes sent* metric of *Sin* - available > through the Flink UI -measure twice or more respect of the *bytes > received* of *Sout. *For instance, if my sink Sout records 10GB of *bytes > received* inbound, then my source Sin emits between 20-25GB *bytes sent*. > > [image: Screenshot 2019-06-14 at 09.40.08.png] > > Is someone able to detail how these two metrics are calculated? > > Thank you, > > -- > *Andrea Spina* > Software Engineer @ Radicalbit Srl > Via Borsieri 41, 20159, Milano - IT > > > -- *Andrea Spina* Head of R @ Radicalbit Srl Via Giovanni Battista Pirelli 11, 20124, Milano - IT
Flink sent/received bytes related metrics: how are they calculated?
Dear Community, I'd like to ask for some details about bytes-related metrics in Flink. Precisely, I'm looking at the *bytes sent* and *bytes received* metrics: what I am recording is the following: I am reading records with schema *K* from a Kafka topic *A* using a source *Sin* belonging to a pipeline *P1*. The topic *A* is filled with *K* data using a separate pipeline *P2* and a sink *Sout*. Basically, what I register is that the *bytes sent* metric of *Sin* - available through the Flink UI - measures twice or more the *bytes received* of *Sout*. For instance, if my sink Sout records 10GB of *bytes received* inbound, then my source Sin emits between 20-25GB of *bytes sent*. [image: Screenshot 2019-06-14 at 09.40.08.png] Is someone able to detail how these two metrics are calculated? Thank you, -- *Andrea Spina* Software Engineer @ Radicalbit Srl Via Borsieri 41, 20159, Milano - IT
Re: Flink 1.7.2: All jobs are getting deployed on the same task manager
Hi everybody. We're currently experiencing the same behaviour on flink-1.6.2. I've been reading that Flink treats all the slots as equal; it doesn't even know where these slots reside https://stackoverflow.com/questions/54980104/uneven-assignment-of-tasks-to-workers-in-flink. So it should not be an issue; thus, the fact that it fills all the slots of one machine before moving to a new one should be just a rough coincidence. That said, I'm pretty sure I never observed this behaviour with previous majors (I recall flink-1.3 for sure). Moreover, this is harmful because a single machine can get its resources exhausted (e.g. memory, disk). Hope we might find a solution to this. Sincerely, Andrea Il giorno lun 18 mar 2019 alle ore 11:53 Kumar Bolar, Harshith < hk...@arity.com> ha scritto: > Hi all, > > > > We're running Flink on a five node standalone cluster with three task > managers (TM1, TM2, TM3) and two job managers. > > > > Whenever I submit a new job, the job gets deployed only on TM3. When the > number of slots in TM3 gets exhausted, the jobs start getting deployed on > TM2 and so on. How do I ensure that the jobs get distributed evenly across > all 3 task managers? > > > > Thanks, > > Harshith > > > -- *Andrea Spina* Software Engineer @ Radicalbit Srl Via Borsieri 41, 20159, Milano - IT
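For later readers: newer Flink releases added a scheduler option that spreads slots evenly across TaskManagers. This is only a pointer, not a fix for the 1.6.x/1.7.x versions discussed in this thread — the option landed around Flink 1.9.2/1.10:

```yaml
# flink-conf.yaml (available from Flink 1.9.2+ / 1.10+; no effect on 1.6.x or 1.7.x)
cluster.evenly-spread-out-slots: true
```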
Re: Flink 1.3.2 RocksDB map segment fail if configured as state backend
Hi Stefan, thank you. You were right. It was an unexpected permissions issue. Thank you again. 2018-09-17 16:50 GMT+02:00 Stefan Richter : > Hi, > > I think the exception pretty much says what is wrong, the native library > cannot be mapped into the process because of some access rights problem. > Please make sure that your path /tmp has the exec right. > > Best, > Stefan > > > Am 17.09.2018 um 11:37 schrieb Andrea Spina : > > Hi everybody, > > I run with a Flink 1.3.2 installation on a Red Hat Enterprise Linux Server > and I'm not able to set rocksdb as state.backend due to this error whenever > I try to deploy any job: > > *java.lang.IllegalStateException: Could not initialize keyed state > backend.* > *at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)* > *at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)* > *at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:678)* > *at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:666)* > *at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)* > *at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)* > *at java.lang.Thread.run(Thread.java:748)* > *Caused by: java.io.IOException: Could not load the native RocksDB library* > *at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:560)* > *at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:298)* > *at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:756)* > *at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)* > *... 
6 more* > *Caused by: java.lang.UnsatisfiedLinkError: > /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so: > /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so: > failed to map segment from shared object: Operation not permitted* > *at java.lang.ClassLoader$NativeLibrary.load(Native Method)* > *at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)* > *at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)* > *at java.lang.Runtime.load0(Runtime.java:809)* > *at java.lang.System.load(System.java:1086)* > *at > org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)* > *at > org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)* > *at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:537)* > *... 9 more* > > Machine detail > > *NAME="Red Hat Enterprise Linux Server"* > *VERSION="7.5 (Maipo)"* > *ID="rhel"* > *ID_LIKE="fedora"* > > Cpu architecture (cat /proc/cpuinfo) > > *processor: 0* > *vendor_id: GenuineIntel* > *cpu family: 6* > *model: 45* > *model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz* > *stepping: 2* > *microcode: 0x710* > *cpu MHz: 2200.000* > *cache size: 20480 KB* > *physical id: 0* > *siblings: 4* > *core id: 0* > *cpu cores: 4* > *apicid: 0* > *initial apicid: 0* > *fpu: yes* > *fpu_exception: yes* > *cpuid level: 13* > *wp: yes* > *flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca > cmov pat pse36 clflush dts mmx fxsr sse sse2 ss ht syscall nx rdtscp lm > constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc > aperfmperf pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt aes > xsave avx hypervisor lahf_lm epb dtherm ida arat pln pts* > *bogomips: 4400.00* > *clflush size: 64* > *cache_alignment: 64* > *address sizes: 40 bits physical, 48 bits virtual* > > I found similar errors related to different systems [1] and similar > problems with 
rocksdb related to endianness [2], but mine sounds different. > Since the upgrade to newer Flink version atm might be painful, are there > any reason behind this exception, and is a workaround existing? > > Thank you so much, > > Andrea > > [1] - https://communities.ca.com/thread/241773398-em-failed-to-start- > tmplibrocksdbjni8883977260053861907so-failed-to-map-segment- > from-shared-object-operation-not-permitted > > [2] - http://apache-flink-user-mailing-list-archive.2336050. > n4.nabble.com/Unable-to-use-Flink-RocksDB-state-backend- > due-to-endianness-mismatch-td13473.html > > -- > *Andrea Spina* > Software Engineer @ Radicalbit Srl > Via Borsieri 41, 20159, Milano > <https://maps.google.com/?q=Via+Borsieri+41,+20159,+Milano=gmail=g> > - IT > > > -- *Andrea Spina* Software Engineer @ Radicalbit Srl Via Borsieri 41, 20159, Milano - IT
Flink 1.3.2 RocksDB map segment fail if configured as state backend
Hi everybody, I run with a Flink 1.3.2 installation on a Red Hat Enterprise Linux Server and I'm not able to set rocksdb as state.backend due to this error whenever I try to deploy any job: *java.lang.IllegalStateException: Could not initialize keyed state backend.* *at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)* *at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)* *at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:678)* *at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:666)* *at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)* *at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)* *at java.lang.Thread.run(Thread.java:748)* *Caused by: java.io.IOException: Could not load the native RocksDB library* *at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:560)* *at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:298)* *at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:756)* *at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)* *... 
6 more* *Caused by: java.lang.UnsatisfiedLinkError: /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so: /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so: failed to map segment from shared object: Operation not permitted* *at java.lang.ClassLoader$NativeLibrary.load(Native Method)* *at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)* *at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)* *at java.lang.Runtime.load0(Runtime.java:809)* *at java.lang.System.load(System.java:1086)* *at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)* *at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)* *at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:537)* *... 9 more* Machine detail *NAME="Red Hat Enterprise Linux Server"* *VERSION="7.5 (Maipo)"* *ID="rhel"* *ID_LIKE="fedora"* Cpu architecture (cat /proc/cpuinfo) *processor: 0* *vendor_id: GenuineIntel* *cpu family: 6* *model: 45* *model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz* *stepping: 2* *microcode: 0x710* *cpu MHz: 2200.000* *cache size: 20480 KB* *physical id: 0* *siblings: 4* *core id: 0* *cpu cores: 4* *apicid: 0* *initial apicid: 0* *fpu: yes* *fpu_exception: yes* *cpuid level: 13* *wp: yes* *flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts mmx fxsr sse sse2 ss ht syscall nx rdtscp lm constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc aperfmperf pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt aes xsave avx hypervisor lahf_lm epb dtherm ida arat pln pts* *bogomips: 4400.00* *clflush size: 64* *cache_alignment: 64* *address sizes: 40 bits physical, 48 bits virtual* I found similar errors related to different systems [1] and similar problems with rocksdb related to endianness [2], but mine sounds different. 
Since upgrading to a newer Flink version might be painful at the moment: is there any reason behind this exception, and does a workaround exist? Thank you so much, Andrea [1] - https://communities.ca.com/thread/241773398-em-failed-to-start-tmplibrocksdbjni8883977260053861907so-failed-to-map-segment-from-shared-object-operation-not-permitted [2] - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unable-to-use-Flink-RocksDB-state-backend-due-to-endianness-mismatch-td13473.html -- *Andrea Spina* Software Engineer @ Radicalbit Srl Via Borsieri 41, 20159, Milano - IT
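If granting exec rights on /tmp is not possible (e.g. /tmp mounted noexec by policy), a common workaround — an assumption on my part, not something verified in this thread — is to point Flink's temp directories at an exec-permitted location, so the native RocksDB library gets extracted and mapped from there:

```yaml
# flink-conf.yaml (taskmanager.tmp.dirs is the Flink 1.3.x key;
# later versions renamed it to io.tmp.dirs)
taskmanager.tmp.dirs: /var/lib/flink/tmp
```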
Re: streaming predictions
Dear Cederic, I did something similar to yours a while ago in this work [1], but I've always been working within the batch context. I'm also the co-author of flink-jpmml and, since a flink2pmml model saver library doesn't exist currently, I'd suggest a twofold strategy to tackle this problem: - if your model is relatively simple, take the batch evaluate method (it belongs to your SVM classifier) and attempt to translate it into a flatMap function (hopefully you can reuse some internal utilities; Flink exploits the breeze vector library under the hood [3]). - if your model is a complex one, you should export the model into PMML and then employ [2]. For a first overview, this [4] is the library you should adopt to export your model and this [5] can help you with the related implementation. Hope it can help and good luck! Andrea [1] https://dl.acm.org/citation.cfm?id=3070612 [2] https://github.com/FlinkML/flink-jpmml [3] https://github.com/apache/flink/blob/7034e9cfcb051ef90c5bf0960bfb50a79b3723f0/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L73 [4] https://github.com/jpmml/jpmml-model [5] https://github.com/jpmml/jpmml-sparkml 2018-07-24 13:29 GMT+02:00 David Anderson : > One option (which I haven't tried myself) would be to somehow get the > model into PMML format, and then use https://github.com/ > FlinkML/flink-jpmml to score the model. You could either use another > machine learning framework to train the model (i.e., a framework that > directly supports PMML export), or convert the Flink model into PMML. Since > SVMs are fairly simple to describe, that might not be terribly difficult. > > On Mon, Jul 23, 2018 at 4:18 AM Xingcan Cui wrote: > >> Hi Cederic, >> >> If the model is a simple function, you can just load it and make >> predictions using the map/flatMap function in the StreamEnvironment. 
>> >> But I’m afraid the model trained by Flink-ML should be a “batch job", >> whose predict method takes a Dataset as the parameter and outputs another >> Dataset as the result. That means you cannot easily apply the model on >> streams, at least for now. >> >> There are two options to solve this. (1) Train the dataset using another >> framework to produce a simple function. (2) Adjust your model serving as a >> series of batch jobs. >> >> Hope that helps, >> Xingcan >> >> On Jul 22, 2018, at 8:56 PM, Hequn Cheng wrote: >> >> Hi Cederic, >> >> I am not familiar with SVM or machine learning but I think we can work it >> out together. >> What problem have you met when you try to implement this function? From >> my point of view, we can rebuild the model in the flatMap function and use >> it to predict the input data. There are some flatMap documents here[1]. >> >> Best, Hequn >> >> [1] https://ci.apache.org/projects/flink/flink-docs- >> master/dev/stream/operators/#datastream-transformations >> >> >> >> >> >> On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans >> wrote: >> >>> Dear >>> >>> My name is Cederic Bosmans and I am a masters student at the Ghent >>> University (Belgium). >>> I am currently working on my masters dissertation which involves Apache >>> Flink. >>> >>> I want to make predictions in the streaming environment based on a model >>> trained in the batch environment. >>> >>> I trained my SVM-model this way: >>> val svm2 = SVM() >>> svm2.setSeed(1) >>> svm2.fit(trainLV) >>> val testVD = testLV.map(lv => (lv.vector, lv.label)) >>> val evalSet = svm2.evaluate(testVD) >>> >>> and saved the model: >>> val modelSvm = svm2.weightsOption.get >>> >>> Then I have an incoming datastream in the streaming environment: >>> dataStream[(Int, Int, Int)] >>> which should be bininary classified using this trained SVM model. 
>>> >>> Since the predict function does only support DataSet and not DataStream, >>> on stackoverflow a flink contributor mentioned that this should be done >>> using a map/flatMap function. >>> Unfortunately I am not able to work this function out. >>> >>> It would be incredible for me if you could help me a little bit further! >>> >>> Kind regards and thanks in advance >>> Cederic Bosmans >>> >> >> >> > > -- > *David Anderson* | Training Coordinator | data Artisans > -- > Join Flink Forward - The Apache Flink Conference > Stream Processing | Event Driven | Real Time > -- *Andrea Spina* Software Engineer @ Radicalbit Srl Via Borsieri 41, 20159, Milano - IT
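Boiled down, the map/flatMap approach suggested in this thread is: extract the trained weight vector once on the batch side, then score each incoming streaming record with a dot product plus a sign. A minimal plain-Java sketch with no Flink dependency — the weights and bias below are hypothetical placeholders for what svm2.weightsOption.get would hold after training:

```java
import java.util.function.Function;

public class SvmScore {

    // Linear SVM decision function: sign(w . x + b).
    static Function<double[], Integer> classifier(double[] w, double b) {
        return x -> {
            double score = b;                      // decision value w . x + b
            for (int i = 0; i < w.length; i++) {
                score += w[i] * x[i];
            }
            return score >= 0 ? 1 : -1;            // binary label
        };
    }

    public static void main(String[] args) {
        // Inside a Flink MapFunction, this closure would be the whole body:
        // convert the incoming tuple to a feature array, apply, emit the label.
        Function<double[], Integer> predict =
            classifier(new double[] {0.5, -1.0, 0.25}, 0.1);
        System.out.println(predict.apply(new double[] {1, 0, 0}));  // prints 1
        System.out.println(predict.apply(new double[] {0, 1, 0}));  // prints -1
    }
}
```

Because the per-record logic is a pure function of the frozen weights, it ports directly into a streaming map/flatMap without needing the batch predict(DataSet) API at all.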
Re: Model serving in Flink DataStream
Hi Adarsh, we developed flink-jpmml for streaming model serving on top of the PMML format and, of course, Flink: we haven't released any official benchmark numbers yet. We didn't bump into any performance issue while using the library. In terms of throughput and latency it doesn't require more effort than using the Flink streaming APIs by themselves. What can happen is high memory usage if you're deploying thousands of (fat) models at a time within the same pipeline, but this was a design choice (you can see the explanation here https://www.youtube.com/watch?v=0rWvMZ6JSD8&t=17s). AFAIK the lib is already deployed in a couple of projects. Don't hesitate to write on Github issues if you have more questions. https://github.com/FlinkML/flink-jpmml Cheers, Andrea -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: FlinkML ALS is taking too long to run
Dear Ziyad, Yep, I encountered the same very long runtimes with ALS as well at the time, and I recorded improvements by increasing the number of blocks / decreasing the number of task slots per TaskManager, as you pointed out. Cheers, Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkML-ALS-is-taking-too-long-to-run-tp14154p14192.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: FlinkML ALS is taking too long to run
Dear Ziyad, could you kindly share some additional info about your environment (local/cluster, nodes, machines' configuration)? What exactly do you mean by "indefinitely"? For how long does the job hang? Hope to help you, then. Cheers, Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkML-ALS-is-taking-too-long-to-run-tp14154p14186.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala
Hi Gordon, sadly no news since the last message. In the end I worked around the issue; I was not able to solve it. I'll try to provide a runnable example asap. Thank you. Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13896.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala
Hi guys, thank you for your interest. Yes @Flavio, I tried both 1.2.0 and 1.3.0 versions. Following Gordon's suggestion I tried to set setReferences to false, but sadly it didn't help. What I did then was to declare a custom serializer as the following:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer
    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer
    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)
    output.close()
  }
}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray
    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)
    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)
    output.close()
  }
}

What I obtained is the same previous exception but on a different accessed index and size.

Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.set(ArrayList.java:444)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
    at
Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala
Good afternoon dear Community, for a few days now I have really been struggling to understand the reason behind this KryoException. Here is the stack trace.

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 11 more
2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space:
Async Functions and Scala async-client for mySql/MariaDB database connection
Dear Flink community, I started to use Async Functions in Scala, Flink 1.2.0, in order to retrieve enriching information from a MariaDB database. In order to do that, I first employed the classical jdbc library (org.mariadb.jdbc) and it worked as expected. Due to the blocking behavior of jdbc, I'm trying to use this library https://github.com/mauricio/postgresql-async/tree/master/mysql-async which promises to offer a subset of features in a non-blocking fashion. Sadly I'm not able to use it. The async function code follows:

object AsyncEnricher {
  case class OutputType(field1: FieldType, field2: FieldType)
}

class AsyncEnricher(configuration: MariaDBConfig)
  extends AsyncFunction[InputType, OutputType]
  with Serializable
  with AutoCloseable
  with LazyLogging {

  private val queryString = s"SELECT FROM [table] WHERE = ;"

  implicit lazy val executor = ExecutionContext.fromExecutor(Executors.directExecutor())

  private lazy val mariaDBClient: Connection = {
    val config = createConfiguration(configuration)
    val connection = new MySQLConnection(config)
    Await.result(connection.connect, 5 seconds)
  }

  override def asyncInvoke(input: InputType, collector: AsyncCollector[OutputType]): Unit = {
    val queryResult = mariaDBClient.sendPreparedStatement(queryString, Seq(input.fieldToSearch))
    queryResult.map(_.rows) onSuccess {
      case Some(resultSet) =>
        Try {
          resultSet.head(0).asInstanceOf[FieldType]
        } match {
          case Success(value) => collector.collect(Iterable(OutputType(value, value)))
          case Failure(e) => logger.error(s"retrieving value from MariaDB raised $e: $queryString executed")
        }
      case _ => logger.error(s"value not found: $queryString executed")
    }
    queryResult onFailure {
      case e: Throwable => logger.error(s"retrieving location volume from MariaDB raised $e: $queryString executed")
    }
  }

  override def close(): Unit = {
    Try(mariaDBClient.disconnect).recover {
      case t: Throwable => logger.info(s"MariaDB cannot be closed - ${t.getMessage}")
    }
  }
}

The stack trace follows:
TimerException{java.lang.IllegalStateException: Timer service is shut down}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Timer service is shut down
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
... 7 more
java.lang.NullPointerException
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)

I think the problem involves connection.connect returning a Future, and hence the Await. This is different from the jdbc driver, which worked like a charm.
I tried moving the Await out of the lazy val. Can't wait for your opinion. Thank you so much in advance. Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html Sent from the Apache Flink User Mailing List archive at Nabble.com.
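For what it's worth, these async clients are meant to be used by completing the collector from the query future's callback rather than blocking anywhere, including on the connection itself, which can be opened once eagerly (e.g. in the open() of a rich function) instead of behind a lazy val that an Await forces on first use. The callback shape, sketched client-agnostically in Java with a plain CompletableFuture standing in for the MySQL client's futures (every name here is illustrative, not an API from the thread):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class AsyncEnrichSketch {
    // Route the result of an async query to the collector (or the error handler)
    // from the future's callback: no Await/blocking call on the caller thread.
    public static void enrich(CompletableFuture<String> queryResult,
                              Consumer<String> collect,
                              Consumer<Throwable> onError) {
        queryResult.whenComplete((row, error) -> {
            if (error != null) {
                onError.accept(error); // report instead of swallowing
            } else {
                collect.accept(row);   // emit the enriched record
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        StringBuilder out = new StringBuilder();
        enrich(pending, out::append, t -> out.append("error"));
        pending.complete("value"); // the client would complete this on I/O
        System.out.println(out);   // prints value
    }
}
```

The point of the shape is that the enrichment thread never parks: the callback fires on whichever thread completes the query future.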
Re: Cogrouped Stream never triggers tumbling event time window
Dear community, I finally solved the issue I bumped into. Basically the root of the problem was the behavior of my input: the incoming rates of the two streams were very different (events of the second type were really late and scarce in event time). The solution I employed was to assign timestamps and watermarks to the source stream just before splitting it into the first-type and second-type streams. I suppose this solved my problem because of the EventTimeTrigger.getCurrentWatermark() method, which I think returns the minimum watermark across the streams scoped by the TriggerContext. So the window was hanging because of the incoming rate of the second-type stream. Hope it could help someone in the future. Cheers, Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373p12468.html Sent from the Apache Flink User Mailing List archive at Nabble.com.
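For readers hitting the same symptom: the intuition above matches how two-input event-time operators behave in general — the operator may only advance its watermark to the minimum across its inputs, so one late or scarce input holds every event-time window back. A tiny illustrative sketch of that rule (Java, not Flink code):

```java
public class WatermarkSketch {
    // A two-input operator can only advance to the slowest input's watermark;
    // otherwise it might fire windows the lagging stream still feeds.
    public static long combinedWatermark(long first, long second) {
        return Math.min(first, second);
    }

    public static void main(String[] args) {
        long fast = 1490280131328L;  // first-type stream is up to date
        long slow = Long.MIN_VALUE;  // second-type stream emitted no watermark yet
        // The coGroup window stays at Long.MIN_VALUE: nothing can fire.
        System.out.println(combinedWatermark(fast, slow) == Long.MIN_VALUE); // prints true
    }
}
```

Assigning watermarks once before the split, as done above, gives both branches the same (advancing) watermark, which is why the windows start firing.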
Re: Cogrouped Stream never triggers tumbling event time window
Sorry, I forgot to mention the Flink version: 1.1.2. Thanks, Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373p12374.html Sent from the Apache Flink User Mailing List archive at Nabble.com.
Cogrouped Stream never triggers tumbling event time window
Dear Community, I'm really struggling with a co-grouped stream. The workload is the following:

val firstStream: DataStream[FirstType] =
  firstRaw.assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

val secondStream: DataStream[SecondType] = secondRaw
  .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
  .map(new toSecondsStreamMapper())

where both the Extractors extend BoundedOutOfOrdernessTimestampExtractor by overriding the extractTimestamp method, assigning the timestamps owned respectively by FirstType and SecondType objects:

override def extractTimestamp(first: FirstType): Long = first.timestamp

Then I'm calling coGroup as follows:

val stockDetails = firstStream
  .coGroup(secondStream)
  .where(_.id)
  .equalTo(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .apply(new MyCogroupFunction())
  .uid("myCogroup")
  .name("My CoGroup")

The problem is that the CoGroup function is never triggered. I did several tests and was not able to solve it at all. The first relevant point is that event time can be seriously out of order: I can even bump into a 0 timestamp. I also faked timestamps in order to distribute them over a span of two seconds, five seconds, and so forth. These tries didn't change the behavior at all: not a single window fires. Another relevant point: I'm running locally, reading from a pre-loaded kafka topic, so all the events are read sequentially at startup.
I will give a couple of examples.

Workload 1 (faked timestamps), fields (id, timestamp):

FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131328)
FirstType(9781783433803 ,1490280131328)
SecondType(9781783433803,1490280130465)
SecondType(9781783433803,1490280131027)
SecondType(9781783433803,1490280131051)
SecondType(9781783433803,1490280131070)
SecondType(9781783433803,1490280131085)
SecondType(9781783433803,1490280131103)
SecondType(9781783433803,1490280131124)
SecondType(9781783433803,1490280131143)
SecondType(9781783433803,1490280131158)
SecondType(9781783433803,1490280131175)

Workload 2 (real case timestamps):

> FirstType(9781783433803, 1490172958602)
1> FirstType(9781783433803, 1490172958611)
1> FirstType(9781783433803, 1490172958611)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803 ,1490196171869)
1> FirstType(9781783433803, 1490196171869)
SecondType(9781783433803 ,0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 1489689399726)
SecondType(9781783433803, 1489689399726)

I confirm that I have healthy incoming streams at the entrance of the coGroup operator. I think I'm likely missing something easy. Any help will be really appreciated.
Sincerely, Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373.html Sent from the Apache Flink User Mailing List archive at Nabble.com.
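A note on why a scarce or zero-timestamp second stream can stall the window: a BoundedOutOfOrdernessTimestampExtractor-style watermark tracks the maximum timestamp seen on that stream and subtracts the allowed lateness, so it only advances when newer timestamps arrive there. A simplified illustrative reimplementation of that logic (Java; not the actual Flink class):

```java
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMax = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called per element: the watermark trails the max timestamp seen so far.
    public long onElement(long timestamp) {
        currentMax = Math.max(currentMax, timestamp);
        return currentMax - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch second = new BoundedOutOfOrdernessSketch(1000L);
        second.onElement(1489577984143L);
        // A zero timestamp (as in Workload 2) does not pull the watermark back...
        long wm = second.onElement(0L);
        System.out.println(wm == 1489577984143L - 1000L); // prints true
        // ...but if the stream then goes quiet, the watermark stays put and the
        // coGroup window, driven by the minimum over both streams, never fires.
    }
}
```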
Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
Hi Stefan, thank you so much for the answer. Ok, I'll do it asap. For the sake of argument, could the issue be related to the low number of blocks? I noticed that the Flink implementation, by default, sets the number of blocks to the input count (which is actually a lot). So with a low cardinality and big-sized blocks, maybe they don't fit somewhere... Thank you again. Andrea 2016-09-02 10:51 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>: > Hi, > > unfortunately, the log does not contain the required information for this > case. It seems like a sender to the SortMerger failed. The best way to find > this problem is to take a look to the exceptions that are reported in the > web front-end for the failing job. Could you check if you find any reported > exceptions there and provide them to us? > > Best, > Stefan > > Am 01.09.2016 um 11:56 schrieb ANDREA SPINA <74...@studenti.unimore.it>: > > Sure. Here <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c> > you can find the complete logs file. > Still can not run through the issue. Thank you for your help. 
> > 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > >> I don't know whether my usual error is related to this one but is very >> similar and it happens randomly...I still have to figure out the root cause >> of the error: >> >> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce >> (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map >> at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted >> input: Thread 'SortMerger spilling thread' terminated due to an exception: >> -2 >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456) >> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas >> k.java:345) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: >> Thread 'SortMerger spilling thread' terminated due to an exception: -2 >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger >> .getIterator(UnilateralSortMerger.java:619) >> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT >> ask.java:1079) >> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare >> (GroupReduceDriver.java:94) >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) >> ... 
3 more >> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' >> terminated due to an exception: -2 >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger >> $ThreadBase.run(UnilateralSortMerger.java:800) >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2 >> at java.util.ArrayList.elementData(ArrayList.java:418) >> at java.util.ArrayList.get(ArrayList.java:431) >> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadO >> bject(MapReferenceResolver.java:42) >> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) >> at com.esotericsoftware.kryo.serializers.MapSerializer.read( >> MapSerializer.java:135) >> at com.esotericsoftware.kryo.serializers.MapSerializer.read( >> MapSerializer.java:21) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali >> zer.deserialize(KryoSerializer.java:219) >> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali >> zer.deserialize(KryoSerializer.java:245) >> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali >> zer.copy(KryoSerializer.java:255) >> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer. >> copy(PojoSerializer.java:556) >> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB >> ase.copy(TupleSerializerBase.java:75) >> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter. >> writeToOutput(NormalizedKeySorter.java:499) >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger >> $SpillingThread.go(UnilateralSortMerger.java:1344) >> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger >> $ThreadBase.run(UnilateralSortMerger.java:796) >> >> >> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter < >> s.rich...@data-artisans.com> wrote: >> >>> Hi, >>> >>> could you provide the log outputs for your job (ideally with debug >>> logging enabled)? 
>>> >>> Best, >>> Stefan >>> &
Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
Sure. Here <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c> you can find the complete logs file. Still can not run through the issue. Thank you for your help. 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > I don't know whether my usual error is related to this one but is very > similar and it happens randomly...I still have to figure out the root cause > of the error: > > java.lang.Exception: The data preparation for task 'CHAIN GroupReduce > (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map > at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted > input: Thread 'SortMerger spilling thread' terminated due to an exception: > -2 > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger spilling thread' terminated due to an exception: -2 > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger. > getIterator(UnilateralSortMerger.java:619) > at org.apache.flink.runtime.operators.BatchTask.getInput( > BatchTask.java:1079) > at org.apache.flink.runtime.operators.GroupReduceDriver. > prepare(GroupReduceDriver.java:94) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) > ... 
3 more > Caused by: java.io.IOException: Thread 'SortMerger spilling thread' > terminated due to an exception: -2 > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ > ThreadBase.run(UnilateralSortMerger.java:800) > Caused by: java.lang.ArrayIndexOutOfBoundsException: -2 > at java.util.ArrayList.elementData(ArrayList.java:418) > at java.util.ArrayList.get(ArrayList.java:431) > at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject( > MapReferenceResolver.java:42) > at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) > at com.esotericsoftware.kryo.serializers.MapSerializer. > read(MapSerializer.java:135) > at com.esotericsoftware.kryo.serializers.MapSerializer. > read(MapSerializer.java:21) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at org.apache.flink.api.java.typeutils.runtime.kryo. > KryoSerializer.deserialize(KryoSerializer.java:219) > at org.apache.flink.api.java.typeutils.runtime.kryo. > KryoSerializer.deserialize(KryoSerializer.java:245) > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy( > KryoSerializer.java:255) > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy( > PojoSerializer.java:556) > at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy( > TupleSerializerBase.java:75) > at org.apache.flink.runtime.operators.sort.NormalizedKeySorter. > writeToOutput(NormalizedKeySorter.java:499) > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ > SpillingThread.go(UnilateralSortMerger.java:1344) > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ > ThreadBase.run(UnilateralSortMerger.java:796) > > > On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter < > s.rich...@data-artisans.com> wrote: > >> Hi, >> >> could you provide the log outputs for your job (ideally with debug >> logging enabled)? 
>> >> Best, >> Stefan >> >> Am 31.08.2016 um 14:40 schrieb ANDREA SPINA <74...@studenti.unimore.it>: >> >> Hi everyone. >> I'm running the FlinkML ALS matrix factorization and I bumped into the >> following exception: >> >> org.apache.flink.client.program.ProgramInvocationException: The program >> execution failed: Job execution failed. >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381) >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355) >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315) >> at org.apache.flink.client.program.ContextEnvironment.execute(C >> ontextEnvironment.java:60) >> at org.apache.flink.api.scala.ExecutionEnvironment.execute(Exec >> utionEnvironment.scala:652) >> at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTool >> s.scala:94) >> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507) >> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433) >> at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55) >> at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122) >> at dima.tu.berlin.bench
FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
at java.lang.Thread.run(Thread.java:745)

I'm running with flink-1.0.3. I really can't figure out the reason behind that. My code simply calls the library as follows:

val als = ALS()
  .setIterations(numIterations)
  .setNumFactors(rank)
  .setBlocks(degreeOfParallelism)
  .setSeed(42)
  .setTemporaryPath(tempPath)

als.fit(ratings, parameters)

val (users, items) = als.factorsOption match {
  case Some(_) => als.factorsOption.get
  case _ => throw new RuntimeException
}

users.writeAsText(outputPath, WriteMode.OVERWRITE)
items.writeAsText(outputPath, WriteMode.OVERWRITE)

env.execute("ALS matrix factorization")

where
- ratings, the input dataset, contains (uid, iid, rate) rows: about 8e6 users, 1e6 items and 700 ratings per user on average
- numIterations is 10
- rank is 50
- degreeOfParallelism is 240

The error seems to be related to the final persist() call: at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507). I'm running on a 15-node cluster - 16 cpus per node - with the following relevant properties:

jobmanager.heap.mb = 2048
taskmanager.memory.fraction = 0.5
taskmanager.heap.mb = 28672
taskmanager.network.bufferSizeInBytes = 32768
taskmanager.network.numberOfBuffers = 98304
akka.ask.timeout = 300s

Any help will be appreciated. Thank you. -- *Andrea Spina* N.Tessera: *74598* MAT: *89369* *Ingegneria Informatica* *[LM] *(D.M. 270)
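A rough sanity check on ALS block sizing, relating to the block-count question raised earlier in this thread. This is purely illustrative arithmetic on the figures in this post (8e6 users, rank 50, blocks = degreeOfParallelism = 240), not Flink's actual memory accounting, and the method name is made up for the sketch:

```java
public class AlsBlockSizing {
    // Approximate payload of one user-factor block:
    // (#users / #blocks) factor vectors of `rank` doubles (8 bytes each).
    public static long userBlockBytes(long users, int rank, int blocks) {
        return (users / blocks) * rank * 8L;
    }

    public static void main(String[] args) {
        // 8e6 users, rank 50, 240 blocks as in this post: ~13 MB per block.
        System.out.println(userBlockBytes(8_000_000L, 50, 240)); // prints 13333200
    }
}
```

The fewer the blocks, the larger each one grows, which is one concrete reading of the "big-sized blocks maybe don't fit somewhere" hypothesis in the reply.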
Re: Apache Flink 1.0.3 IllegalStateException Partiction State on chaining
Hi everybody, increasing the akka.ask.timeout solved the second issue. Anyway that was a warning about a congested network, so I worked to improve the algorithm. Increasing the numberOfBuffers and the corresponding size solved the first issue, thus now I can run with the full DOP. In my case enabling the off-heap memory didn't do the trick. Thank you. All the best, Andrea 2016-06-29 17:10 GMT+02:00 Martin Scholl <m...@funkpopes.org>: > Other than increasing the ask.timeout, we've seen such failures being > caused by long GC pauses over bigger heaps. In such a case, you could > fiddle with a) enabling object reuse, or b) enabling off-heap memory (i.e. > taskmanager.memory.off-heap == true) to mitigate GC-induced issues a bit. > > Hope it helps, > Martin > > > On Wed, Jun 29, 2016 at 3:29 PM Ufuk Celebi <u...@apache.org> wrote: > >> OK, looks like you can easily give more memory to the network stack, >> e.g. for 2 GB set >> >> taskmanager.network.numberOfBuffers = 65536 >> taskmanager.network.bufferSizeInBytes = 32768 >> >> For the other exception, your logs confirm that there is something >> else going on. Try increasing the akka ask timeout: >> >> akka.ask.timeout: 100 s >> >> Does this help? >> >> >> On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it> >> wrote: >> > Hi Ufuk, >> > >> > so the memory available per node is 48294 megabytes per node, but I >> reserve >> > 28 by flink conf file. >> > taskmanager.heap.mb = 28672 >> > taskmanager.memory.fraction = 0.7 >> > taskmanager.network.numberOfBuffers = 32768 >> > taskmanager.network.bufferSizeInBytes = 16384 >> > >> > Anyway Follows what I found in log files. 
>> > >> > Follows the taskmanager log (task manager that seems failed) >> > >> > 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task >> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli >> > >> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) >> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver >> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) >> (1/1) >> > switched to FAILED with exception. >> > java.lang.IllegalStateException: Received unexpected partition state >> null >> > for partition request. This is a bug. >> > at >> > >> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994) >> > at >> > org.apache.flink.runtime.taskmanager.TaskManager.org >> $apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala: >> > 468) >> > at >> > >> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265) >> > at >> > >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >> > at >> > >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >> > at >> > >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >> > at >> > >> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) >> > at >> > >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >> > at >> > >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >> > at >> > >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >> > at >> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) >> > at >> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) >> > at >> > 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >> > at >> > >> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) >> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) >> > at >> > >> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119) >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> > at akka.actor.ActorCell.invoke(ActorCell.scala:487) >> > at akka.dispatch.Ma
Re: Apache Flink 1.0.3 IllegalStateException Partiction State on chaining
l(Future.scala:244) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]] after [1 ms] at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:745) Really appreciating your help here. :) Cheers, Andrea 2016-06-29 13:48 GMT+02:00 Ufuk Celebi <u...@apache.org>: > Hey Andrea! Sorry for the bad user experience. > > Regarding the network buffers: you should be able to run it after > increasing the number of network buffers, just account for it when > specifying the heap size etc. You currently allocate 32768 * 16384 > bytes = 512 MB for them. If you have a very long pipeline and high > parallelism, you should increase it accordingly. 
How much memory do > you have on your machines? > > Regarding the IllegalStateException: I suspect that this is **not** > the root failure cause. The null ExecutionState can only happen, if > the producer task (from which data is requested) failed during the > request. The error message is confusing and I opened a JIRA to fix it: > https://issues.apache.org/jira/browse/FLINK-4131. Can you please check > your complete logs to see what the root cause might be, e.g. why did > the producer fail? > > > On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA > <74...@studenti.unimore.it> wrote: > > Hi everyone, > > > > I am running some Flink experiments with Peel benchmark > > http://peel-framework.org/ and I am struggling with exceptions: the > > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is > ~80GiB > > and is located on Hdfs 2.7.1. Flink version is 1.0.3. > > > > At the beginning I tried with 400 as degree of parallelism but not enough > > numberOfBuffers was raised so I changed the parallelism to 200. 
Flink > > configuration follows: > > > > jobmanager.rpc.address = ${runtime.hostname} > > akka.log.lifecycle.events = ON > > akka.ask.timeout = 300s > > jobmanager.rpc.port = 6002 > > jobmanager.heap.mb = 1024 > > jobmanager.web.port = 6004 > > taskmanager.heap.mb = 28672 > > taskmanager.memory.fraction = 0.7 > > taskmanager.network.numberOfBuffers = 32768 > > taskmanager.network.bufferSizeInBytes = 16384 > > taskmanager.tmp.dirs = > > > "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp" > > taskmanager.debug.memory.startLogThread = true > > > > With a parallelism of 200 the following exception will raise from a node > of > > the cluster: > > > > 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task > > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli > > > nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) > > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver > > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) > (1/1) > > switched to FAILED with exception. > > java.lang.IllegalStateException: Received unexpected partition state null > > for partition request. This is a bug. > > at > > > org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994) > > > > > > The reduce code is: > > > > 43 val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) > > > > The map code is: > > > > 68 def createI
Apache Flink 1.0.3 IllegalStateException Partiction State on chaining
Hi everyone, I am running some Flink experiments with the Peel benchmark http://peel-framework.org/ and I am struggling with exceptions. The environment is a 25-node cluster, 16 cores per node. The dataset is ~80GiB and is located on Hdfs 2.7.1. Flink version is 1.0.3. At the beginning I tried with 400 as degree of parallelism, but a "not enough numberOfBuffers" error was raised, so I changed the parallelism to 200. The Flink configuration follows:

jobmanager.rpc.address = ${runtime.hostname}
akka.log.lifecycle.events = ON
akka.ask.timeout = 300s
jobmanager.rpc.port = 6002
jobmanager.heap.mb = 1024
jobmanager.web.port = 6004
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
taskmanager.debug.memory.startLogThread = true

With a parallelism of 200 the following exception is raised on a node of the cluster:

2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null for partition request. This is a bug.
at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)

The reduce code is:

43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)

The map code is:

68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
69    dimensionDS.map {
70      dimension =>
71        val values = DenseVector(Array.fill(dimension)(0.0))
72        values
73    }
74  }

I can't figure out a solution; thank you for your help. 
Andrea -- *Andrea Spina* N.Tessera: *74598* MAT: *89369* *Ingegneria Informatica* *[LM] *(D.M. 270)
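For reference, stripped of the Flink DataSet API, the map at sGradientDescentL2.scala:69 just materialises a dense zero vector of the dimension produced by the reduce. A minimal plain-Java sketch of that logic (class and method names here are illustrative, not from the job):

```java
import java.util.Arrays;

public class InitialVectorSketch {
    // Mirrors the logic of the job's createInitialVector step: given the
    // dimension coming out of the reduce, build a weight vector of zeros.
    static double[] createInitialVector(int dimension) {
        double[] values = new double[dimension];
        Arrays.fill(values, 0.0);
        return values;
    }

    public static void main(String[] args) {
        double[] w0 = createInitialVector(5);
        System.out.println(w0.length);     // prints 5
        System.out.println(w0[0] == 0.0);  // prints true
    }
}
```

The logic itself is trivial, which supports the reading in the thread that the failure is a runtime partition-state bug rather than a problem in the user code.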
Re: Flink java.io.FileNotFoundException Exception with Peel Framework
Hi, the problem was solved after I figured out there was an instance of a Flink TaskManager already running on a node of the cluster. Thank you, Andrea 2016-06-28 12:17 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>: > Hi Max, > thank you for the fast reply and sorry: I use flink-1.0.3. > Yes I tested on dummy dataset with numOfBuffers = 16384 and decreasing the > parallelism degree and this solution solved the first exception. Anyway on > the 80GiB dataset I struggle with the second exception. > > Regards, > Andrea > > 2016-06-28 12:08 GMT+02:00 Maximilian Michels <m...@apache.org>: > >> Hi Andrea, >> >> The number of network buffers should be sufficient. Actually, assuming >> you have 16 task slots on each of the 25 nodes, it should be enough to >> have 16^2 * 25 * 4 = 25600 network buffers. >> >> See >> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#background >> >> So we have to investigate a little more. Which version of Flink are you >> using? >> >> Cheers, >> Max >> >> On Tue, Jun 28, 2016 at 11:57 AM, ANDREA SPINA >> <74...@studenti.unimore.it> wrote: >> > Hi everyone, >> > >> > I am running some Flink experiments with Peel benchmark >> > http://peel-framework.org/ and I am struggling with exceptions: the >> > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is >> ~80GiB >> > and is located on Hdfs 2.7.1. 
>> > >> > At the beginning I tried with 400 as degree of parallelism and with the >> > following configuration: >> > >> > jobmanager.rpc.address = ${runtime.hostname} >> > akka.log.lifecycle.events = ON >> > akka.ask.timeout = 300s >> > jobmanager.rpc.port = 6002 >> > >> > jobmanager.heap.mb = 1024 >> > jobmanager.web.port = 6004 >> > >> > taskmanager.heap.mb = 28672 >> > taskmanager.memory.fraction = 0.7 >> > taskmanager.network.numberOfBuffers = 32768 >> > taskmanager.network.bufferSizeInBytes = 16384 >> > taskmanager.tmp.dirs = >> > >> "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp" >> > taskmanager.debug.memory.startLogThread = true >> > >> > the following exception will raise >> > >> > Caused by: java.io.IOException: Insufficient number of network buffers: >> > required 350, but only 317 available. The total number of network >> buffers is >> > currently set to 32768. You can increase this number by setting the >> > configuration key 'taskmanager.network.numberOfBuffers'. >> > at >> > >> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) >> > at >> > >> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) >> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:469) >> > at java.lang.Thread.run(Thread.java:745) >> > >> > So I tried different solutions, both with increasing numberOfBuffers >> (Max >> > value tried 98304) or decreasing the degreeOfParallelism (Min value >> tried >> > 300) and testing those configs with a dummy dataset seems to solve the >> > number of buffers issue. >> > But In each case with the 80GiB dataset now I struggle with a new >> exception; >> > the following with a degree of parallelism = 300 and numberOfBuffers = >> 32768 >> > >> > org.apache.flink.client.program.ProgramInvocationException: The program >> > execution failed: Job execution failed. 
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381) >> > at org.apache.flink.client.program.Client.runBlocking(Client.java:355) >> > at org.apache.flink.client.program.Client.runBlocking(Client.java:315) >> > at >> > >> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) >> > at >> > >> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652) >> > at >> > >> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon$.main(FlinkSLRTrainCommon.scala:110) >> > at >> > >> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon.main(FlinkSLRTrainCommon.scala) >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> > at >> > >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
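The sizing discussed above can be checked with simple arithmetic: the rule of thumb in the linked configuration docs is slots-per-TaskManager² × #TaskManagers × 4 buffers, and the heap the pool claims is numberOfBuffers × bufferSizeInBytes. A quick sketch for this cluster (25 nodes, 16 slots per node, the settings from the thread); this encodes only the documented heuristic, not any Flink API:

```java
public class NetworkBufferSizing {
    // Docs heuristic: #slots-per-TM^2 * #TaskManagers * 4.
    static long recommendedBuffers(int slotsPerTm, int taskManagers) {
        return (long) slotsPerTm * slotsPerTm * taskManagers * 4;
    }

    // Total memory claimed by the network buffer pool, in bytes.
    static long bufferPoolBytes(long numberOfBuffers, long bufferSizeBytes) {
        return numberOfBuffers * bufferSizeBytes;
    }

    public static void main(String[] args) {
        // 25 nodes with 16 slots each, as in this thread.
        System.out.println(recommendedBuffers(16, 25));                   // prints 25600
        // Configured: 32768 buffers of 16384 bytes -> MiB of heap used.
        System.out.println(bufferPoolBytes(32768, 16384) / (1024 * 1024)); // prints 512
    }
}
```

So the configured 32768 buffers exceed the heuristic's 25600 and cost 512 MiB of the 28 GiB TaskManager heap, consistent with Max's conclusion that the buffer count itself was sufficient.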
Re: Flink java.io.FileNotFoundException Exception with Peel Framework
Hi Max, thank you for the fast reply and sorry: I use flink-1.0.3. Yes I tested on dummy dataset with numOfBuffers = 16384 and decreasing the parallelism degree and this solution solved the first exception. Anyway on the 80GiB dataset I struggle with the second exception. Regards, Andrea 2016-06-28 12:08 GMT+02:00 Maximilian Michels <m...@apache.org>: > Hi Andrea, > > The number of network buffers should be sufficient. Actually, assuming > you have 16 task slots on each of the 25 nodes, it should be enough to > have 16^2 * 25 * 4 = 25600 network buffers. > > See > https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#background > > So we have to investigate a little more. Which version of Flink are you > using? > > Cheers, > Max > > On Tue, Jun 28, 2016 at 11:57 AM, ANDREA SPINA > <74...@studenti.unimore.it> wrote: > > Hi everyone, > > > > I am running some Flink experiments with Peel benchmark > > http://peel-framework.org/ and I am struggling with exceptions: the > > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is > ~80GiB > > and is located on Hdfs 2.7.1. > > > > At the beginning I tried with 400 as degree of parallelism and with the > > following configuration: > > > > jobmanager.rpc.address = ${runtime.hostname} > > akka.log.lifecycle.events = ON > > akka.ask.timeout = 300s > > jobmanager.rpc.port = 6002 > > > > jobmanager.heap.mb = 1024 > > jobmanager.web.port = 6004 > > > > taskmanager.heap.mb = 28672 > > taskmanager.memory.fraction = 0.7 > > taskmanager.network.numberOfBuffers = 32768 > > taskmanager.network.bufferSizeInBytes = 16384 > > taskmanager.tmp.dirs = > > > "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp" > > taskmanager.debug.memory.startLogThread = true > > > > the following exception will raise > > > > Caused by: java.io.IOException: Insufficient number of network buffers: > > required 350, but only 317 available. 
The total number of network > buffers is > > currently set to 32768. You can increase this number by setting the > > configuration key 'taskmanager.network.numberOfBuffers'. > > at > > > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > > at > > > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:469) > > at java.lang.Thread.run(Thread.java:745) > > > > So I tried different solutions, both with increasing numberOfBuffers (Max > > value tried 98304) or decreasing the degreeOfParallelism (Min value tried > > 300) and testing those configs with a dummy dataset seems to solve the > > number of buffers issue. > > But In each case with the 80GiB dataset now I struggle with a new > exception; > > the following with a degree of parallelism = 300 and numberOfBuffers = > 32768 > > > > org.apache.flink.client.program.ProgramInvocationException: The program > > execution failed: Job execution failed. 
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:355) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:315) > > at > > > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) > > at > > > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652) > > at > > > dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon$.main(FlinkSLRTrainCommon.scala:110) > > at > > > dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon.main(FlinkSLRTrainCommon.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > > at > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:248) > > at > > > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > > at org.apache.flink.client.CliFrontend
Flink java.io.FileNotFoundException Exception with Peel Framework
or.java:69)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Channel to path '/data/3/peel/flink/tmp/flink-io-3a97d62c-ada4-44f1-ab72-dd018386f9aa/79305169d69f7e8b361a175af87353fa.channel' could not be opened.
    at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:61)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:86)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.<init>(AsynchronousBufferFileWriter.java:31)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBufferFileWriter(IOManagerAsync.java:257)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:151)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:366)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:159)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    ... 6 more
Caused by: java.io.FileNotFoundException: /data/3/peel/flink/tmp/flink-io-3a97d62c-ada4-44f1-ab72-dd018386f9aa/79305169d69f7e8b361a175af87353fa.channel (No such file or directory)
    at java.io.RandomAccessFile.open0(Native Method)
    at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:124)
    at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:57)
    ... 15 more

Here <https://dl.dropboxusercontent.com/u/78598929/flink-hadoop-jobmanager-0-cloud-11.log> is the related full jobmanager log. I can't figure out a solution. Thank you and have a nice day. -- *Andrea Spina* Guest student at DIMA, TU Berlin N.Tessera: *74598* MAT: *89369* *Ingegneria Informatica* *[LM] *(D.M. 270)
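The reply earlier in this thread reports the root cause was a leftover TaskManager instance, but the "Channel to path ... could not be opened" failure points at the spill directories, so verifying that every entry of taskmanager.tmp.dirs exists and is writable is a cheap first diagnostic. A sketch of such a check (the class name and the hard-coded directory list are illustrative, taken from the configuration quoted above):

```java
import java.io.File;

public class TmpDirCheck {
    // taskmanager.tmp.dirs is a colon-separated list; Flink spills channel
    // files into these directories, so each should exist and be writable.
    static String[] parseTmpDirs(String configValue) {
        return configValue.split(":");
    }

    public static void main(String[] args) {
        String dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:"
                    + "/data/3/peel/flink/tmp:/data/4/peel/flink/tmp";
        for (String dir : parseTmpDirs(dirs)) {
            File f = new File(dir);
            System.out.println(dir + " exists=" + f.exists()
                               + " writable=" + f.canWrite());
        }
        System.out.println(parseTmpDirs(dirs).length); // prints 4
    }
}
```

Running this on each worker before submitting the job would catch a missing or read-only spill directory early, even though in this particular case the directories were fine and the stale TaskManager was to blame.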