Re: when use broadcast variable and run on bigdata display this error please help

2015-08-21 Thread hagersaleh
Note: As the content of broadcast variables is kept in-memory on each node,
it should not become too large. For simpler things like scalar values you
can simply make parameters part of the closure of a function, or use the
withParameters(...) method to pass in a configuration.
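
To make the two alternatives above concrete, here is a minimal sketch using the Scala DataSet API (the threshold value and the "threshold" configuration key are made up for illustration; the withParameters(...) call is used as described in the Flink documentation):

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    // Reads its threshold from the Configuration handed in via withParameters(...).
    class ThresholdFilter extends RichFilterFunction[Int] {
      private var limit = 0

      override def open(parameters: Configuration): Unit = {
        limit = parameters.getInteger("threshold", 0) // "threshold" is an arbitrary key
      }

      override def filter(value: Int): Boolean = value > limit
    }

    object ParameterPassingSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val numbers = env.fromElements(1, 5, 10, 20)
        val threshold = 8

        // (1) Small scalar value: simply capture it in the closure of the function.
        val viaClosure = numbers.filter(_ > threshold)

        // (2) withParameters(...): ship a Configuration to a rich function's open().
        val conf = new Configuration()
        conf.setInteger("threshold", threshold)
        val viaParameters = numbers.filter(new ThresholdFilter).withParameters(conf)

        println(viaClosure.collect())
        println(viaParameters.collect())
      }
    }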





Re: when use broadcast variable and run on bigdata display this error please help

2015-08-21 Thread hagersaleh
When to use broadcast variable?

Distribute data with a broadcast variable when:

- The data is large.
- The data has been produced by some form of computation and is already a DataSet (a distributed result).
- Typical use case: redistributing intermediate results, such as trained models.


from link
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
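
For reference, a minimal usage sketch of a broadcast variable with the Scala DataSet API; the broadcast name "model" and the data are made up for illustration:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    import scala.collection.JavaConverters._

    // Reads the DataSet registered under the broadcast name "model" in open().
    class ScoringMapper extends RichMapFunction[String, String] {
      private var weights: Seq[Double] = Seq.empty

      override def open(parameters: Configuration): Unit = {
        // A broadcast variable arrives as a java.util.List on every parallel instance.
        weights = getRuntimeContext.getBroadcastVariable[Double]("model").asScala.toSeq
      }

      override def map(value: String): String = s"$value -> ${weights.sum}"
    }

    object BroadcastVariableSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // The broadcast data is itself a DataSet, e.g. a previously computed model.
        val model = env.fromElements(0.1, 0.2, 0.3)
        val events = env.fromElements("a", "b", "c")

        val scored = events
          .map(new ScoringMapper)
          .withBroadcastSet(model, "model")

        println(scored.collect())
      }
    }

For simple scalar parameters, the closure or withParameters(...) options from the earlier reply remain the lighter-weight choice.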





Re: Gelly ran out of memory

2015-08-21 Thread Stephan Ewen
The operator needs to implement spilling, which the SolutionSet currently
doesn't (it is the only operator that cannot).

On Fri, Aug 21, 2015 at 5:15 PM, Henry Saputra 
wrote:

> Ah yes, I agree about the purpose of memory management, what I was
> wondering is that could an operator do something like explicitly spill to
> disk when it fails to get required memory from memory segment?
>
>
> On Friday, August 21, 2015, Stephan Ewen  wrote:
>
>> One of the ideas behind memory management is to recognize when the memory
>> runs out. Simply using regular Java heap means that an operator would
>> consume more than it should, and eventually cause an OOM error again.
>>
>> The only way of reacting to the fact that the memory ran out is to reduce
>> memory usage, for example by either spilling or compacting. The solution
>> set cannot spill at this point, so if compaction does not help, it gives up.
>>
>> On Thu, Aug 20, 2015 at 8:50 PM, Henry Saputra 
>> wrote:
>>
>>> Hi Stephan, this looks like a bug to me. Shouldn't the memory manager
>>> switch to out of managed area if it is out of memory space?
>>>
>>> - Henry
>>>
>>> On Thu, Aug 20, 2015 at 3:09 AM, Stephan Ewen  wrote:
>>> > Actually, you ran out of "Flink Managed Memory", not user memory. User
>>> > memory shortage manifests itself as Java OutofMemoryError.
>>> >
>>> > At this point, the Delta iterations cannot spill. They additionally
>>> suffer a
>>> > bit from memory fragmentation.
>>> > A possible workaround is to use the option
>>> "setSolutionSetUnmanaged(true)"
>>> > on the iteration. That will eliminate the fragmentation issue, at
>>> least.
>>> >
>>> > Stephan
>>> >
>>> >
>>> > On Thu, Aug 20, 2015 at 12:06 PM, Andra Lungu 
>>> wrote:
>>> >>
>>> >> Hi Flavio,
>>> >>
>>> >> These kinds of exceptions generally arise from the fact that you ran
>>> out
>>> >> of `user` memory. You can try to increase that a bit.
>>> >> In your flink-conf.yaml try adding
>>> >> # The fraction of memory reserved for the system (managed memory); the rest is left to user code
>>> >> taskmanager.memory.fraction: 0.4
>>> >>
>>> >> This will give 0.6 of the unit of memory to the user and 0.4 to the
>>> >> system.
>>> >>
>>> >> Tell me if that helped.
>>> >> Andra
>>> >>
>>> >> On Thu, Aug 20, 2015 at 12:02 PM, Flavio Pompermaier
>>> >>  wrote:
>>> >>>
>>> >>> Hi to all,
>>> >>>
>>> >>> I tried to run my gelly job on Flink 0.9-SNAPSHOT and I was having an
>>> >>> EOFException, so I tried on 0.10-SNAPSHOT and now I have the
>>> following
>>> >>> error:
>>> >>>
>>> >>> Caused by: java.lang.RuntimeException: Memory ran out. Compaction
>>> failed.
>>> >>> numPartitions: 32 minPartition: 73 maxPartition: 80 number of
>>> overflow
>>> >>> segments: 0 bucketSize: 570 Overall memory: 102367232 Partition
>>> memory:
>>> >>> 81100800 Message: null
>>> >>> at
>>> >>>
>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:465)
>>> >>> at
>>> >>>
>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>>> >>> at
>>> >>>
>>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>>> >>> at
>>> >>>
>>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:211)
>>> >>> at
>>> >>>
>>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:272)
>>> >>> at
>>> >>>
>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>> >>> at java.lang.Thread.run(Thread.java:745)
>>> >>>
>>> >>> Probably I'm doing something wrong but I can't understand how to
>>> estimate
>>> >>> the required memory for my Gelly job..
>>> >>>
>>> >>> Best,
>>> >>> Flavio
>>> >>
>>> >>
>>> >
>>>
>>
>>


Re: Gelly ran out of memory

2015-08-21 Thread Henry Saputra
Ah yes, I agree about the purpose of memory management. What I was
wondering is whether an operator could explicitly spill to disk when it
fails to get the required memory from its memory segments?


On Friday, August 21, 2015, Stephan Ewen  wrote:

> One of the ideas behind memory management is to recognize when the memory
> runs out. Simply using regular Java heap means that an operator would
> consume more than it should, and eventually cause an OOM error again.
>
> The only way of reacting to the fact that the memory ran out is to reduce
> memory usage, for example by either spilling or compacting. The solution
> set cannot spill at this point, so if compaction does not help, it gives up.
>
> On Thu, Aug 20, 2015 at 8:50 PM, Henry Saputra  > wrote:
>
>> Hi Stephan, this looks like a bug to me. Shouldn't the memory manager
>> switch to out of managed area if it is out of memory space?
>>
>> - Henry
>>
>> On Thu, Aug 20, 2015 at 3:09 AM, Stephan Ewen > > wrote:
>> > Actually, you ran out of "Flink Managed Memory", not user memory. User
>> > memory shortage manifests itself as Java OutofMemoryError.
>> >
>> > At this point, the Delta iterations cannot spill. They additionally
>> suffer a
>> > bit from memory fragmentation.
>> > A possible workaround is to use the option
>> "setSolutionSetUnmanaged(true)"
>> > on the iteration. That will eliminate the fragmentation issue, at least.
>> >
>> > Stephan
>> >
>> >
>> > On Thu, Aug 20, 2015 at 12:06 PM, Andra Lungu > > wrote:
>> >>
>> >> Hi Flavio,
>> >>
>> >> These kinds of exceptions generally arise from the fact that you ran
>> out
>> >> of `user` memory. You can try to increase that a bit.
>> >> In your flink-conf.yaml try adding
>> >> # The fraction of memory reserved for the system (managed memory); the rest is left to user code
>> >> taskmanager.memory.fraction: 0.4
>> >>
>> >> This will give 0.6 of the unit of memory to the user and 0.4 to the
>> >> system.
>> >>
>> >> Tell me if that helped.
>> >> Andra
>> >>
>> >> On Thu, Aug 20, 2015 at 12:02 PM, Flavio Pompermaier
>> >> > > wrote:
>> >>>
>> >>> Hi to all,
>> >>>
>> >>> I tried to run my gelly job on Flink 0.9-SNAPSHOT and I was having an
>> >>> EOFException, so I tried on 0.10-SNAPSHOT and now I have the following
>> >>> error:
>> >>>
>> >>> Caused by: java.lang.RuntimeException: Memory ran out. Compaction
>> failed.
>> >>> numPartitions: 32 minPartition: 73 maxPartition: 80 number of overflow
>> >>> segments: 0 bucketSize: 570 Overall memory: 102367232 Partition
>> memory:
>> >>> 81100800 Message: null
>> >>> at
>> >>>
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:465)
>> >>> at
>> >>>
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>> >>> at
>> >>>
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>> >>> at
>> >>>
>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:211)
>> >>> at
>> >>>
>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:272)
>> >>> at
>> >>>
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>> >>> at java.lang.Thread.run(Thread.java:745)
>> >>>
>> >>> Probably I'm doing something wrong but I can't understand how to
>> estimate
>> >>> the required memory for my Gelly job..
>> >>>
>> >>> Best,
>> >>> Flavio
>> >>
>> >>
>> >
>>
>
>


Re: Custom Aggregate - Example

2015-08-21 Thread Philipp Goetze

Thank you Aljoscha,

I guessed that I should use the reduce method. However, I am not looking
for window aggregations; I want to do this on a grouped stream.


The problem is that we work with Lists instead of tuples and thus we cannot
use the pre-implemented aggregates.


So the idea is to call it like that:

   val aggr = source.groupBy(_(0)).reduce(new customReducer(1))

And this is the signature of the class:

   class customReducer(field: Int) extends RichReduceFunction[List[Any]]


How do I have to implement this class now so that it works
correctly even with parallelism > 1?


I hope you understand what I am trying to do. =)

Kind Regards,
Philipp


On 21.08.2015 15:28, Aljoscha Krettek wrote:

Hi,
with the current API this should do what you are after:

val input = ...
val result = input
  .window(...)
  .groupBy(...)
  .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom 
aggregations. You can also use foldWindow() if you want to do a 
functional fold over the window.


I hope this helps.

Cheers,
Aljoscha

On Fri, 21 Aug 2015 at 14:51 Philipp Goetze <philipp.goe...@tu-ilmenau.de> wrote:


Hello community,

how do I define a custom aggregate function in Flink Streaming
(Scala)?
Could you please provide an example on how to do that?

Thank you and best regards,
Philipp





Re: Custom Aggregate - Example

2015-08-21 Thread Gyula Fóra
Hi,

Alternatively, if you would like to create continuous aggregates per key you
can use ds.groupBy().reduce(..), or use one of the stateful functions in
the Scala API, such as mapWithState.

For a rolling average per key you can check this example:
https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala

Cheers,
Gyula
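
A minimal sketch of such a reducer for the List[Any] case from the question, assuming the aggregated field holds numeric values and the aggregate is a running sum (the field layout is illustrative only). Because the reduce runs on a grouped/keyed stream, all records with the same key are handled by the same parallel instance, so the function mainly needs to be associative for parallelism > 1:

    import org.apache.flink.api.common.functions.RichReduceFunction

    // A running sum over one field of a List[Any] record; the remaining fields of the
    // latest record are kept as-is. The aggregated field is assumed to be numeric.
    class CustomReducer(field: Int) extends RichReduceFunction[List[Any]] {
      override def reduce(a: List[Any], b: List[Any]): List[Any] = {
        val sum = a(field).asInstanceOf[Number].doubleValue() +
          b(field).asInstanceOf[Number].doubleValue()
        b.updated(field, sum)
      }
    }

    // Usage, following the call from the original question:
    // val aggr = source.groupBy(_(0)).reduce(new CustomReducer(1))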

On Fri, Aug 21, 2015 at 3:28 PM Aljoscha Krettek 
wrote:

> Hi,
> with the current API this should do what you are after:
>
> val input = ...
>
> val result = input
>   .window(...)
>   .groupBy(...)
>   .reduceWindow( /* your reduce function */ )
>
> With the reduce function you should be able to implement any custom
> aggregations. You can also use foldWindow() if you want to do a functional
> fold over the window.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 21 Aug 2015 at 14:51 Philipp Goetze 
> wrote:
>
>> Hello community,
>>
>> how do I define a custom aggregate function in Flink Streaming (Scala)?
>> Could you please provide an example on how to do that?
>>
>> Thank you and best regards,
>> Philipp
>>
>


Re: Custom Aggregate - Example

2015-08-21 Thread Aljoscha Krettek
Hi,
with the current API this should do what you are after:

val input = ...

val result = input
  .window(...)
  .groupBy(...)
  .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom
aggregations. You can also use foldWindow() if you want to do a functional
fold over the window.

I hope this helps.

Cheers,
Aljoscha

On Fri, 21 Aug 2015 at 14:51 Philipp Goetze 
wrote:

> Hello community,
>
> how do I define a custom aggregate function in Flink Streaming (Scala)?
> Could you please provide an example on how to do that?
>
> Thank you and best regards,
> Philipp
>


Re: Keep Model in Operator instance up to date

2015-08-21 Thread Welly Tambunan
Hi Gyula,

Thanks a lot. That really helps a lot!

Have a great vacation

Cheers

On Fri, Aug 21, 2015 at 7:47 PM, Gyula Fóra  wrote:

> Hi
>
> You are right, if all operators need continuous updates then the most
> straightforward way is to push all the updates to the operators like you
> described.
>
> If the cached data is the same for all operators and is small enough you
> can centralize the updates in a dedicated operator and push the updated
> data to the operators every once in a while.
>
> Cheers
> Gyula
>
>
>
> On Thu, Aug 20, 2015 at 4:31 PM Welly Tambunan  wrote:
>
>> Hi Gyula,
>>
>> I have a couple of operator on the pipeline. Filter, mapper, flatMap, and
>> each of these operator contains some cache data.
>>
>> So i think that means for every other operator on the pipeline, i will
>> need to add a new stream to update each cache data.
>>
>>
>> Cheers
>>
>> On Thu, Aug 20, 2015 at 5:33 PM, Gyula Fóra  wrote:
>>
>>> Hi,
>>>
>>> I don't think I fully understand your question, could you please try to
>>> be a little more specific?
>>>
>>> I assume by caching you mean that you keep the current model as an
>>> operator state. Why would you need to add new streams in this case?
>>>
>>> I might be slow to answer as I am currently on vacation without stable
>>> internet connection.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Aug 20, 2015 at 5:36 AM Welly Tambunan 
>>> wrote:
>>>
 Hi Gyula,

 I have another question. So if i cache something on the operator, to
 keep it up to date,  i will always need to add and connect another stream
 of changes to the operator ?

 Is this right for every case ?

 Cheers

 On Wed, Aug 19, 2015 at 3:21 PM, Welly Tambunan 
 wrote:

> Hi Gyula,
>
> That's really helpful. The docs is improving so much since the last
> time (0.9).
>
> Thanks a lot !
>
> Cheers
>
> On Wed, Aug 19, 2015 at 3:07 PM, Gyula Fóra 
> wrote:
>
>> Hey,
>>
>> If it is always better to check the events against a more up-to-date
>> model (even if the events we are checking arrived before the update) then
>> it is fine to keep the model outside of the system.
>>
>> In this case we need to make sure that we can push the updates to the
>> external system consistently. If you are using the PersistentKafkaSource
>> for instance it can happen that some messages are replayed in case of
>> failure. In this case you need to make sure that you remove duplicate
>> updates or have idempotent updates.
>>
>> You can read about the checkpoint mechanism in the Flink website:
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Aug 19, 2015 at 9:56 AM Welly Tambunan 
>> wrote:
>>
>>> Thanks Gyula,
>>>
>>> Another question i have..
>>>
>>> > ... while external model updates would be *tricky *to keep
>>> consistent.
>>> Is that still the case if the Operator treat the external model as
>>> read-only ? We create another stream that will update the external model
>>> separately.
>>>
>>> Could you please elaborate more about this one ?
>>>
>>> Cheers
>>>
>>> On Wed, Aug 19, 2015 at 2:52 PM, Gyula Fóra 
>>> wrote:
>>>
 In that case I would apply a map to wrap in some common type, like an Either, before the union.

 And then in the coflatmap you can unwrap it.
 On Wed, Aug 19, 2015 at 9:50 AM Welly Tambunan 
 wrote:

> Hi Gyula,
>
> Thanks.
>
> However, update1 and update2 have different types. Based on my
> understanding, I don't think we can use union. How can we handle this one?
>
> We like to create our events strongly typed to capture the domain
> language.
>
>
> Cheers
>
> On Wed, Aug 19, 2015 at 2:37 PM, Gyula Fóra 
> wrote:
>
>> Hey,
>>
>> One input of your co-flatmap would be model updates and the other
>> input would be events to check against the model if I understand 
>> correctly.
>>
>> This means that if your model updates come from more than one
>> stream you need to union them into a single stream before connecting 
>> them
>> with the event stream and applying the coflatmap.
>>
>> DataStream updates1 = 
>> DataStream updates2 = 
>> DataStream events = 
>>
>> events.connect(updates1.union(updates2).broadcast()).flatMap(...)
>>
>> Does this answer your question?
>>
>> Gyula
>>
>>
>> On Wednesday, August 19, 2015, Welly Tambunan 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>

Custom Aggregate - Example

2015-08-21 Thread Philipp Goetze

Hello community,

how do I define a custom aggregate function in Flink Streaming (Scala)?
Could you please provide an example on how to do that?

Thank you and best regards,
Philipp


Re: Keep Model in Operator instance up to date

2015-08-21 Thread Gyula Fóra
Hi

You are right, if all operators need continuous updates then the most
straightforward way is to push all the updates to the operators like you
described.

If the cached data is the same for all operators and is small enough you
can centralize the updates in a dedicated operator and push the updated
data to the operators every once in a while.

Cheers
Gyula
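
A minimal sketch of the pattern discussed in this thread: the update stream is broadcast and connected to the event stream, and the model is kept as plain operator state inside a CoFlatMapFunction. The types and names are made up, the state is not checkpointed here, and the package paths follow the current documentation, so they may differ slightly for 0.9/0.10:

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    import scala.collection.mutable

    case class ModelUpdate(key: String, weight: Double)
    case class Event(key: String, payload: String)

    // Keeps the current model as operator state and applies updates as they arrive.
    class ModelCoFlatMap extends CoFlatMapFunction[Event, ModelUpdate, String] {
      private val model = mutable.Map.empty[String, Double]

      override def flatMap1(e: Event, out: Collector[String]): Unit =
        model.get(e.key).foreach(w => out.collect(s"${e.payload} scored against weight $w"))

      override def flatMap2(u: ModelUpdate, out: Collector[String]): Unit =
        model(u.key) = u.weight
    }

    object ModelUpdateSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val events: DataStream[Event] = env.fromElements(Event("a", "e1"), Event("b", "e2"))
        val updates: DataStream[ModelUpdate] = env.fromElements(ModelUpdate("a", 0.5))

        // Broadcast the updates so every parallel instance of the co-flatmap sees them.
        val checked = events
          .connect(updates.broadcast)
          .flatMap(new ModelCoFlatMap)

        checked.print()
        env.execute("model update sketch")
      }
    }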


On Thu, Aug 20, 2015 at 4:31 PM Welly Tambunan  wrote:

> Hi Gyula,
>
> I have a couple of operators in the pipeline (filter, mapper, flatMap), and
> each of these operators contains some cached data.
>
> So I think that means that, for every such operator in the pipeline, I will
> need to add a new stream to update its cached data.
>
>
> Cheers
>
> On Thu, Aug 20, 2015 at 5:33 PM, Gyula Fóra  wrote:
>
>> Hi,
>>
>> I don't think I fully understand your question, could you please try to
>> be a little more specific?
>>
>> I assume by caching you mean that you keep the current model as an
>> operator state. Why would you need to add new streams in this case?
>>
>> I might be slow to answer as I am currently on vacation without stable
>> internet connection.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Aug 20, 2015 at 5:36 AM Welly Tambunan  wrote:
>>
>>> Hi Gyula,
>>>
>>> I have another question. So if i cache something on the operator, to
>>> keep it up to date,  i will always need to add and connect another stream
>>> of changes to the operator ?
>>>
>>> Is this right for every case ?
>>>
>>> Cheers
>>>
>>> On Wed, Aug 19, 2015 at 3:21 PM, Welly Tambunan 
>>> wrote:
>>>
 Hi Gyula,

 That's really helpful. The docs is improving so much since the last
 time (0.9).

 Thanks a lot !

 Cheers

 On Wed, Aug 19, 2015 at 3:07 PM, Gyula Fóra 
 wrote:

> Hey,
>
> If it is always better to check the events against a more up-to-date
> model (even if the events we are checking arrived before the update) then
> it is fine to keep the model outside of the system.
>
> In this case we need to make sure that we can push the updates to the
> external system consistently. If you are using the PersistentKafkaSource
> for instance it can happen that some messages are replayed in case of
> failure. In this case you need to make sure that you remove duplicate
> updates or have idempotent updates.
>
> You can read about the checkpoint mechanism in the Flink website:
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>
> Cheers,
> Gyula
>
> On Wed, Aug 19, 2015 at 9:56 AM Welly Tambunan 
> wrote:
>
>> Thanks Gyula,
>>
>> Another question i have..
>>
>> > ... while external model updates would be *tricky *to keep
>> consistent.
>> Is that still the case if the Operator treat the external model as
>> read-only ? We create another stream that will update the external model
>> separately.
>>
>> Could you please elaborate more about this one ?
>>
>> Cheers
>>
>> On Wed, Aug 19, 2015 at 2:52 PM, Gyula Fóra 
>> wrote:
>>
>>> In that case I would apply a map to wrap in some common type, like a
>>> n Either before the union.
>>>
>>> And then in the coflatmap you can unwrap it.
>>> On Wed, Aug 19, 2015 at 9:50 AM Welly Tambunan 
>>> wrote:
>>>
 Hi Gyula,

 Thanks.

 However, update1 and update2 have different types. Based on my
 understanding, I don't think we can use union. How can we handle this one?

 We like to create our events strongly typed to capture the domain
 language.


 Cheers

 On Wed, Aug 19, 2015 at 2:37 PM, Gyula Fóra 
 wrote:

> Hey,
>
> One input of your co-flatmap would be model updates and the other
> input would be events to check against the model if I understand 
> correctly.
>
> This means that if your model updates come from more than one
> stream you need to union them into a single stream before connecting 
> them
> with the event stream and applying the coflatmap.
>
> DataStream updates1 = 
> DataStream updates2 = 
> DataStream events = 
>
> events.connect(updates1.union(updates2).broadcast()).flatMap(...)
>
> Does this answer your question?
>
> Gyula
>
>
> On Wednesday, August 19, 2015, Welly Tambunan 
> wrote:
>
>> Hi Gyula,
>>
>> Thanks for your response.
>>
>> However, the model can receive multiple events for updates. How can
>> we do that with a co-flatmap, since as far as I can see the connect API
>> only receives a single DataStream?
>>
>>
>> > ... while external model updates would be tricky t

Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

2015-08-21 Thread Stephan Ewen
I think we need to extend our own FileInputFormats as well to pass the
credentials...

On Fri, Aug 21, 2015 at 12:44 PM, Robert Metzger 
wrote:

> I was able to reproduce the issue. This is the JIRA:
> https://issues.apache.org/jira/browse/FLINK-2555
> I've already opened a pull request with the fix.
>
> The problem was that our HadoopInputFormat wrapper was not correctly
> passing the security credentials from the Job object to the cluster.
>
> Consider this code posted by Arnaud in the initial message:
>
> final Job job = Job.getInstance();
> job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);
>
> // Create the source
> @SuppressWarnings({ "unchecked", "rawtypes" })
> final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>         new HadoopInputFormat<NullWritable, DefaultHCatRecord>( // CHECKSTYLE:ON
>                 (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
>                 NullWritable.class, //
>                 DefaultHCatRecord.class, //
>                 job);
>
>
> in the "Job.getInstance()" call, the current authentication credentials of
> the user are stored.
>
> They are later passed to the HadoopInputFormat class (last line), but
> Flink was not properly making the Credentials available again on the
> cluster.
>
>
> The pull request should resolve the issue (I've verified it on a secured
> CDH 5.3 setup)
>
>
> Thank you for reporting the bug!
>
>
>
> On Thu, Aug 20, 2015 at 5:21 PM, LINZ, Arnaud 
> wrote:
>
>> Hi Robert,
>>
>>
>>
>> Yes, it’s an internal tool, which get an HDFS FileSystem instance. I do
>> some Kerberos-related operations, needed because I manipulate some HDFS
>> files before executing the application.
>>
>> The local cluster mode is working fine with the same code, and it does
>> some HCat reading / HDFS writing.
>>
>>
>>
>> What HdfsTools does, in a nutshell :
>>
>>   *final* Configuration cfg = *new* Configuration();
>>
>> cfg.addResource(*new* Path("/home/hadoop/conf/core-site.xml"));
>>
>> cfg.addResource(*new* Path("/home/hadoop/conf/hdfs-site.xml"));
>>
>> cfg.addResource(*new* Path(Environnement.*getEnvVarQuietly*(
>> "HADOOP_CONF_DIR") + "/core-site.xml"));
>>
>> cfg.addResource(*new* Path(Environnement.*getEnvVarQuietly*(
>> "HADOOP_CONF_DIR") + "/hdfs-site.xml"));
>>
>> // Kerberos handling
>>
>> *if* (*isKerberosActive*()) {
>>
>> *loginKerberos*(cfg);
>>
>> }
>>
>> filesys = FileSystem.*get*(cfg);
>>
>>
>>
>> And the straightforward kerberos stuff:
>>
>> *public* *static* *synchronized* *void* loginKerberos(Configuration cfg)
>> {
>>
>> UserGroupInformation.*setConfiguration*(cfg);
>>
>> *if* (!*loggedIn*) {
>>
>> *try* {
>>
>> UserGroupInformation.*loginUserFromKeytab*(
>> *getKerberosPrincipal*(), *getKerberosKeytab*());
>>
>> *loggedIn* = *true*;
>>
>> JournalUDF.*logLocalFS*("User " + UserGroupInformation.
>> *getLoginUser*() + " : Kerberos login succeeded ");
>>
>> }
>>
>> *catch* (IOException excep) {
>>
>> *throw* *new* GaneshRuntimeException("Unable to log
>> (kerberos) : " + excep.toString(), excep);
>>
>> }
>>
>> }
>>
>> }
>>
>> *loggedIn *being static to the class, and *alinz* having all the proper
>> rights.
>>
>>
>>
>> From what I’ve seen on google, spark and hive/oozie ran into the same
>> error and somewhat corrected that, but I don’t know if it will help to see
>> if it’s really the same pb.
>>
>> I’m sending you the full trace on a private mail.
>>
>>
>>
>> Arnaud
>>
>>
>>
>> *De :* Robert Metzger [mailto:rmetz...@apache.org]
>> *Envoyé :* jeudi 20 août 2015 16:42
>> *À :* user@flink.apache.org
>> *Objet :* Re: Using HadoopInputFormat files from Flink/Yarn in a secure
>> cluster gives an error
>>
>>
>>
>> Hi Arnaud,
>>
>>
>>
>> I suspect the "HdfsTools" are something internal from your company?
>>
>> Are they doing any kerberos-related operations?
>>
>>
>>
>> Is the local cluster mode also reading files from the secured HDFS
>> cluster?
>>
>>
>>
>> Flink is taking care of sending the authentication tokens from the client
>> to the jobManager and to the TaskManagers.
>>
>> For HDFS Flink should also use these user settings.
>>
>> I don't know whether the HCatalog code / Hadoop compatbililty code is
>> also doing some kerberos operations which are interfering with our efforts.
>>
>>
>>
>> From the logs, you can see:
>>
>> Secure Hadoop environment setup detected. Running in secure context.
>> 15:04:18,005 INFO
>> org.apache.hadoop.security.UserGroupInformation   - Login
>> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
>>
>>
>>
>> Is the user "alinz" authorized to access the files in HDFS?
>>
>>
>>
>> I have to admit that I didn't see this issue before.
>>
>> If possible, can you privately send the the full log of the ap

Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

2015-08-21 Thread Robert Metzger
I was able to reproduce the issue. This is the JIRA:
https://issues.apache.org/jira/browse/FLINK-2555
I've already opened a pull request with the fix.

The problem was that our HadoopInputFormat wrapper was not correctly
passing the security credentials from the Job object to the cluster.

Consider this code posted by Arnaud in the initial message:

final Job job = Job.getInstance();
job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

// Create the source
@SuppressWarnings({ "unchecked", "rawtypes" })
final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
        new HadoopInputFormat<NullWritable, DefaultHCatRecord>( // CHECKSTYLE:ON
                (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
                NullWritable.class, //
                DefaultHCatRecord.class, //
                job);


in the "Job.getInstance()" call, the current authentication credentials of
the user are stored.

They are later passed to the HadoopInputFormat class (last line), but Flink
was not properly making the Credentials available again on the cluster.


The pull request should resolve the issue (I've verified it on a secured
CDH 5.3 setup)


Thank you for reporting the bug!



On Thu, Aug 20, 2015 at 5:21 PM, LINZ, Arnaud 
wrote:

> Hi Robert,
>
>
>
> Yes, it’s an internal tool, which get an HDFS FileSystem instance. I do
> some Kerberos-related operations, needed because I manipulate some HDFS
> files before executing the application.
>
> The local cluster mode is working fine with the same code, and it does
> some HCat reading / HDFS writing.
>
>
>
> What HdfsTools does, in a nutshell:
>
>     final Configuration cfg = new Configuration();
>     cfg.addResource(new Path("/home/hadoop/conf/core-site.xml"));
>     cfg.addResource(new Path("/home/hadoop/conf/hdfs-site.xml"));
>     cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/core-site.xml"));
>     cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
>
>     // Kerberos handling
>     if (isKerberosActive()) {
>         loginKerberos(cfg);
>     }
>
>     filesys = FileSystem.get(cfg);
>
> And the straightforward kerberos stuff:
>
>     public static synchronized void loginKerberos(Configuration cfg) {
>         UserGroupInformation.setConfiguration(cfg);
>         if (!loggedIn) {
>             try {
>                 UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
>                 loggedIn = true;
>                 JournalUDF.logLocalFS("User " + UserGroupInformation.getLoginUser() + " : Kerberos login succeeded ");
>             } catch (IOException excep) {
>                 throw new GaneshRuntimeException("Unable to log (kerberos) : " + excep.toString(), excep);
>             }
>         }
>     }
>
> loggedIn being static to the class, and alinz having all the proper
> rights.
>
>
>
> From what I’ve seen on Google, Spark and Hive/Oozie ran into the same
> error and somewhat corrected it, but I don’t know if it will help to see
> whether it’s really the same problem.
>
> I’m sending you the full trace on a private mail.
>
>
>
> Arnaud
>
>
>
> From: Robert Metzger [mailto:rmetz...@apache.org]
> Sent: Thursday, 20 August 2015 16:42
> To: user@flink.apache.org
> Subject: Re: Using HadoopInputFormat files from Flink/Yarn in a secure
> cluster gives an error
>
>
>
> Hi Arnaud,
>
>
>
> I suspect the "HdfsTools" are something internal from your company?
>
> Are they doing any kerberos-related operations?
>
>
>
> Is the local cluster mode also reading files from the secured HDFS cluster?
>
>
>
> Flink is taking care of sending the authentication tokens from the client
> to the jobManager and to the TaskManagers.
>
> For HDFS Flink should also use these user settings.
>
> I don't know whether the HCatalog code / Hadoop compatibility code is also
> doing some kerberos operations which are interfering with our efforts.
>
>
>
> From the logs, you can see:
>
> Secure Hadoop environment setup detected. Running in secure context.
> 15:04:18,005 INFO
> org.apache.hadoop.security.UserGroupInformation   - Login
> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
>
>
>
> Is the user "alinz" authorized to access the files in HDFS?
>
>
>
> I have to admit that I didn't see this issue before.
>
> If possible, can you privately send the full log of the application,
> using "yarn logs -applicationId " ?
>
>
>
>
>
> On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud 
> wrote:
>
> Hello,
>
>
>
> My application handles as input and output some HDFS files in the jobs and
> in the driver application.
>
> It works in local cluster mode, but when I’m trying to submit it to a yarn
> client, when I try to use a HadoopInputFormat (that comes from a HCatalog

Re: Gelly ran out of memory

2015-08-21 Thread Stephan Ewen
One of the ideas behind memory management is to recognize when the memory
runs out. Simply using regular Java heap means that an operator would
consume more than it should, and eventually cause an OOM error again.

The only way of reacting to the fact that the memory ran out is to reduce
memory usage, for example by either spilling or compacting. The solution
set cannot spill at this point, so if compaction does not help, it gives up.

On Thu, Aug 20, 2015 at 8:50 PM, Henry Saputra 
wrote:

> Hi Stephan, this looks like a bug to me. Shouldn't the memory manager
> switch to out of managed area if it is out of memory space?
>
> - Henry
>
> On Thu, Aug 20, 2015 at 3:09 AM, Stephan Ewen  wrote:
> > Actually, you ran out of "Flink Managed Memory", not user memory. User
> > memory shortage manifests itself as Java OutofMemoryError.
> >
> > At this point, the Delta iterations cannot spill. They additionally
> suffer a
> > bit from memory fragmentation.
> > A possible workaround is to use the option
> "setSolutionSetUnmanaged(true)"
> > on the iteration. That will eliminate the fragmentation issue, at least.
> >
> > Stephan
> >
> >
> > On Thu, Aug 20, 2015 at 12:06 PM, Andra Lungu 
> wrote:
> >>
> >> Hi Flavio,
> >>
> >> These kinds of exceptions generally arise from the fact that you ran out
> >> of `user` memory. You can try to increase that a bit.
> >> In your flink-conf.yaml try adding
> >> # The fraction of memory reserved for the system (managed memory); the rest is left to user code
> >> taskmanager.memory.fraction: 0.4
> >>
> >> This will give 0.6 of the unit of memory to the user and 0.4 to the
> >> system.
> >>
> >> Tell me if that helped.
> >> Andra
> >>
> >> On Thu, Aug 20, 2015 at 12:02 PM, Flavio Pompermaier
> >>  wrote:
> >>>
> >>> Hi to all,
> >>>
> >>> I tried to run my gelly job on Flink 0.9-SNAPSHOT and I was having an
> >>> EOFException, so I tried on 0.10-SNAPSHOT and now I have the following
> >>> error:
> >>>
> >>> Caused by: java.lang.RuntimeException: Memory ran out. Compaction
> failed.
> >>> numPartitions: 32 minPartition: 73 maxPartition: 80 number of overflow
> >>> segments: 0 bucketSize: 570 Overall memory: 102367232 Partition memory:
> >>> 81100800 Message: null
> >>> at
> >>>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:465)
> >>> at
> >>>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
> >>> at
> >>>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
> >>> at
> >>>
> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:211)
> >>> at
> >>>
> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:272)
> >>> at
> >>>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> Probably I'm doing something wrong but I can't understand how to
> estimate
> >>> the required memory for my Gelly job..
> >>>
> >>> Best,
> >>> Flavio
> >>
> >>
> >
>