Re: Timers and Checkpoints

2019-08-01 Thread Andrea Spina
iggerCheckpointOnBarrier(StreamTask.java:543)
>>  ... 8 more
>> Caused by: java.lang.Exception: Could not write timer service of Aggregator 
>> -> Sink: HBase (1/1) to checkpoint state stream.
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:438)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:98)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:385)
>>  ... 13 more
>> Caused by: java.lang.NullPointerException
>>  at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:304)
>>  at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:121)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>>  ... 15 more
>>
>>
>>
>> On Fri, May 25, 2018 at 3:11 PM Timo Walther  wrote:
>>
>>> Hi Alberto,
>>>
>>> do you get exactly the same exception? Maybe you can share some logs
>>> with us?
>>>
>>> Regards,
>>> Timo
>>>
>>> On 25.05.18 at 13:41, Alberto Mancini wrote:
>>> > Hello,
>>> > I think we are experiencing this issue:
>>> > https://issues.apache.org/jira/browse/FLINK-6291
>>> >
>>> > In fact we have a long running job that is unable to complete a
>>> > checkpoint and so we are unable to create a savepoint.
>>> >
>>> > I do not really understand from FLINK-6291 how the timer service got
>>> > removed in my job, and most importantly I cannot figure out how to get
>>> > my job to create a savepoint.
>>> > We are using flink 1.3.2.
>>> >
>>> > Thanks,
>>> >Alberto.
>>> >
>>>
>>>

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Providing Custom Serializer for Generic Type

2019-07-05 Thread Andrea Spina
Hi Gordon, thank you.
The involved data structure is a complex abstraction owning a schema and
values; it declares private fields which should not be edited directly by
users. I'd say it's really akin to an Avro GenericRecord. How would you
approach the problem if you had to serialize/deserialize an Avro
GenericRecord efficiently? I think it cannot be a POJO, and ser/de through
Avro brings significant overhead, as also described at [1].

Thank you very much for your help.

Andrea

[1] -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
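
For illustration, one possible shape for such a serializer is an Avro-backed Kryo
Serializer for GenericRecord, assuming a single fixed schema known up front; the
class and registration names below are invented for the sketch and are not from the
actual job discussed here:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical: the schema travels as a JSON string so the serializer instance stays
// serializable; the Avro objects are rebuilt lazily on the TaskManagers.
class GenericRecordKryoSerializer(schemaJson: String)
    extends Serializer[GenericRecord]
    with Serializable {

  @transient private lazy val schema = new Schema.Parser().parse(schemaJson)
  @transient private lazy val writer = new GenericDatumWriter[GenericRecord](schema)
  @transient private lazy val reader = new GenericDatumReader[GenericRecord](schema)

  override def write(kryo: Kryo, output: Output, record: GenericRecord): Unit = {
    // Kryo's Output is an OutputStream, so Avro can encode straight into it; the
    // "direct" encoder avoids buffering past the record boundary.
    val encoder = EncoderFactory.get().directBinaryEncoder(output, null)
    writer.write(record, encoder)
    encoder.flush()
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[GenericRecord]): GenericRecord = {
    val decoder = DecoderFactory.get().directBinaryDecoder(input, null)
    reader.read(null, decoder)
  }
}

object GenericRecordSerdeRegistration {
  // Registered by instance because the serializer needs the schema at construction time.
  def register(env: StreamExecutionEnvironment, schemaJson: String): Unit =
    env.getConfig.addDefaultKryoSerializer(
      classOf[GenericData.Record], new GenericRecordKryoSerializer(schemaJson))
}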

On Thu, Jul 4, 2019 at 11:23 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Andrea,
>
> Is there a specific reason you want to use a custom TypeInformation /
> TypeSerializer for your type?
> From the description in the original post, this part wasn't clear to me.
>
> If the only reason is because it is generally suggested to avoid generic
> type serialization via Kryo, both for performance reasons as well as
> evolvability in the future, then updating your type to be recognized by
> Flink as one of the supported types [1] would be enough.
> Otherwise, implementing your own type information and serializer is
> usually only something users with very specific use cases might be required
> to do.
> Since you are also using that type as managed state, for a safer schema
> evolvability story in the future, I would recommend either Avro or POJO, as
> Jingsong Lee already mentioned.
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#flinks-typeinformation-class
>
> On Thu, Jul 4, 2019 at 5:08 PM Andrea Spina 
> wrote:
>
>> Hi JingsongLee, thank you for your answer.
>> Honestly, I wanted to explore that only as a last resort. Anyway, if defining
>> custom serializers and type information involves quite a big effort, I
>> would reconsider my approach.
>>
>> Cheers,
>>
>> On Thu, Jul 4, 2019 at 08:46 JingsongLee <lzljs3620...@aliyun.com> wrote:
>>
>>> Hi Andrea:
>>> Why not make your *MyClass* a POJO? [1] If it is a POJO, then Flink
>>> will use PojoTypeInfo and PojoSerializer, which already have a good
>>> implementation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>>>
>>> Best, JingsongLee
>>>
>>> --
>>> From:Andrea Spina 
>>> Send Time: Thursday, July 4, 2019, 14:37
>>> To:user 
>>> Subject:Providing Custom Serializer for Generic Type
>>>
>>> Dear community,
>>> in my job, I run with a custom event type *MyClass* which is a sort of
>>> "generic event" that I handle all along my streaming flow both as an event
>>> (DataStream[MyClass]) and as a managed state.
>>>
>>> I see that Flink warns me about generic serialization of
>>> *MyClass*
>>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>>> io.radicalbit.MyClass does not contain a setter for field
>>> io$radicalbit$MyClass$$schema
>>>  INFO [run-main-0] (TypeExtractor.java:1857) - Class class
>>> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
>>> are valid POJO fields, and must be processed as GenericType. Please read
>>> the Flink documentation on "Data Types & Serialization" for details of the
>>> effect on performance.
>>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>>> io.radicalbit.MyClass does not contain a setter for field
>>> io$radicalbit$MyClass$schema
>>>
>>> Therefore I wanted to provide my own custom serializer for MyClass; I first
>>> tried to register the Java one to check whether the system recognizes it, so
>>> I followed [1], but it seems it is not considered.
>>>
>>> I read then about [2] (the case is way akin to mine) and AFAIU I need to
>>> implement a custom TypeInformation and TypeSerializer for my class as
>>> suggested in [3] because Flink will ignore my registered serializer as long
>>> as it considers my type as *generic*.
>>>
>>> config.registerTypeWithKryoSerializer(classOf[MyClass], 
>>> classOf[RadicalSerde])
>>>
>>>
>>> My question finally is: do I need to provide these custom classes? Is
>>> there any practical example of creating custom type information like the
>>> above-mentioned? I have had a quick preliminary look at it, but it seems that I need

Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread Andrea Spina
Hi JingsongLee, thank you for your answer.
Honestly, I wanted to explore that only as a last resort. Anyway, if defining
custom serializers and type information involves quite a big effort, I
would reconsider my approach.

Cheers,

On Thu, Jul 4, 2019 at 08:46 JingsongLee wrote:

> Hi Andrea:
> Why not make your *MyClass* a POJO? [1] If it is a POJO, then Flink
> will use PojoTypeInfo and PojoSerializer, which already have a good
> implementation.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>
> Best, JingsongLee
>
> --
> From:Andrea Spina 
> Send Time: Thursday, July 4, 2019, 14:37
> To:user 
> Subject:Providing Custom Serializer for Generic Type
>
> Dear community,
> in my job, I run with a custom event type *MyClass* which is a sort of
> "generic event" that I handle all along my streaming flow both as an event
> (DataStream[MyClass]) and as a managed state.
>
> I see that Flink warns me about generic serialization of
> *MyClass*
>  INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
> does not contain a setter for field io$radicalbit$MyClass$$schema
>  INFO [run-main-0] (TypeExtractor.java:1857) - Class class
> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
> are valid POJO fields, and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance.
>  INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
> does not contain a setter for field io$radicalbit$MyClass$schema
>
> Therefore I wanted to provide my own custom serializer for MyClass; I first
> tried to register the Java one to check whether the system recognizes it, so
> I followed [1], but it seems it is not considered.
>
> I read then about [2] (the case is way akin to mine) and AFAIU I need to
> implement a custom TypeInformation and TypeSerializer for my class as
> suggested in [3] because Flink will ignore my registered serializer as long
> as it considers my type as *generic*.
>
> config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])
>
>
> My question finally is: do I need to provide these custom classes? Is there
> any practical example of creating custom type information like the
> above-mentioned? I have had a quick preliminary look at it, but it seems that
> I need to provide a non-trivial amount of information to the TypeInformation
> and TypeSerializer interfaces.
>
> Thank you for your excellent work and help.
>
> Cheers.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
> [2] -
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
> [3] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
> --
> Andrea Spina
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>
>
>
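
For reference, a minimal sketch of a Scala class shaped so that Flink's TypeExtractor
can analyze it as a POJO, following the rules JingsongLee links above (a public
no-argument constructor plus mutable, accessible fields). The field names are purely
illustrative; whether the real MyClass can expose mutable fields at all is a separate
concern:

class MyClass {
  var schema: String = _
  var values: Array[Double] = Array.empty
}

With a shape like this, Flink should stop logging the GenericType warning and pick
PojoTypeInfo/PojoSerializer automatically.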

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Providing Custom Serializer for Generic Type

2019-07-04 Thread Andrea Spina
Dear community,
in my job, I run with a custom event type *MyClass* which is a sort of
"generic event" that I handle all along my streaming flow both as an event
(DataStream[MyClass]) and as a managed state.

I see that Flink warns me about generic serialization of
*MyClass*
 INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
does not contain a setter for field io$radicalbit$MyClass$$schema
 INFO [run-main-0] (TypeExtractor.java:1857) - Class class
io.radicalbit.MyClass cannot be used as a POJO type because not all fields
are valid POJO fields, and must be processed as GenericType. Please read
the Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
 INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
does not contain a setter for field io$radicalbit$MyClass$schema

Therefore I wanted to provide my own custom serializer for MyClass; I first
tried to register the Java one to check whether the system recognizes it, so
I followed [1], but it seems it is not considered.

I read then about [2] (the case is way akin to mine) and AFAIU I need to
implement a custom TypeInformation and TypeSerializer for my class as
suggested in [3] because Flink will ignore my registered serializer as long
as it considers my type as *generic*.

config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])


My question finally is: do I need to provide these custom classes? Is there
any practical example of creating custom type information like the
above-mentioned? I have had a quick preliminary look at it, but it seems that
I need to provide a non-trivial amount of information to the TypeInformation
and TypeSerializer interfaces.

Thank you for your excellent work and help.

Cheers.

[1] -
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
[2] -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
[3] -
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
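
The RadicalSerde referenced in the registration snippet above is not shown; purely as
an illustration, a hand-written Kryo serializer of that shape could look like the
following sketch (the MyClass fields here are invented for the example):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Invented stand-in for the real event type: a schema plus its values.
case class MyClass(schema: String, values: Array[Double])

// Zero-argument constructor, so it can be registered by class with
// registerTypeWithKryoSerializer as in the snippet above.
class RadicalSerde extends Serializer[MyClass] {

  override def write(kryo: Kryo, output: Output, obj: MyClass): Unit = {
    output.writeString(obj.schema)
    output.writeInt(obj.values.length)
    obj.values.foreach(v => output.writeDouble(v))
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[MyClass]): MyClass = {
    val schema = input.readString()
    val length = input.readInt()
    MyClass(schema, Array.fill(length)(input.readDouble()))
  }
}
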
-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: HDFS checkpoints for rocksDB state backend:

2019-06-27 Thread Andrea Spina
Hi Qiu,
my jar does not contain the class
`org.apache.hadoop.hdfs.protocol.HdfsConstants`, but I do expect it to be
contained within `flink-shaded-hadoop2-uber-1.6.4.jar`, which is located in
the Flink cluster lib folder.
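
A quick way to double-check that on a TaskManager (my own suggestion, not something
from this thread) is to log where the class actually gets resolved from, e.g. with a
small probe run against the same classpath:

object HdfsConstantsCheck {
  def main(args: Array[String]): Unit = {
    // Prints the jar the class is resolved from, or fails if it cannot be loaded at all.
    val location = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants")
      .getProtectionDomain.getCodeSource.getLocation
    println(s"HdfsConstants loaded from: $location")
  }
}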

On Thu, Jun 27, 2019 at 04:03 Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi  Andrea
>
> As for the NoClassDefFoundError, could you please verify that
> `org.apache.hadoop.hdfs.protocol.HdfsConstants` exists in your jar?
> Or could you use Arthas [1] to check whether the class is present while
> running the job?
>
> [1] https://github.com/alibaba/arthas
> Best,
> Congxian
>
>
> On Thu, Jun 27, 2019 at 1:57 AM, Andrea Spina wrote:
>
>> Dear community,
>> I'm trying to use HDFS checkpoints in flink-1.6.4 with the following
>> configuration
>>
>> state.backend: rocksdb
>> state.checkpoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/checkpoint
>> state.savepoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/savepoints
>>
>> and I record the following exceptions
>>
>> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://my.rb.biz:8020/flink/checkpoint/fd35c7145e6911e1721cd0f03656b0a8/chk-2/48502e63-cb69-4944-8561-308da2f9f26a in order to obtain the stream state handle
>>   at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>>   at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
>>   at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>   ... 7 more
>> Caused by: java.io.IOException: DataStreamer Exception:
>>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
>>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
>>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
>>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
>>
>> or
>>
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>   ... 7 more
>> Caused by: java.io.IOException: DataStreamer Exception:
>>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
>> Caused by: javax.xml.parsers.FactoryConfigurationError: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
>>   at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>>   at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>>   at javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>>   at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2515)
>>   at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>   at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>   at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>>   at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>>   at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
>> 

HDFS checkpoints for rocksDB state backend:

2019-06-26 Thread Andrea Spina
Dear community,
I'm trying to use HDFS checkpoints in flink-1.6.4 with the following
configuration

state.backend: rocksdb
state.checkpoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/checkpoint
state.savepoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/savepoints

and I record the following exceptions

Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://my.rb.biz:8020/flink/checkpoint/fd35c7145e6911e1721cd0f03656b0a8/chk-2/48502e63-cb69-4944-8561-308da2f9f26a in order to obtain the stream state handle
  at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
  at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
  at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
  at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
  at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
  ... 7 more
Caused by: java.io.IOException: DataStreamer Exception:
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

or

  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
  ... 7 more
Caused by: java.io.IOException: DataStreamer Exception:
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)
Caused by: javax.xml.parsers.FactoryConfigurationError: Provider for class javax.xml.parsers.DocumentBuilderFactory cannot be created
  at javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
  at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
  at javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
  at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2515)
  at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
  at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
  at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
  at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
  at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
  at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)

In my lib folder I have the usual HDFS uber jar, but I am not able to get
the job to checkpoint its state correctly.
I also read [1], but it is not helping.

Thank you for the precious help

[1] - https://www.cnblogs.com/chendapao/p/9170566.html
-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Process Function's timers "postponing"

2019-06-25 Thread Andrea Spina
Hi Yun, thank you so much. That was an idea, but I wanted to avoid storing an
additional state for it. In the end, I went for coalescing as the documentation
suggested, so that I will have just one timer per interval. What I didn't
catch initially from the documentation is that *for a given key and a
given timestamp Flink will retain just one timer, i.e. if I set two
timers to trigger at the same time T, Flink will trigger the timer once.*
I accept then to have at least one coalesced timer per interval.

Thank you again for your support!
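
To make the coalescing concrete, here is a minimal sketch along the lines of the
timer-coalescing pattern in the docs ([1] in the quoted message below); the event
type, names, session gap and 1-second granularity are illustrative only:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Illustrative event type; the function is meant to run on a keyed stream.
case class Event(key: String, payload: String)

class CoalescedSessionTimer(sessionGapMs: Long, coalescingMs: Long = 1000L)
    extends ProcessFunction[Event, Event] {

  override def processElement(value: Event,
                              ctx: ProcessFunction[Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    // Round the target time down to the coalescing interval: every registration that
    // falls into the same interval maps to the same timestamp, and since Flink keeps
    // at most one timer per key and timestamp, only one timer is actually stored.
    val target    = ctx.timerService().currentProcessingTime() + sessionGapMs
    val coalesced = (target / coalescingMs) * coalescingMs
    ctx.timerService().registerProcessingTimeTimer(coalesced)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Event, Event]#OnTimerContext,
                       out: Collector[Event]): Unit = {
    // Fire the "session has gone quiet" computation here.
  }
}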

On Tue, Jun 25, 2019 at 19:14 Yun Tang wrote:

> If you are using processing time, one possible way is to track the last
> registered timer in another ValueState. You could call
> #deleteProcessingTimeTimer(time) when you register a new timer and find that
> the previous timer stored in the ValueState has a smaller timestamp (T1) than
> the current one (T2). After deleting that processing-time timer, T1 will not
> trigger any action. You could refer to [1] and its usage for similar ideas.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java
>
> ------
> *From:* Andrea Spina 
> *Sent:* Tuesday, June 25, 2019 23:40
> *To:* Yun Tang
> *Cc:* user
> *Subject:* Re: Process Function's timers "postponing"
>
> Hi Yun, thank you for your answer. I'm not sure I got your point. My
> question is:
> for the same key K, I process two records R1 at t1 and R2 at t2.
> When I process R1, I set a timer to be triggered at T1 which is > t2
> When I process R2, I set a timer to be triggered at T2 which is > T1, but
> in order to do that, I want to remove the previous timer T1 in order to
> "postpone" the triggering.
>
> In other words, I would like just one timer to be active for a single key,
> and if a new timer is requested the old one should be deleted.
>
> Thank you,
>
> On Tue, Jun 25, 2019 at 17:31 Yun Tang wrote:
>
> Hi Andrea
>
> If my understanding is correct, you just want to know when the eventual
> timer would be deleted. When you register your timer into
> 'processingTimeTimersQueue' (where your timers are stored) at [1], the
> 'SystemProcessingTimeService' would then schedule a runnable TriggerTask
> after the "postpone" delay at [2]. When the scheduled runnable is
> triggered, it polls from the 'processingTimeTimersQueue' [3], which
> means the timer is finally removed. Hope this could help you.
>
> Best
> Yun Tang
>
> [1]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
> [2]
> https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
> [3]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
>
>
> --
> *From:* Andrea Spina 
> *Sent:* Tuesday, June 25, 2019 2:06
> *To:* user
> *Subject:* Process Function's timers "postponing"
>
> Dear Community,
> I am using Flink (processing-time) timers along with a Process Function.
> What I would like to do is to "postpone" any previously registered timers for
> the given key: I would like to do it since I might process plenty of events
> in a row (think of it as a session) so that I will be able to trigger the
> computation "just after" this session somehow stops.
>
> I wondered about deleting any existing timers, but AFAIU I need to
> know the previous timer's triggering time, which I guess is not possible for
> me since I use processing-time timers.
>
> I also read [1] but I am not really able to understand whether it comes in
> handy for me; for instance, I don't understand what "Since Flink maintains only
> one timer per key and timestamp..." means. Does this imply that a new PT timer
> will automatically overwrite a previously existing one?
>
> Thank you for your precious help,
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>
>
>
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>


-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Process Function's timers "postponing"

2019-06-25 Thread Andrea Spina
Hi Yun, thank you for your answer. I'm not sure I got your point. My
question is:
for the same key K, I process two records R1 at t1 and R2 at t2.
When I process R1, I set a timer to be triggered at T1 which is > t2
When I process R2, I set a timer to be triggered at T2 which is > T1, but
in order to do that, I want to remove the previous timer T1 in order to
"postpone" the triggering.

In other words, I would like just one timer to be active for a single key,
and if a new timer is requested the old one should be deleted.

Thank you,
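
For completeness, a sketch of the alternative Yun describes below: tracking the last
registered timer in a ValueState and deleting it before registering a new one. The
event type and names are illustrative, and it assumes a Flink version where
TimerService#deleteProcessingTimeTimer is available:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Illustrative event type; the function is meant to run on a keyed stream so that
// state and timers are scoped per key.
case class Event(key: String, payload: String)

class PostponingTimer(timeoutMs: Long) extends ProcessFunction[Event, Event] {

  // Timestamp of the currently registered timer for the current key, if any.
  @transient private var lastTimer: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastTimer = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("last-timer", classOf[java.lang.Long]))
  }

  override def processElement(value: Event,
                              ctx: ProcessFunction[Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    // Delete the previously registered timer, if there is one...
    val previous = lastTimer.value()
    if (previous != null) {
      ctx.timerService().deleteProcessingTimeTimer(previous)
    }
    // ...and register a new, later one, effectively postponing the trigger.
    val next = ctx.timerService().currentProcessingTime() + timeoutMs
    ctx.timerService().registerProcessingTimeTimer(next)
    lastTimer.update(next)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Event, Event]#OnTimerContext,
                       out: Collector[Event]): Unit = {
    // The key has been quiet for timeoutMs: react here, then forget the timer.
    lastTimer.clear()
  }
}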

On Tue, Jun 25, 2019 at 17:31 Yun Tang wrote:

> Hi Andrea
>
> If my understanding is correct, you just want to know when the eventual
> timer would be deleted. When you register your timer into
> 'processingTimeTimersQueue' (where your timers are stored) at [1], the
> 'SystemProcessingTimeService' would then schedule a runnable TriggerTask
> after the "postpone" delay at [2]. When the scheduled runnable is
> triggered, it polls from the 'processingTimeTimersQueue' [3], which
> means the timer is finally removed. Hope this could help you.
>
> Best
> Yun Tang
>
> [1]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
> [2]
> https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
> [3]
> https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
>
>
> --
> *From:* Andrea Spina 
> *Sent:* Tuesday, June 25, 2019 2:06
> *To:* user
> *Subject:* Process Function's timers "postponing"
>
> Dear Community,
> I am using Flink (processing-time) timers along with a Process Function.
> What I would like to do is to "postpone" any previously registered timers for
> the given key: I would like to do it since I might process plenty of events
> in a row (think of it as a session) so that I will be able to trigger the
> computation "just after" this session somehow stops.
>
> I wondered about deleting any existing timers, but AFAIU I need to
> know the previous timer's triggering time, which I guess is not possible for
> me since I use processing-time timers.
>
> I also read [1] but I am not really able to understand whether it comes in
> handy for me; for instance, I don't understand what "Since Flink maintains only
> one timer per key and timestamp..." means. Does this imply that a new PT timer
> will automatically overwrite a previously existing one?
>
> Thank you for your precious help,
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>


-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Process Function's timers "postponing"

2019-06-24 Thread Andrea Spina
Dear Community,
I am using Flink (processing-time) timers along with a Process Function.
What I would like to do is to "postpone" any previously registered timers for
the given key: I would like to do it since I might process plenty of events
in a row (think of it as a session) so that I will be able to trigger the
computation "just after" this session somehow stops.

I wondered about deleting any existing timers, but AFAIU I need to know
the previous timer's triggering time, which I guess is not possible for me
since I use processing-time timers.

I also read [1] but I am not really able to understand whether it comes in
handy for me; for instance, I don't understand what "Since Flink maintains only
one timer per key and timestamp..." means. Does this imply that a new PT timer
will automatically overwrite a previously existing one?

Thank you for your precious help,

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Linkage Error RocksDB and flink-1.6.4

2019-06-24 Thread Andrea Spina
Hi Shu Su,
the first point exactly pinpointed the issue I bumped into: I forgot to set
that dependency's scope to "provided". Thank you!

On Mon, Jun 24, 2019 at 05:35 Shu Su wrote:

> Hi Andrea
>
> Actually it's caused by Flink's classloading. Flink uses the parent
> classloader to load the jar first, and then, when you use it in your user
> code, the user-code classloader loads it again, which raises the error.
> There are two solutions.
> 1.  Add scope "provided" to maven pom.xml:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>     <version>${flink_version}</version>
>     <scope>provided</scope>
> </dependency>
> 2. Set this classloader.resolve-order: parent-first in flink-conf.yml
>
> Hope this will help you.
>
> Thanks,
> Simon
> On 06/24/2019 11:27,Yun Tang  wrote:
>
> Hi Andrea
>
> Since I have not written Scala for a while, I wonder why you need to
> instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions
> on the JM side. As far as I can see, you could instantiate them on your TM
> side with code like:
>
> val rocksdbConfig = new OptionsFactory() {
>   override def createDBOptions(currentOptions: DBOptions): DBOptions =
>     currentOptions.setIncreaseParallelism(properties.threadNo)
>
>   override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
>     currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
> }
>
> You just need to serialize the properties via closure to TMs. Hope this could 
> help you.
>
> Best
> Yun Tang
> --
> *From:* Andrea Spina 
> *Sent:* Monday, June 24, 2019 2:20
> *To:* user
> *Subject:* Linkage Error RocksDB and flink-1.6.4
>
> Dear community,
> I am running a Flink job backed by RocksDB, version 1.6.4 and Scala 2.11.
> At job startup the following exception happens (it is recorded by the
> JobManager).
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>   at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>   at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)
>
> For this job, I programmatically set some RocksDB options by using the
> code appended below. Anybody can help with this? Thank you so much,
> Andrea
>
>
> import org.apache.flink.configuration.MemorySize
> import org.apache.flink.contrib.streaming.state.{OptionsFactory, 
> PredefinedOptions, RocksDBStateBackend}
> import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}
>
> object ConfigurableRocksDB {
>
>   lazy val columnOptions = new ColumnFamilyOptions() with Serializable
>   lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
>   lazy val dbOptions = new DBOptions() with Serializable
>
>   def configureStateBackendRocksDB(properties: FlinkDeployment): 
> RocksDBStateBackend = {
> properties.threadNo.foreach(dbOptions.setIncreaseParallelism)
>
> properties.blockSize.foreach(bs => 
> tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
> properties.cacheSize.foreach(cs => 
> tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
> properties.cacheIndicesAndFilters.foreach(cif => if (cif) 
> tableConfig.cacheIndexAndFilterBlocks())
> properties.writeBufferSize.foreach(wbs => 
> columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
>
> columnOptions.setTableFormatConfig(tableConfig)
> properties.writeBufferToMerge.foreach(bm => 
> columnOptions.setMinWriteBufferNumberToMerge(bm))
> properties.writeBufferCount.foreach(bc => 
> columnOptions.setMaxWriteBufferNumber(bc))
> properties.optimizeFilterForHits.foreach(op => if (op) 

Linkage Error RocksDB and flink-1.6.4

2019-06-23 Thread Andrea Spina
Dear community,
I am running a Flink job backed by RocksDB, version 1.6.4 and Scala 2.11.
At job startup the following exception happens (it is recorded by the
JobManager).

Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)

For this job, I programmatically set some RocksDB options by using the code
appended below. Anybody can help with this? Thank you so much,
Andrea


import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions     = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE =
      new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }

}
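
As a side note, the fix that surfaces in the reply above (marking the RocksDB state
backend dependency as "provided", since in this setup its classes are already on the
cluster classpath) would look roughly like this in an sbt build, assuming a
flinkVersion setting defined elsewhere:

// sbt sketch: compile against the RocksDB backend but do not bundle it in the user jar
libraryDependencies += "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion % Provided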

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Flink sent/received bytes related metrics: how are they calculated?

2019-06-14 Thread Andrea Spina
Hi Chesnay, just one downstream: the output of Sin (Source: Enriched Code) feeds
the right-hand side of the operator shown in the figure; this operator is the
exclusive downstream of Sin.
Thanks,

[image: Screenshot 2019-06-14 at 13.52.52.png]

On Fri, Jun 14, 2019 at 12:23 Chesnay Schepler <ches...@apache.org> wrote:

> What does the *P1* pipeline look like? Are there 2 downstream operators
> reading from *Sin* (in that case the number of bytes would be measured
> twice)?
>
> On 14/06/2019 12:09, Andrea Spina wrote:
>
> Sorry, I totally missed the version: flink-1.6.4, Streaming API
>
> On Fri, Jun 14, 2019 at 11:08 Chesnay Schepler <ches...@apache.org> wrote:
>
>> Which version of Flink are you using? There were some issues at some
>> point about double-counting.
>>
>> On 14/06/2019 09:49, Andrea Spina wrote:
>>
>> Dear Community, I'd like to ask for some details about bytes related
>> metrics in Flink. Precisely, I'm looking at *bytes sent* and *bytes
>> received *metrics: what I am recording is the following:
>>
>> I am reading records with schema *K* from a Kafka topic *A* using a
>> source *Sin* belonging to a pipeline *P1*. The topic *A* is filled with
>> *K* data by a separate pipeline *p2* through a sink *Sout*.
>>
>> Basically, what I observe is that the *bytes sent* metric of *Sin* -
>> available through the Flink UI - measures twice or more the *bytes
>> received* of *Sout*. For instance, if my sink Sout records 10GB of *bytes
>> received* inbound, then my source Sin reports between 20 and 25GB of *bytes sent*.
>>
>> [image: Screenshot 2019-06-14 at 09.40.08.png]
>>
>> Is someone able to detail how these two metrics are calculated?
>>
>> Thank you,
>>
>> --
>> *Andrea Spina*
>> Software Engineer @ Radicalbit Srl
>> Via Borsieri 41, 20159, Milano - IT
>>
>>
>>
>
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>
>
>

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Flink sent/received bytes related metrics: how are they calculated?

2019-06-14 Thread Andrea Spina
Sorry, I totally missed the version: flink-1.6.4, Streaming API

On Fri, Jun 14, 2019 at 11:08 Chesnay Schepler <ches...@apache.org> wrote:

> Which version of Flink are you using? There were some issues at some point
> about double-counting.
>
> On 14/06/2019 09:49, Andrea Spina wrote:
>
> Dear Community, I'd like to ask for some details about bytes related
> metrics in Flink. Precisely, I'm looking at *bytes sent* and *bytes
> received *metrics: what I am recording is the following:
>
> I am reading records with schema *K* from a Kafka topic *A* using a
> source *Sin* belonging to a pipeline *P1*. The topic *A* is filled with
> *K* data by a separate pipeline *p2* through a sink *Sout*.
>
> Basically, what I observe is that the *bytes sent* metric of *Sin* -
> available through the Flink UI - measures twice or more the *bytes
> received* of *Sout*. For instance, if my sink Sout records 10GB of *bytes
> received* inbound, then my source Sin reports between 20 and 25GB of *bytes sent*.
>
> [image: Screenshot 2019-06-14 at 09.40.08.png]
>
> Is someone able to detail how these two metrics are calculated?
>
> Thank you,
>
> --
> *Andrea Spina*
> Software Engineer @ Radicalbit Srl
> Via Borsieri 41, 20159, Milano - IT
>
>
>

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Flink sent/received bytes related metrics: how are they calculated?

2019-06-14 Thread Andrea Spina
Dear Community, I'd like to ask for some details about bytes related
metrics in Flink. Precisely, I'm looking at *bytes sent* and *bytes
received *metrics: what I am recording is the following:

I am reading records with schema *K* from a Kafka topic *A* using a source
*Sin* belonging to a pipeline *P1*. The topic *A* is filled with *K* data by
a separate pipeline *p2* through a sink *Sout*.

Basically, what I observe is that the *bytes sent* metric of *Sin* - available
through the Flink UI - measures twice or more the *bytes received*
of *Sout*. For instance, if my sink Sout records 10GB of *bytes received*
inbound, then my source Sin reports between 20 and 25GB of *bytes sent*.

[image: Screenshot 2019-06-14 at 09.40.08.png]

Is someone able to detail how these two metrics are calculated?

Thank you,

-- 
*Andrea Spina*
Software Engineer @ Radicalbit Srl
Via Borsieri 41, 20159, Milano - IT


Re: Flink 1.7.2: All jobs are getting deployed on the same task manager

2019-03-18 Thread Andrea Spina
Hi everybody. We're currently experiencing the same behaviour on
flink-1.6.2.

I've been reading that Flink treats all slots as equal, and it doesn't even
know where these slots reside
https://stackoverflow.com/questions/54980104/uneven-assignment-of-tasks-to-workers-in-flink.

So it should not be an issue; thus, the fact that it fills all the slots of
a machine before moving to a new one should be just a coincidence.

Given that, I'm pretty sure that I never observed this behaviour with
previous major versions (I recall flink-1.3 for sure).
Moreover, this is damaging because resources (e.g. memory, disk) on that
machine can become exhausted.

Hope we can find a solution for this.
Sincerely,

Andrea


On Mon, Mar 18, 2019 at 11:53 Kumar Bolar, Harshith <hk...@arity.com> wrote:

> Hi all,
>
>
>
> We're running Flink on a five-node standalone cluster with three task
> managers (TM1, TM2, TM3) and two job managers.
>
> Whenever I submit a new job, the job gets deployed on only TM3. When the
> number of slots in TM3 gets exhausted, the jobs start getting deployed on
> TM2 and so on. How do I ensure that the jobs get distributed evenly across
> all 3 task managers?
>
>
>
> Thanks,
>
> Harshith
>
>
>


-- 
*Andrea Spina*
Software Engineer @ Radicalbit Srl
Via Borsieri 41, 20159, Milano - IT


Re: Flink 1.3.2 RocksDB map segment fail if configured as state backend

2018-09-18 Thread Andrea Spina
Hi Stefan, thank you. You were right: it was an unexpected permissions issue.
Thank you again.

2018-09-17 16:50 GMT+02:00 Stefan Richter :

> Hi,
>
> I think the exception pretty much says what is wrong: the native library
> cannot be mapped into the process because of an access rights problem.
> Please make sure that your /tmp path has exec rights.
>
> Best,
> Stefan
>
>
> On 17.09.2018 at 11:37, Andrea Spina wrote:
>
> Hi everybody,
>
> I run with a Flink 1.3.2 installation on a Red Hat Enterprise Linux Server
> and I'm not able to set rocksdb as state.backend due to this error whenever
> I try to deploy any job:
>
> *java.lang.IllegalStateException: Could not initialize keyed state
> backend.*
> *at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)*
> *at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)*
> *at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:678)*
> *at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:666)*
> *at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)*
> *at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)*
> *at java.lang.Thread.run(Thread.java:748)*
> *Caused by: java.io.IOException: Could not load the native RocksDB library*
> *at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:560)*
> *at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:298)*
> *at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:756)*
> *at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)*
> *... 6 more*
> *Caused by: java.lang.UnsatisfiedLinkError:
> /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so:
> /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so:
> failed to map segment from shared object: Operation not permitted*
> *at java.lang.ClassLoader$NativeLibrary.load(Native Method)*
> *at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)*
> *at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)*
> *at java.lang.Runtime.load0(Runtime.java:809)*
> *at java.lang.System.load(System.java:1086)*
> *at
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)*
> *at
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)*
> *at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:537)*
> *... 9 more*
>
> Machine detail
>
> *NAME="Red Hat Enterprise Linux Server"*
> *VERSION="7.5 (Maipo)"*
> *ID="rhel"*
> *ID_LIKE="fedora"*
>
> Cpu architecture (cat /proc/cpuinfo)
>
> *processor: 0*
> *vendor_id: GenuineIntel*
> *cpu family: 6*
> *model: 45*
> *model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz*
> *stepping: 2*
> *microcode: 0x710*
> *cpu MHz: 2200.000*
> *cache size: 20480 KB*
> *physical id: 0*
> *siblings: 4*
> *core id: 0*
> *cpu cores: 4*
> *apicid: 0*
> *initial apicid: 0*
> *fpu: yes*
> *fpu_exception: yes*
> *cpuid level: 13*
> *wp: yes*
> *flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca
> cmov pat pse36 clflush dts mmx fxsr sse sse2 ss ht syscall nx rdtscp lm
> constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc
> aperfmperf pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt aes
> xsave avx hypervisor lahf_lm epb dtherm ida arat pln pts*
> *bogomips: 4400.00*
> *clflush size: 64*
> *cache_alignment: 64*
> *address sizes: 40 bits physical, 48 bits virtual*
>
> I found similar errors related to different systems [1] and similar
> problems with RocksDB related to endianness [2], but mine sounds different.
> Since upgrading to a newer Flink version might be painful at the moment, is
> there any reason behind this exception, and does a workaround exist?
>
> Thank you so much,
>
> Andrea
>
> [1] - https://communities.ca.com/thread/241773398-em-failed-to-start-tmplibrocksdbjni8883977260053861907so-failed-to-map-segment-from-shared-object-operation-not-permitted
>
> [2] - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unable-to-use-Flink-RocksDB-state-backend-due-to-endianness-mismatch-td13473.html
>
> --
> *Andrea Spina*
> Software Engineer @ Radicalbit Srl
> Via Borsieri 41, 20159, Milano - IT
>
>
>


-- 
*Andrea Spina*
Software Engineer @ Radicalbit Srl
Via Borsieri 41, 20159, Milano - IT


Flink 1.3.2 RocksDB map segment fail if configured as state backend

2018-09-17 Thread Andrea Spina
Hi everybody,

I run with a Flink 1.3.2 installation on a Red Hat Enterprise Linux Server
and I'm not able to set rocksdb as state.backend due to this error whenever
I try to deploy any job:

*java.lang.IllegalStateException: Could not initialize keyed state backend.*
*at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)*
*at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)*
*at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:678)*
*at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:666)*
*at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)*
*at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)*
*at java.lang.Thread.run(Thread.java:748)*
*Caused by: java.io.IOException: Could not load the native RocksDB library*
*at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:560)*
*at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:298)*
*at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:756)*
*at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)*
*... 6 more*
*Caused by: java.lang.UnsatisfiedLinkError:
/tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so:
/tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so:
failed to map segment from shared object: Operation not permitted*
*at java.lang.ClassLoader$NativeLibrary.load(Native Method)*
*at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)*
*at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)*
*at java.lang.Runtime.load0(Runtime.java:809)*
*at java.lang.System.load(System.java:1086)*
*at
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)*
*at
org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)*
*at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:537)*
*... 9 more*

Machine detail

*NAME="Red Hat Enterprise Linux Server"*
*VERSION="7.5 (Maipo)"*
*ID="rhel"*
*ID_LIKE="fedora"*

Cpu architecture (cat /proc/cpuinfo)

*processor: 0*
*vendor_id: GenuineIntel*
*cpu family: 6*
*model: 45*
*model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz*
*stepping: 2*
*microcode: 0x710*
*cpu MHz: 2200.000*
*cache size: 20480 KB*
*physical id: 0*
*siblings: 4*
*core id: 0*
*cpu cores: 4*
*apicid: 0*
*initial apicid: 0*
*fpu: yes*
*fpu_exception: yes*
*cpuid level: 13*
*wp: yes*
*flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca
cmov pat pse36 clflush dts mmx fxsr sse sse2 ss ht syscall nx rdtscp lm
constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc
aperfmperf pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt aes
xsave avx hypervisor lahf_lm epb dtherm ida arat pln pts*
*bogomips: 4400.00*
*clflush size: 64*
*cache_alignment: 64*
*address sizes: 40 bits physical, 48 bits virtual*

I found similar errors related to different systems [1] and similar
problems with RocksDB related to endianness [2], but mine sounds different.
Since upgrading to a newer Flink version might be painful at the moment, is
there any reason behind this exception, and does a workaround exist?

Thank you so much,

Andrea

[1] -
https://communities.ca.com/thread/241773398-em-failed-to-start-tmplibrocksdbjni8883977260053861907so-failed-to-map-segment-from-shared-object-operation-not-permitted

[2] -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unable-to-use-Flink-RocksDB-state-backend-due-to-endianness-mismatch-td13473.html

-- 
*Andrea Spina*
Software Engineer @ Radicalbit Srl
Via Borsieri 41, 20159, Milano - IT


Re: streaming predictions

2018-07-24 Thread Andrea Spina
Dear Cederic,
I did something similar to yours a while ago as part of this work [1], but I've
always been working within the batch context. I'm also the co-author of
flink-jpmml and, since a Flink-to-PMML model saver library doesn't exist
currently, I'd suggest a twofold strategy to tackle this problem:
- if your model is relatively simple, take the batch evaluate method (it
belongs to your SVM classifier) and attempt to translate it into a flatMap
function (hopefully you can reuse some internal utilities; Flink uses the
Breeze vector library under the hood [3]); a sketch of this approach follows
the reference list below.
- if your model is a complex one, you should export the model to PMML and
then employ [2]. For a first overview, [4] is the library you should
adopt to export your model and [5] can help you with the related
implementation.

Hope it can help and good luck!

Andrea

[1] https://dl.acm.org/citation.cfm?id=3070612
[2] https://github.com/FlinkML/flink-jpmml
[3]
https://github.com/apache/flink/blob/7034e9cfcb051ef90c5bf0960bfb50a79b3723f0/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L73
[4] https://github.com/jpmml/jpmml-model
[5] https://github.com/jpmml/jpmml-sparkml
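
A rough sketch of the first option (names are illustrative; it assumes the weight
vector from svm2.weightsOption.get has been collected to the client, e.g. with
.collect().head, and it ignores any intercept term):

import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.scala._

object StreamingSvmScoring {

  // Reproduces the linear SVM decision function inside a plain map: the raw score is
  // the dot product of the learned weights with the feature vector, thresholded at 0.
  def score(events: DataStream[(Int, Int, Int)], weights: DenseVector): DataStream[Double] =
    events.map { record =>
      val features = Array(record._1.toDouble, record._2.toDouble, record._3.toDouble)
      val raw = features.zip(weights.data).map { case (x, w) => x * w }.sum
      if (raw > 0) 1.0 else -1.0
    }
}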

2018-07-24 13:29 GMT+02:00 David Anderson :

> One option (which I haven't tried myself) would be to somehow get the
> model into PMML format, and then use https://github.com/FlinkML/flink-jpmml
> to score the model. You could either use another
> machine learning framework to train the model (i.e., a framework that
> directly supports PMML export), or convert the Flink model into PMML. Since
> SVMs are fairly simple to describe, that might not be terribly difficult.
>
> On Mon, Jul 23, 2018 at 4:18 AM Xingcan Cui  wrote:
>
>> Hi Cederic,
>>
>> If the model is a simple function, you can just load it and make
>> predictions using the map/flatMap function in the StreamEnvironment.
>>
>> But I’m afraid the model trained by Flink-ML should be a “batch job",
>> whose predict method takes a Dataset as the parameter and outputs another
>> Dataset as the result. That means you cannot easily apply the model on
>> streams, at least for now.
>>
>> There are two options to solve this. (1) Train the dataset using another
>> framework to produce a simple function. (2) Adjust your model serving as a
>> series of batch jobs.
>>
>> Hope that helps,
>> Xingcan
>>
>> On Jul 22, 2018, at 8:56 PM, Hequn Cheng  wrote:
>>
>> Hi Cederic,
>>
>> I am not familiar with SVM or machine learning but I think we can work it
>> out together.
>> What problem have you met when you try to implement this function? From
>> my point of view, we can rebuild the model in the flatMap function and use
>> it to predict the input data. There are some flatMap documents here[1].
>>
>> Best, Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations
>>
>>
>>
>>
>>
>> On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans 
>> wrote:
>>
>>> Dear
>>>
>>> My name is Cederic Bosmans and I am a masters student at the Ghent
>>> University (Belgium).
>>> I am currently working on my masters dissertation which involves Apache
>>> Flink.
>>>
>>> I want to make predictions in the streaming environment based on a model
>>> trained in the batch environment.
>>>
>>> I trained my SVM-model this way:
>>> val svm2 = SVM()
>>> svm2.setSeed(1)
>>> svm2.fit(trainLV)
>>> val testVD = testLV.map(lv => (lv.vector, lv.label))
>>> val evalSet = svm2.evaluate(testVD)
>>>
>>> and saved the model:
>>> val modelSvm = svm2.weightsOption.get
>>>
>>> Then I have an incoming datastream in the streaming environment:
>>> dataStream[(Int, Int, Int)]
>>> which should be binary classified using this trained SVM model.
>>>
>>> Since the predict function does only support DataSet and not DataStream,
>>> on stackoverflow a flink contributor mentioned that this should be done
>>> using a map/flatMap function.
>>> Unfortunately I am not able to work this function out.
>>>
>>> It would be incredible for me if you could help me a little bit further!
>>>
>>> Kind regards and thanks in advance
>>> Cederic Bosmans
>>>
>>
>>
>>
>
> --
> *David Anderson* | Training Coordinator | data Artisans
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
>



-- 
*Andrea Spina*
Software Engineer @ Radicalbit Srl
Via Borsieri 41, 20159, Milano - IT


Re: Model serving in Flink DataStream

2017-11-15 Thread Andrea Spina
Hi Adarsh,
we developed flink-JPMML for streaming model serving on top of the PMML format
and, of course, Flink. We haven't released any official benchmark numbers yet,
but we didn't bump into any performance issues while using the library. In
terms of throughput and latency it doesn't add more overhead than using the
Flink streaming APIs by themselves.

What can happen is high memory usage if you deploy thousands of (large) models
at a time within the same pipeline, but this was a design choice (you can find
the explanation here
https://www.youtube.com/watch?v=0rWvMZ6JSD8=17s).

AFAIK the lib is already deployed in a couple of projects. Don't hesitate to
write on Github issues if you have more questions.

https://github.com/FlinkML/flink-jpmml

Cheers,

Andrea





Re: FlinkML ALS is taking too long to run

2017-07-12 Thread Andrea Spina
Dear Ziyad, 

Yep, I encountered the same very long runtimes with ALS as well at the time,
and I saw improvements by increasing the number of blocks and decreasing the
number of task slots per TaskManager, as you pointed out.
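
A minimal sketch of that kind of tuning, assuming the FlinkML ALS builder (the concrete values are only illustrative):

val als = ALS()
  .setIterations(10)
  .setNumFactors(50)
  .setBlocks(400)                      // e.g. a small multiple of the parallelism: more, smaller blocks than the default
  .setTemporaryPath("/tmp/als-spill")  // spill intermediate results to disk instead of keeping them in memory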

Cheers,

Andrea








Re: FlinkML ALS is taking too long to run

2017-07-11 Thread Andrea Spina
Dear Ziyad,
could you kindly share some additional info about your environment
(local/cluster, number of nodes, machine configuration)?
What exactly do you mean by "indefinitely"? How long does the job hang?

Hope to be of help, then.

Cheers,

Andrea





Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-21 Thread Andrea Spina
Hi Gordon, sadly no news since the last message.

In the end I worked around the issue; I was not able to solve it. I'll try to
provide a runnable example asap.

Thank you. 

Andrea





Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-08 Thread Andrea Spina
Hi guys,

thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0
versions.
Following Gordon's suggestion I tried setting setReferences to false, but sadly
it didn't help. What I did then was to declare a custom serializer as
follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer

    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer

    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)

    output.close()
  }

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {

    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)

    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }

}
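
For reference, such serializers are then wired in roughly like this (a sketch using the standard ExecutionConfig API; not necessarily the exact registration used in the failing job):

val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.registerTypeWithKryoSerializer(classOf[Block], classOf[BlockSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[SparseMatrix], classOf[SparseMatrixSerializer])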

  What I obtained is the same exception as before, but with a different
accessed index and size.

  Caused by: java.lang.Exception: The data preparation for task 'CHAIN
GroupReduce (GroupReduce at
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
-> Map (Map at
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))'
, caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: Index: 1, Size: 0
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1,
Size: 0
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Index: 1, Size: 0
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.set(ArrayList.java:444)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
at

Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-07 Thread Andrea Spina
Good afternoon dear Community,

For a few days now I've been really struggling to understand the reason behind
this KryoException. Here is the stack trace.

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask  
   
- Error in task code:  CHAIN GroupReduce (GroupReduce at
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
-> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplicat
ion$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
(GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(B
lockMatrix.scala:103)) -> Map (Map at
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))'
, caused an error: E
rror obtaining the sorted input: Thread 'SortMerger spilling thread'
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger spilling thread' terminated due to an exception:
java.lang.IndexOu
tOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink\.distributed.Block)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
at
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 11 more
2017-06-07 10:18:52,594 INFO 
org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage
stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO 
org.apache.flink.runtime.taskmanager.TaskManager  - Direct
memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO 
org.apache.flink.runtime.taskmanager.TaskManager  - Off-heap
pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace:
57/58/-1 MB (used/committed/max)], [Compressed Class Space: 

Async Functions and Scala async-client for mySql/MariaDB database connection

2017-03-30 Thread Andrea Spina
Dear Flink community,

I started to use Async Functions in Scala, Flink 1.2.0, in order to retrieve
enrichment information from a MariaDB database. To do that, I first employed
the classical jdbc library (org.mariadb.jdbc) and it worked as expected.

Due to the blocking behavior of jdbc, I'm trying to use this library
https://github.com/mauricio/postgresql-async/tree/master/mysql-async
which promises to offer a subset of features in a non-blocking fashion.

Sadly I'm not able to use it.

Following the async function code.

*
object AsyncEnricher {
  case class OutputType(field1: FieldType, field2: FieldType)
}

class AsyncEnricher(configuration: MariaDBConfig)
    extends AsyncFunction[InputType, OutputType]
    with Serializable
    with AutoCloseable
    with LazyLogging {

  private val queryString = s"SELECT  FROM [table] WHERE  = ;"

  implicit lazy val executor =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  private lazy val mariaDBClient: Connection = {
    val config = createConfiguration(configuration)
    val connection = new MySQLConnection(config)
    Await.result(connection.connect, 5 seconds)
  }

  override def asyncInvoke(input: InputType, collector: AsyncCollector[OutputType]): Unit = {

    val queryResult = mariaDBClient.sendPreparedStatement(queryString, Seq(input.fieldToSearch))

    queryResult.map(_.rows) onSuccess {
      case Some(resultSet) =>
        Try {
          resultSet.head(0).asInstanceOf[FieldType]
        } match {
          case Success(value) =>
            collector.collect(Iterable(OutputType(value, value)))
          case Failure(e) =>
            logger.error(s"retrieving value from MariaDB raised $e: $queryString executed")
        }
      case _ => logger.error(s"value not found: $queryString executed")
    }

    queryResult onFailure {
      case e: Throwable =>
        logger.error(s"retrieving location volume from MariaDB raised $e: $queryString executed")
    }

  }

  override def close(): Unit = {
    Try(mariaDBClient.disconnect).recover {
      case t: Throwable => logger.info(s"MariaDB cannot be closed - ${t.getMessage}")
    }
  }

}
*

Follows the stack

/
TimerException{java.lang.IllegalStateException: Timer service is shut down}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Timer service is shut down
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
at
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
... 7 more

java.lang.NullPointerException
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
/

I think it involves connection.connect returning a Future, and therefore the
Await. This is different from the jdbc driver, which worked like a charm. I
also tried moving the Await out of the lazy val.
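
A related detail: the failure branches above only log and never complete the AsyncCollector, so those requests stay pending until the async operator's timeout. A sketch of a completion handler that always finishes the request, reusing only the calls already present in the snippet (Iterable.empty simply yields no enrichment for that input):

queryResult.map(_.rows).onComplete {
  case Success(Some(resultSet)) =>
    Try(resultSet.head(0).asInstanceOf[FieldType]) match {
      case Success(value) => collector.collect(Iterable(OutputType(value, value)))
      case Failure(e) =>
        logger.error(s"retrieving value from MariaDB raised $e: $queryString executed")
        collector.collect(Iterable.empty[OutputType]) // complete the request anyway
    }
  case _ =>
    logger.error(s"value not found or query failed: $queryString executed")
    collector.collect(Iterable.empty[OutputType])
}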

Can't wait for your opinion. Thank you so much in advance.

Andrea 





Re: Cogrouped Stream never triggers tumbling event time window

2017-03-30 Thread Andrea Spina
Dear community,

I finally solved the issue I had bumped into.
Basically, the reason for the problem was the behavior of my input: the two
streams had very different incoming rates (events of the second type arrived
really late and sparsely in event time).

The solution I employed was to assign timestamps and watermarks to the source
stream just before splitting it into the streams handling my first and second
event types. I suppose this solved my problem because of the
getCurrentWatermark() call in the EventTimeTrigger, which I think returns the
minimum watermark across the streams scoped by the TriggerContext. So the
window was hanging because of the incoming rate of the second type stream.
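
A minimal sketch of that change (RawEvent, isFirstType, toFirstType/toSecondType and maxOutOfOrder are illustrative names for my raw source type and mapping helpers):

val sourceWithTs = rawSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[RawEvent](maxOutOfOrder) {
    override def extractTimestamp(event: RawEvent): Long = event.timestamp
  })

// split only after the watermarks have been assigned, so both branches share the same watermark progression
val firstStream  = sourceWithTs.filter(_.isFirstType).map(toFirstType(_))
val secondStream = sourceWithTs.filter(!_.isFirstType).map(toSecondType(_))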

Hope it could help someone in the future.

Cheers,

Andrea





Re: Cogrouped Stream never triggers tumbling event time window

2017-03-23 Thread Andrea Spina
Sorry, I forgot to put the Flink version. 1.1.2

Thanks, Andrea







Cogrouped Stream never triggers tumbling event time window

2017-03-23 Thread Andrea Spina
Dear Community,

I'm really struggling on a co-grouped stream. The workload is the following:

*
val firstStream: DataStream[FirstType] =
  firstRaw.assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

val secondStream: DataStream[SecondType] = secondRaw
  .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
  .map(new toSecondsStreamMapper())
*

where both the Extractors extend BoundedOutOfOrdernessTimestampExtractor by
overriding the extractTimestamp method and assigning timestamps owned
respectively by FirstType and SecondType objects.

*override def extractTimestamp(first: FirstType): Long = first.timestamp*
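
For context, the full shape of such an extractor (a sketch; maxOutOfOrder is an org.apache.flink.streaming.api.windowing.time.Time):

class MyCustomFirstExtractor(maxOutOfOrder: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[FirstType](maxOutOfOrder) {
  override def extractTimestamp(first: FirstType): Long = first.timestamp
}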

Then I'm calling cogroup as follows

*

val stockDetails = firstStream
  .coGroup(secondStream)
  .where(_.id)
  .equalTo(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .apply(new MyCogroupFunction())
  .uid("myCogroup")
  .name("My CoGroup")

*

The problem is that the CoGroup function is never triggered. I did several
tests and I was not able to solve it at all.
The first relevant point is that event time can be seriously out-of-order; I
can even bump into a 0 timestamp. I then also faked the timestamps in order to
distribute them over a span of two seconds, five seconds, and so forth. These
attempts didn't change the behavior at all: not a single window is fired.

Another relevant point: I'm running locally, reading from a pre-loaded Kafka
topic, so all the events are read sequentially at startup.

I will give a couple of examples.

Workload 1 (faked timestamps)
fields (id, timestamp)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131328)
FirstType(9781783433803 ,1490280131328)

SecondType(9781783433803,1490280130465)
SecondType(9781783433803,1490280131027)
SecondType(9781783433803,1490280131051)
SecondType(9781783433803,1490280131070)
SecondType(9781783433803,1490280131085)
SecondType(9781783433803,1490280131103)
SecondType(9781783433803,1490280131124)
SecondType(9781783433803,1490280131143)
SecondType(9781783433803,1490280131158)
SecondType(9781783433803,1490280131175)

Workload 2 (real case timestamps)

> FirstType(9781783433803, 1490172958602)
1> FirstType(9781783433803, ,1490172958611)
1> FirstType(9781783433803, 1490172958611)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803 ,1490196171869)
1> FirstType(9781783433803, 1490196171869)

SecondType(9781783433803 ,0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 1489689399726)
SecondType(9781783433803, 1489689399726)

I confirm that I have healthy incoming streams at the entrance of the
coGroup operator.
I think I'm likely missing something easy.

Any help will be really appreciated.

Sincerly,

Andrea






Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2016-09-02 Thread ANDREA SPINA
Hi Stefan,
Thank you so much for the answer. Ok, I'll do it asap.
For the sake of argument, could the issue be related to a low number of
blocks? I noticed the Flink implementation by default sets the number of
blocks to the input count (which is actually a lot). So with a low block
count and big blocks, maybe they don't fit somewhere...
Thank you again.

Andrea

2016-09-02 10:51 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> unfortunately, the log does not contain the required information for this
> case. It seems like a sender to the SortMerger failed. The best way to find
> this problem is to take a look to the exceptions that are reported in the
> web front-end for the failing job. Could you check if you find any reported
> exceptions there and provide them to us?
>
> Best,
> Stefan
>
> Am 01.09.2016 um 11:56 schrieb ANDREA SPINA <74...@studenti.unimore.it>:
>
> Sure. Here <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c>
> you can find the complete logs file.
> Still can not run through the issue. Thank you for your help.
>
> 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> I don't know whether my usual error is related to this one but is very
>> similar and it happens randomly...I still have to figure out the root cause
>> of the error:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map
>> at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted
>> input: Thread 'SortMerger spilling thread' terminated due to an exception:
>> -2
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception: -2
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1079)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>> (GroupReduceDriver.java:94)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: -2
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadO
>> bject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:135)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>> zer.deserialize(KryoSerializer.java:219)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>> zer.deserialize(KryoSerializer.java:245)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>> zer.copy(KryoSerializer.java:255)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.
>> copy(PojoSerializer.java:556)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerB
>> ase.copy(TupleSerializerBase.java:75)
>> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.
>> writeToOutput(NormalizedKeySorter.java:499)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $SpillingThread.go(UnilateralSortMerger.java:1344)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> could you provide the log outputs for your job (ideally with debug
>>> logging enabled)?
>>>
>>> Best,
>>> Stefan
>>>
&

Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2016-09-01 Thread ANDREA SPINA
Sure. Here <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c>
you can find the complete log file.
I still cannot get past the issue. Thank you for your help.

2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> I don't know whether my usual error is related to this one but is very
> similar and it happens randomly...I still have to figure out the root cause
> of the error:
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map
> at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted
> input: Thread 'SortMerger spilling thread' terminated due to an exception:
> -2
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception: -2
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1079)
> at org.apache.flink.runtime.operators.GroupReduceDriver.
> prepare(GroupReduceDriver.java:94)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: -2
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
> MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at com.esotericsoftware.kryo.serializers.MapSerializer.
> read(MapSerializer.java:135)
> at com.esotericsoftware.kryo.serializers.MapSerializer.
> read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:219)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:245)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(
> KryoSerializer.java:255)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(
> PojoSerializer.java:556)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(
> TupleSerializerBase.java:75)
> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.
> writeToOutput(NormalizedKeySorter.java:499)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> SpillingThread.go(UnilateralSortMerger.java:1344)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
>
>
> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> could you provide the log outputs for your job (ideally with debug
>> logging enabled)?
>>
>> Best,
>> Stefan
>>
>> Am 31.08.2016 um 14:40 schrieb ANDREA SPINA <74...@studenti.unimore.it>:
>>
>> Hi everyone.
>> I'm running the FlinkML ALS matrix factorization and I bumped into the
>> following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> at org.apache.flink.client.program.ContextEnvironment.execute(C
>> ontextEnvironment.java:60)
>> at org.apache.flink.api.scala.ExecutionEnvironment.execute(Exec
>> utionEnvironment.scala:652)
>> at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTool
>> s.scala:94)
>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
>> at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
>> at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
>> at dima.tu.berlin.bench

FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2016-08-31 Thread ANDREA SPINA
)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
at java.lang.Thread.run(Thread.java:745)

I'm running with *flink-1.0.3*. I really can't figure out the reason behind
it.

My code simply calls the library as follows:

val als = ALS()
  .setIterations(numIterations)
  .setNumFactors(rank)
  .setBlocks(degreeOfParallelism)
  .setSeed(42)
  .setTemporaryPath(tempPath)

als.fit(ratings, parameters)

val (users, items) = als.factorsOption match {
  case Some(_) => als.factorsOption.get
  case _ => throw new RuntimeException
}

users.writeAsText(outputPath, WriteMode.OVERWRITE)
items.writeAsText(outputPath, WriteMode.OVERWRITE)

env.execute("ALS matrix factorization")

where
- ratings, the input dataset, contains (uid, iid, rate) rows: about 8e6
users, 1e6 items and 700 ratings per user on average.
- numIterations = 10
- rank = 50
- degreeOfParallelism = 240


*The error seems to be related to the final persist() call:*
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)

I'm running on a 15-node cluster - 16 CPUs per node - with the following
relevant properties:

jobmanager.heap.mb = 2048
taskmanager.memory.fraction = 0.5
taskmanager.heap.mb = 28672
taskmanager.network.bufferSizeInBytes = 32768
taskmanager.network.numberOfBuffers = 98304
akka.ask.timeout = 300s

Any help will be appreciated. Thank you.

-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)


Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-07-13 Thread ANDREA SPINA
Hi everybody,
increasing akka.ask.timeout solved the second issue. Anyway, that was a
warning pointing at a congested network, so I also worked on improving the
algorithm.
Increasing numberOfBuffers and the corresponding buffer size solved the first
issue, so now I can run with the full degree of parallelism.
In my case, enabling off-heap memory didn't do the trick.
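
For reference, the kind of settings involved (illustrative values along the lines Ufuk suggested below, not my exact final configuration):

taskmanager.network.numberOfBuffers = 65536
taskmanager.network.bufferSizeInBytes = 32768
akka.ask.timeout = 300s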

Thank you. All the bests,
Andrea

2016-06-29 17:10 GMT+02:00 Martin Scholl <m...@funkpopes.org>:

> Other than increasing the ask.timeout, we've seen such failures being
> caused by long GC pauses over bigger heaps. In such a case, you could
> fiddle with a) enabling object reuse, or b) enabling off-heap memory (i.e.
> taskmanager.memory.off-heap == true) to mitigate GC-induced issues a bit.
>
> Hope it helps,
> Martin
>
>
> On Wed, Jun 29, 2016 at 3:29 PM Ufuk Celebi <u...@apache.org> wrote:
>
>> OK, looks like you can easily give more memory to the network stack,
>> e.g. for 2 GB set
>>
>> taskmanager.network.numberOfBuffers = 65536
>> taskmanager.network.bufferSizeInBytes = 32768
>>
>> For the other exception, your logs confirm that there is something
>> else going on. Try increasing the akka ask timeout:
>>
>> akka.ask.timeout: 100 s
>>
>> Does this help?
>>
>>
>> On Wed, Jun 29, 2016 at 3:10 PM, ANDREA SPINA <74...@studenti.unimore.it>
>> wrote:
>> > Hi Ufuk,
>> >
>> > so the memory available per node is 48294 megabytes per node, but I
>> reserve
>> > 28 by flink conf file.
>> > taskmanager.heap.mb = 28672
>> > taskmanager.memory.fraction = 0.7
>> > taskmanager.network.numberOfBuffers = 32768
>> > taskmanager.network.bufferSizeInBytes = 16384
>> >
>> > Anyway Follows what I found in log files.
>> >
>> > Follows the taskmanager log (task manager that seems failed)
>> >
>> > 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
>> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
>> >
>> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
>> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
>> (1/1)
>> > switched to FAILED with exception.
>> > java.lang.IllegalStateException: Received unexpected partition state
>> null
>> > for partition request. This is a bug.
>> > at
>> >
>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>> > at
>> > org.apache.flink.runtime.taskmanager.TaskManager.org
>> $apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:
>> > 468)
>> > at
>> >
>> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> > at
>> >
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> > at
>> >
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> > at
>> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> > at
>> > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> > at
>> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>> > at
>> >
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> > at
>> >
>> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> > at akka.dispatch.Ma

Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread ANDREA SPINA
l(Future.scala:244)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@130.149.21.16:6122/user/taskmanager#1824295872]]
after [1 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

Really appreciating your help here. :)
Cheers,
Andrea

2016-06-29 13:48 GMT+02:00 Ufuk Celebi <u...@apache.org>:

> Hey Andrea! Sorry for the bad user experience.
>
> Regarding the network buffers: you should be able to run it after
> increasing the number of network buffers, just account for it when
> specifying the heap size etc. You currently allocate 32768 * 16384
> bytes = 512 MB for them. If you have a very long pipeline and high
> parallelism, you should increase it accordingly. How much memory do
> you have on your machines?
>
> Regarding the IllegalStateException: I suspect that this is **not**
> the root failure cause. The null ExecutionState can only happen, if
> the producer task (from which data is requested) failed during the
> request. The error message is confusing and I opened a JIRA to fix it:
> https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
> your complete logs to see what the root cause might be, e.g. why did
> the producer fail?
>
>
> On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA
> <74...@studenti.unimore.it> wrote:
> > Hi everyone,
> >
> > I am running some Flink experiments with Peel benchmark
> > http://peel-framework.org/ and I am struggling with exceptions: the
> > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is
> ~80GiB
> > and is located on Hdfs 2.7.1. Flink version is 1.0.3.
> >
> > At the beginning I tried with 400 as degree of parallelism but not enough
> > numberOfBuffers was raised so I changed the parallelism to 200. Flink
> > configuration follows:
> >
> > jobmanager.rpc.address = ${runtime.hostname}
> > akka.log.lifecycle.events = ON
> > akka.ask.timeout = 300s
> > jobmanager.rpc.port = 6002
> > jobmanager.heap.mb = 1024
> > jobmanager.web.port = 6004
> > taskmanager.heap.mb = 28672
> > taskmanager.memory.fraction = 0.7
> > taskmanager.network.numberOfBuffers = 32768
> > taskmanager.network.bufferSizeInBytes = 16384
> > taskmanager.tmp.dirs =
> >
> "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
> > taskmanager.debug.memory.startLogThread = true
> >
> > With a parallelism of 200 the following exception will raise from a node
> of
> > the cluster:
> >
> > 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
> > - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
> >
> nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> > -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
> > .sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69))
> (1/1)
> > switched to FAILED with exception.
> > java.lang.IllegalStateException: Received unexpected partition state null
> > for partition request. This is a bug.
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
> >
> >
> > The reduce code is:
> >
> > 43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
> >
> > The map code is:
> >
> > 68  def createI

Apache Flink 1.0.3 IllegalStateException Partition State on chaining

2016-06-29 Thread ANDREA SPINA
Hi everyone,

I am running some Flink experiments with Peel benchmark
http://peel-framework.org/ and I am struggling with exceptions: the
environment is a 25-node cluster with 16 cores per node. The dataset is
~80GiB and is located on HDFS 2.7.1. The Flink version is 1.0.3.

At the beginning I tried with 400 as the degree of parallelism, but an
"insufficient number of network buffers" error was raised, so I changed the
parallelism to 200. The Flink configuration follows:

jobmanager.rpc.address = ${runtime.hostname}
akka.log.lifecycle.events = ON
akka.ask.timeout = 300s
jobmanager.rpc.port = 6002
jobmanager.heap.mb = 1024
jobmanager.web.port = 6004
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
taskmanager.tmp.dirs =
"/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
taskmanager.debug.memory.startLogThread = true

With a parallelism of 200, the following exception is raised on a node of
the cluster:

2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
- CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.fli
nk.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
-> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver
.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null
for partition request. This is a bug.
at
org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)


The reduce code is:

43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)

The map code is:

68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
69    dimensionDS.map {
70      dimension =>
71        val values = DenseVector(Array.fill(dimension)(0.0))
72        values
73    }
74  }

I can't figure out a solution, thank you for your help.

Andrea

-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)


Re: Flink java.io.FileNotFoundException Exception with Peel Framework

2016-06-29 Thread ANDREA SPINA
Hi,

the problem was solved after I figured out there was an instance of a Flink
TaskManager running on a node of the cluster.
Thank you,
Andrea

2016-06-28 12:17 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>:

> Hi Max,
> thank you for the fast reply and sorry: I use flink-1.0.3.
> Yes I tested on dummy dataset with numOfBuffers = 16384 and decreasing the
> parallelism degree and this solution solved the first exception. Anyway on
> the 80GiB dataset I struggle with the second exception.
>
> Regards,
> Andrea
>
> 2016-06-28 12:08 GMT+02:00 Maximilian Michels <m...@apache.org>:
>
>> Hi Andrea,
>>
>> The number of network buffers should be sufficient. Actually, assuming
>> you have 16 task slots on each of the 25 nodes, it should be enough to
>> have 16^2 * 25 * 4 = 25600 network buffers.
>>
>> See
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#background
>>
>> So we have to investigate a little more. Which version of Flink are you
>> using?
>>
>> Cheers,
>> Max
>>
>> On Tue, Jun 28, 2016 at 11:57 AM, ANDREA SPINA
>> <74...@studenti.unimore.it> wrote:
>> > Hi everyone,
>> >
>> > I am running some Flink experiments with Peel benchmark
>> > http://peel-framework.org/ and I am struggling with exceptions: the
>> > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is
>> ~80GiB
>> > and is located on Hdfs 2.7.1.
>> >
>> > At the beginning I tried with 400 as degree of parallelism and with the
>> > following configuration:
>> >
>> > jobmanager.rpc.address = ${runtime.hostname}
>> > akka.log.lifecycle.events = ON
>> > akka.ask.timeout = 300s
>> > jobmanager.rpc.port = 6002
>> >
>> > jobmanager.heap.mb = 1024
>> > jobmanager.web.port = 6004
>> >
>> > taskmanager.heap.mb = 28672
>> > taskmanager.memory.fraction = 0.7
>> > taskmanager.network.numberOfBuffers = 32768
>> > taskmanager.network.bufferSizeInBytes = 16384
>> > taskmanager.tmp.dirs =
>> >
>> "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
>> > taskmanager.debug.memory.startLogThread = true
>> >
>> > the following exception will raise
>> >
>> > Caused by: java.io.IOException: Insufficient number of network buffers:
>> > required 350, but only 317 available. The total number of network
>> buffers is
>> > currently set to 32768. You can increase this number by setting the
>> > configuration key 'taskmanager.network.numberOfBuffers'.
>> > at
>> >
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>> > at
>> >
>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:469)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > So I tried different solutions, both with increasing numberOfBuffers
>> (Max
>> > value tried 98304) or decreasing the degreeOfParallelism (Min value
>> tried
>> > 300) and testing those configs with a dummy dataset seems to solve the
>> > number of buffers issue.
>> > But In each case with the 80GiB dataset now I struggle with a new
>> exception;
>> > the following with a degree of parallelism = 300 and numberOfBuffers =
>> 32768
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The program
>> > execution failed: Job execution failed.
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> > at
>> >
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> > at
>> >
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>> > at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon$.main(FlinkSLRTrainCommon.scala:110)
>> > at
>> >
>> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon.main(FlinkSLRTrainCommon.scala)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

Re: Flink java.io.FileNotFoundException Exception with Peel Framework

2016-06-28 Thread ANDREA SPINA
Hi Max,
thank you for the fast reply and sorry: I use flink-1.0.3.
Yes, I tested on a dummy dataset with numberOfBuffers = 16384 and a decreased
degree of parallelism, and this solved the first exception. However, on
the 80GiB dataset I still struggle with the second exception.

Regards,
Andrea

2016-06-28 12:08 GMT+02:00 Maximilian Michels <m...@apache.org>:

> Hi Andrea,
>
> The number of network buffers should be sufficient. Actually, assuming
> you have 16 task slots on each of the 25 nodes, it should be enough to
> have 16^2 * 25 * 4 = 25600 network buffers.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#background
>
> So we have to investigate a little more. Which version of Flink are you
> using?
>
> Cheers,
> Max
>
> On Tue, Jun 28, 2016 at 11:57 AM, ANDREA SPINA
> <74...@studenti.unimore.it> wrote:
> > Hi everyone,
> >
> > I am running some Flink experiments with Peel benchmark
> > http://peel-framework.org/ and I am struggling with exceptions: the
> > environment is a 25-nodes cluster, 16 cores per nodes. The dataset is
> ~80GiB
> > and is located on Hdfs 2.7.1.
> >
> > At the beginning I tried with 400 as degree of parallelism and with the
> > following configuration:
> >
> > jobmanager.rpc.address = ${runtime.hostname}
> > akka.log.lifecycle.events = ON
> > akka.ask.timeout = 300s
> > jobmanager.rpc.port = 6002
> >
> > jobmanager.heap.mb = 1024
> > jobmanager.web.port = 6004
> >
> > taskmanager.heap.mb = 28672
> > taskmanager.memory.fraction = 0.7
> > taskmanager.network.numberOfBuffers = 32768
> > taskmanager.network.bufferSizeInBytes = 16384
> > taskmanager.tmp.dirs =
> >
> "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
> > taskmanager.debug.memory.startLogThread = true
> >
> > the following exception will raise
> >
> > Caused by: java.io.IOException: Insufficient number of network buffers:
> > required 350, but only 317 available. The total number of network
> buffers is
> > currently set to 32768. You can increase this number by setting the
> > configuration key 'taskmanager.network.numberOfBuffers'.
> > at
> >
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
> > at
> >
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:469)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > So I tried different solutions, both with increasing numberOfBuffers (Max
> > value tried 98304) or decreasing the degreeOfParallelism (Min value tried
> > 300) and testing those configs with a dummy dataset seems to solve the
> > number of buffers issue.
> > But In each case with the 80GiB dataset now I struggle with a new
> exception;
> > the following with a degree of parallelism = 300 and numberOfBuffers =
> 32768
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program
> > execution failed: Job execution failed.
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> > at
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> > at
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
> > at
> >
> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon$.main(FlinkSLRTrainCommon.scala:110)
> > at
> >
> dima.tu.berlin.benchmark.flink.mlr.FlinkSLRTrainCommon.main(FlinkSLRTrainCommon.scala)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> > at
> >
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> > at org.apache.flink.client.CliFrontend

Flink java.io.FileNotFoundException Exception with Peel Framework

2016-06-28 Thread ANDREA SPINA
or.java:69)
at
org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Channel to path
'/data/3/peel/flink/tmp/flink-io-3a97d62c-ada4-44f1-ab72-dd018386f9aa/79305169d69f7e8b361a175af87353fa.channel'
could not be opened.
at
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.(AbstractFileIOChannel.java:61)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.(AsynchronousFileIOChannel.java:86)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.(AsynchronousBufferFileWriter.java:31)
at
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBufferFileWriter(IOManagerAsync.java:257)
at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:151)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:366)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:159)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
... 6 more
Caused by: java.io.FileNotFoundException:
/data/3/peel/flink/tmp/flink-io-3a97d62c-ada4-44f1-ab72-dd018386f9aa/79305169d69f7e8b361a175af87353fa.channel
(No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.(RandomAccessFile.java:243)
at java.io.RandomAccessFile.(RandomAccessFile.java:124)
at
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.(AbstractFileIOChannel.java:57)
... 15 more

Here
<https://dl.dropboxusercontent.com/u/78598929/flink-hadoop-jobmanager-0-cloud-11.log>
is the related full jobmanager log. I can't figure out a solution.

Thank you and have a nice day.

-- 
*Andrea Spina*
Guest student at DIMA, TU Berlin
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)