Re: when use broadcast variable and run on bigdata display this error please help
Note: As the content of broadcast variables is kept in memory on each node, it should not become too large. For simpler things like scalar values, you can simply make the parameters part of the function's closure, or use the withParameters(...) method to pass in a configuration.
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/when-use-broadcast-variable-and-run-on-bigdata-display-this-error-please-help-tp2455p2489.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
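As a rough illustration of the closure approach for a scalar value, the sketch below passes a threshold into a function object through its constructor, so the value travels with the serialized function instead of being broadcast. It is plain Java: `java.util.function.Predicate` stands in for a Flink `FilterFunction`, the class name `AboveThreshold` is hypothetical, and `roundTrip` only simulates, with ordinary Java serialization, the shipping of a user function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical filter whose scalar parameter is part of the function object itself.
// Predicate stands in for Flink's FilterFunction; no Flink API is used here.
class AboveThreshold implements Predicate<Integer>, Serializable {
    private static final long serialVersionUID = 1L;

    private final int threshold; // captured once, when the function is constructed

    AboveThreshold(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean test(Integer value) {
        return value > threshold;
    }

    // Local stand-in for applying the function to a data set.
    static List<Integer> filter(List<Integer> input, AboveThreshold f) {
        return input.stream().filter(f).collect(Collectors.toList());
    }

    // Simulates shipping the function to a worker: serialize it, then deserialize it.
    static AboveThreshold roundTrip(AboveThreshold f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AboveThreshold) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

The threshold survives the round trip because it is an ordinary field of the function, which is why small scalar parameters do not need a broadcast variable.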
Re: when use broadcast variable and run on bigdata display this error please help
When should you use a broadcast variable? Distribute data with a broadcast variable when:
- the data is large;
- the data has been produced by some form of computation and is already a DataSet (a distributed result).
Typical use case: redistributing intermediate results, such as trained models.
Source: https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
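To make the "trained model" use case concrete, here is a plain-Java sketch of what a task typically does with a broadcast set: materialize the full list once (the role `open()` plays in a Rich function) into a lookup table, then use that table for every record. In a Flink job the list would come from `getRuntimeContext().getBroadcastVariable(...)` after the set has been attached with `withBroadcastSet(...)`; the `WeightRow` and `ModelScorer` classes and the linear scoring rule are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model entry: one weight per feature name.
final class WeightRow {
    final String feature;
    final double weight;

    WeightRow(String feature, double weight) {
        this.feature = feature;
        this.weight = weight;
    }
}

// Sketch of a task that consumes a broadcast model (no Flink API used).
final class ModelScorer {
    private final Map<String, Double> weights = new HashMap<>();

    // Plays the role of open(): called once per task with the whole broadcast list.
    void open(List<WeightRow> broadcastModel) {
        weights.clear();
        for (WeightRow row : broadcastModel) {
            weights.put(row.feature, row.weight);
        }
    }

    // Plays the role of map(): called per record; features missing from the model count as 0.
    double score(Map<String, Double> features) {
        double sum = 0.0;
        for (Map.Entry<String, Double> e : features.entrySet()) {
            sum += weights.getOrDefault(e.getKey(), 0.0) * e.getValue();
        }
        return sum;
    }
}
```

Every task holds the complete model in memory, which is the reason for the note that broadcast data should not become too large.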
Re: Gelly ran out of memory
The operator itself needs to implement spilling, which the SolutionSet currently does not (it is the only operator that lacks it).

On Fri, Aug 21, 2015 at 5:15 PM, Henry Saputra wrote:
> Ah yes, I agree about the purpose of memory management, what I was
> wondering is that could an operator do something like explicitly spill to
> disk when it fails to get required memory from memory segment?
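To illustrate Stephan's point that the operator itself has to react when managed memory runs out, here is a toy sketch (not Flink's memory manager or hash table) of a buffer with a fixed budget of memory "segments". When no segment is left, it spills its in-memory records to a secondary store and continues; an operator without such a strategy can only give up, as the solution set does. `SegmentBudget`, `SpillingBuffer`, the one-record-per-segment rule, and the `spilled` list standing in for spill files on disk are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy fixed budget: hands out at most `total` segments, then refuses.
final class SegmentBudget {
    private final int total;
    private int used;

    SegmentBudget(int total) {
        this.total = total;
    }

    boolean tryAcquire() {
        if (used == total) {
            return false; // memory "ran out": the caller has to react
        }
        used++;
        return true;
    }

    void releaseAll() {
        used = 0;
    }
}

// Toy operator buffer: one record per segment; spills everything when the budget is exhausted.
final class SpillingBuffer {
    private final SegmentBudget budget;
    private final List<String> inMemory = new ArrayList<>();
    final List<List<String>> spilled = new ArrayList<>(); // stand-in for spill files on disk

    SpillingBuffer(SegmentBudget budget) {
        this.budget = budget;
    }

    void add(String record) {
        if (!budget.tryAcquire()) {
            // Reduce memory usage by spilling instead of silently growing on the Java heap.
            spilled.add(new ArrayList<>(inMemory));
            inMemory.clear();
            budget.releaseAll();
            if (!budget.tryAcquire()) {
                // Nothing left to free: without another strategy the operator gives up.
                throw new IllegalStateException("Memory ran out even after spilling");
            }
        }
        inMemory.add(record);
    }

    int inMemoryCount() {
        return inMemory.size();
    }
}
```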
Re: Gelly ran out of memory
Ah yes, I agree about the purpose of memory management. What I was wondering is whether an operator could explicitly spill to disk when it fails to get the memory segments it requires.

On Friday, August 21, 2015, Stephan Ewen wrote:
> One of the ideas behind memory management is to recognize when the memory
> runs out. Simply using regular Java heap means that an operator would
> consume more than it should, and eventually cause an OOM error again.
>
> The only way of reacting to the fact that the memory ran out is to reduce
> memory usage, for example by either spilling or compacting. The solution
> set cannot spill at this point, so if compaction does not help, it gives up.
Re: Custom Aggregate - Example
Thank you Aljoscha, I guessed that I should use the reduce method. However, I am not looking for window aggregations; I want to do this on a grouped stream. The problem is that we work with Lists instead of tuples, so we cannot use the pre-implemented aggregates.

So the idea is to call it like this:

    val aggr = source.groupBy(_(0)).reduce(new customReducer(1))

And this is the signature of the class:

    class customReducer(field: Int) extends RichReduceFunction[List[Any]]

How do I have to implement this class so that it works correctly even with parallelism > 1?

I hope you understand what I am trying to do. =)

Kind Regards,
Philipp

On 21.08.2015 15:28, Aljoscha Krettek wrote:
> Hi,
> with the current API this should do what you are after:
>
>     val input = ...
>
>     val result = input
>       .window(...)
>       .groupBy(...)
>       .reduceWindow( /* your reduce function */ )
>
> With the reduce function you should be able to implement any custom
> aggregations. You can also use foldWindow() if you want to do a functional
> fold over the window.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
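One possible shape for such a reducer, sketched in plain Java rather than Scala and without the Flink API: `BinaryOperator<List<Object>>` stands in for `RichReduceFunction[List[Any]]`, summing is assumed to be the desired aggregate, and the value at `field` is assumed to be a `Number`. Because `groupBy` routes all records with the same key to the same parallel instance, a reduce function that only combines its two arguments (no instance-local state, no mutation of the inputs) should behave the same with parallelism > 1. `CustomReducer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical Java stand-in for `class customReducer(field: Int) extends RichReduceFunction[List[Any]]`.
// Combines two records by summing the numeric value at position `field`;
// every other position (e.g. the key at index 0) is taken from the first record.
class CustomReducer implements BinaryOperator<List<Object>> {
    private final int field;

    CustomReducer(int field) {
        this.field = field;
    }

    @Override
    public List<Object> apply(List<Object> a, List<Object> b) {
        // Build a new record instead of mutating the inputs.
        List<Object> result = new ArrayList<>(a);
        double sum = ((Number) a.get(field)).doubleValue()
                + ((Number) b.get(field)).doubleValue();
        result.set(field, sum); // stored as a Double
        return result;
    }
}
```

Grouping the calls as `(x + y) + z` or `x + (y + z)` gives the same record for exactly representable values; keep in mind that floating-point sums are only approximately associative in general.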
Re: Custom Aggregate - Example
Hi,

Alternatively, if you would like to create continuous aggregates per key, you can use ds.groupBy().reduce(..), or use one of the stateful functions in the Scala API, such as mapWithState. For a rolling average per key, you can check this example:
https://github.com/gyfora/summer-school/blob/master/flink/src/main/scala/summerschool/FlinkKafkaExample.scala

Cheers,
Gyula

On Fri, Aug 21, 2015 at 3:28 PM Aljoscha Krettek wrote:
> Hi,
> with the current API this should do what you are after:
>
>     val input = ...
>
>     val result = input
>       .window(...)
>       .groupBy(...)
>       .reduceWindow( /* your reduce function */ )
>
> With the reduce function you should be able to implement any custom
> aggregations. You can also use foldWindow() if you want to do a functional
> fold over the window.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
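For the rolling-average-per-key case, the core logic of a stateful per-key function is small. The sketch below is plain Java, not the Flink `mapWithState` API: a map from key to a (count, sum) accumulator stands in for Flink's per-key state, and `RollingAverage` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key rolling average; the map plays the role of keyed state.
final class RollingAverage {
    private static final class Acc {
        long count;
        double sum;
    }

    private final Map<String, Acc> state = new HashMap<>();

    // Updates the state for `key` and returns the average of all values seen for that key so far.
    double update(String key, double value) {
        Acc acc = state.computeIfAbsent(key, k -> new Acc());
        acc.count++;
        acc.sum += value;
        return acc.sum / acc.count;
    }
}
```

Keeping the count and the sum, rather than only the last average, lets each new value be folded in exactly.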
Re: Custom Aggregate - Example
Hi,
with the current API this should do what you are after:

    val input = ...

    val result = input
      .window(...)
      .groupBy(...)
      .reduceWindow( /* your reduce function */ )

With the reduce function you should be able to implement any custom aggregations. You can also use foldWindow() if you want to do a functional fold over the window.

I hope this helps.

Cheers,
Aljoscha

On Fri, 21 Aug 2015 at 14:51 Philipp Goetze wrote:
> Hello community,
>
> how do I define a custom aggregate function in Flink Streaming (Scala)?
> Could you please provide an example on how to do that?
>
> Thank you and best regards,
> Philipp
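The practical difference between a reduce and a fold over a window is mostly about types: a reduce combines two elements into one element of the same type, while a fold starts from an initial accumulator that may have a different type. A plain-Java sketch over the contents of a single window (the `WindowAggregates` class and the list standing in for a window are hypothetical simplifications, not the Flink API):

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

final class WindowAggregates {
    // reduce: elements of type T in, a single T out.
    static <T> T reduce(List<T> window, BinaryOperator<T> f) {
        if (window.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        T acc = window.get(0);
        for (int i = 1; i < window.size(); i++) {
            acc = f.apply(acc, window.get(i));
        }
        return acc;
    }

    // fold: starts from an initial accumulator whose type R can differ from T.
    static <T, R> R fold(List<T> window, R initial, BiFunction<R, T, R> f) {
        R acc = initial;
        for (T element : window) {
            acc = f.apply(acc, element);
        }
        return acc;
    }
}
```

For example, `reduce(window, Integer::max)` returns the largest element, while `fold(window, "", (acc, x) -> acc + x)` turns a window of integers into a string.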
Re: Keep Model in Operator instance up to date
Hi Gyula,

Thanks a lot. That really helps a lot!

Have a great vacation.

Cheers

On Fri, Aug 21, 2015 at 7:47 PM, Gyula Fóra wrote:
> Hi
>
> You are right, if all operators need continuous updates than the most
> straightforward way is to push all the updates to the operators like you
> described.
>
> If the cached data is the same for all operators and is small enough you
> can centralize the updates in a dedicated operator and push the updated
> data to the operators every once in a while.
>
> Cheers
> Gyula
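Gyula's second suggestion (one dedicated operator collects the updates and pushes the current data to the other operators every once in a while) can be sketched in plain Java as a component that applies changes to a master copy and only emits an immutable snapshot every N updates. The `ModelUpdater` class, the update-count trigger (instead of a time-based one), and the `Map<String, String>` model type are hypothetical; in a Flink job the emitted snapshot would be sent on to the consuming operators.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical "dedicated operator" that owns the cached data and pushes snapshots periodically.
final class ModelUpdater {
    private final Map<String, String> master = new HashMap<>();
    private final int pushEvery;
    private int pending;

    ModelUpdater(int pushEvery) {
        if (pushEvery <= 0) {
            throw new IllegalArgumentException("pushEvery must be positive");
        }
        this.pushEvery = pushEvery;
    }

    // Applies one change; returns a snapshot to push downstream once enough changes have accumulated.
    Optional<Map<String, String>> onUpdate(String key, String value) {
        master.put(key, value);
        pending++;
        if (pending < pushEvery) {
            return Optional.empty();
        }
        pending = 0;
        // Copy, so consumers never observe later mutations of the master map.
        return Optional.of(Collections.unmodifiableMap(new HashMap<>(master)));
    }
}
```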
Custom Aggregate - Example
Hello community,

how do I define a custom aggregate function in Flink Streaming (Scala)? Could you please provide an example of how to do that?

Thank you and best regards,
Philipp
Re: Keep Model in Operator instance up to date
Hi

You are right: if all operators need continuous updates, then the most straightforward way is to push all the updates to the operators like you described.

If the cached data is the same for all operators and is small enough, you can centralize the updates in a dedicated operator and push the updated data to the operators every once in a while.

Cheers
Gyula

On Thu, Aug 20, 2015 at 4:31 PM Welly Tambunan wrote:
> Hi Gyula,
>
> I have a couple of operators on the pipeline: filter, mapper, flatMap, and
> each of these operators contains some cached data.
>
> So I think that means for every other operator on the pipeline, I will
> need to add a new stream to update each cache.
>
> Cheers
>
> On Thu, Aug 20, 2015 at 5:33 PM, Gyula Fóra wrote:
>> Hi,
>>
>> I don't think I fully understand your question, could you please try to
>> be a little more specific?
>>
>> I assume by caching you mean that you keep the current model as an
>> operator state. Why would you need to add new streams in this case?
>>
>> I might be slow to answer as I am currently on vacation without a stable
>> internet connection.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Aug 20, 2015 at 5:36 AM Welly Tambunan wrote:
>>> Hi Gyula,
>>>
>>> I have another question. If I cache something on the operator, will I
>>> always need to add and connect another stream of changes to the operator
>>> to keep it up to date?
>>>
>>> Is this right for every case?
>>>
>>> Cheers
>>>
>>> On Wed, Aug 19, 2015 at 3:21 PM, Welly Tambunan wrote:
>>>> Hi Gyula,
>>>>
>>>> That's really helpful. The docs have improved so much since the last
>>>> time (0.9).
>>>>
>>>> Thanks a lot!
>>>>
>>>> Cheers
>>>>
>>>> On Wed, Aug 19, 2015 at 3:07 PM, Gyula Fóra wrote:
>>>>> Hey,
>>>>>
>>>>> If it is always better to check the events against a more up-to-date
>>>>> model (even if the events we are checking arrived before the update),
>>>>> then it is fine to keep the model outside of the system.
>>>>>
>>>>> In this case we need to make sure that we can push the updates to the
>>>>> external system consistently. If you are using the PersistentKafkaSource,
>>>>> for instance, it can happen that some messages are replayed in case of
>>>>> failure. In this case you need to make sure that you remove duplicate
>>>>> updates or have idempotent updates.
>>>>>
>>>>> You can read about the checkpoint mechanism on the Flink website:
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Wed, Aug 19, 2015 at 9:56 AM Welly Tambunan wrote:
>>>>>> Thanks Gyula,
>>>>>>
>>>>>> Another question I have..
>>>>>>
>>>>>> > ... while external model updates would be *tricky* to keep consistent.
>>>>>>
>>>>>> Is that still the case if the operator treats the external model as
>>>>>> read-only? We create another stream that will update the external model
>>>>>> separately.
>>>>>>
>>>>>> Could you please elaborate more on this one?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Aug 19, 2015 at 2:52 PM, Gyula Fóra wrote:
>>>>>>> In that case I would apply a map to wrap them in some common type, like
>>>>>>> an Either, before the union.
>>>>>>>
>>>>>>> And then in the co-flatmap you can unwrap it.
>>>>>>>
>>>>>>> On Wed, Aug 19, 2015 at 9:50 AM Welly Tambunan wrote:
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> However, update1 and update2 have different types. Based on my
>>>>>>>> understanding, I don't think we can use union. How can we handle this
>>>>>>>> one?
>>>>>>>>
>>>>>>>> We like to create our events strongly typed to capture the domain
>>>>>>>> language.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> On Wed, Aug 19, 2015 at 2:37 PM, Gyula Fóra wrote:
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> One input of your co-flatmap would be model updates and the other
>>>>>>>>> input would be events to check against the model, if I understand
>>>>>>>>> correctly.
>>>>>>>>>
>>>>>>>>> This means that if your model updates come from more than one
>>>>>>>>> stream, you need to union them into a single stream before connecting
>>>>>>>>> them with the event stream and applying the co-flatmap.
>>>>>>>>>
>>>>>>>>>     DataStream updates1 =
>>>>>>>>>     DataStream updates2 =
>>>>>>>>>     DataStream events =
>>>>>>>>>
>>>>>>>>>     events.connect(updates1.union(updates2).broadcast()).flatMap(...)
>>>>>>>>>
>>>>>>>>> Does this answer your question?
>>>>>>>>>
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Wednesday, August 19, 2015, Welly Tambunan wrote:
>>>>>>>>>> Hi Gyula,
>>>>>>>>>>
>>>>>>>>>> Thanks for your response.
>>>>>>>>>>
>>>>>>>>>> However, the model can receive multiple events for updates. How can
>>>>>>>>>> we do that with a co-flatmap, as I can see the connect API only
>>>>>>>>>> receives a single datastream?
>>>>>>>>>>
>>>>>>>>>> > ... while external model updates would be tricky t
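Gyula's suggestion for two differently typed update streams (wrap each in a common type such as an Either, union them, and unwrap inside the co-flatmap) looks roughly like this in plain Java. `flatMap1`/`flatMap2` mirror the two-input shape of a Flink `CoFlatMapFunction`, with a `List` standing in for the output collector, but nothing here uses the Flink API; `Either`, `PriceUpdate`, `LimitUpdate`, `ModelChecker`, and the alert rule are all hypothetical. Because the unioned update stream is `.broadcast()` in Gyula's snippet, every parallel instance of the co-flatmap would see every update.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical update types with different shapes.
final class PriceUpdate {
    final String item;
    final double price;

    PriceUpdate(String item, double price) {
        this.item = item;
        this.price = price;
    }
}

final class LimitUpdate {
    final double limit;

    LimitUpdate(double limit) {
        this.limit = limit;
    }
}

// Minimal hand-rolled Either: exactly one side is non-null.
final class Either<L, R> {
    final L left;
    final R right;

    private Either(L left, R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }

    static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }

    boolean isLeft() { return left != null; }
}

// Two-input operator: input 1 = events, input 2 = the unioned, wrapped model updates.
final class ModelChecker {
    private final Map<String, Double> prices = new HashMap<>();
    private double limit = Double.MAX_VALUE; // no alerts until a limit arrives

    // Events: emit an alert when the known price of the item exceeds the current limit.
    void flatMap1(String item, List<String> out) {
        Double price = prices.get(item);
        if (price != null && price > limit) {
            out.add("ALERT " + item);
        }
    }

    // Updates: unwrap the Either and apply whichever update it carries.
    void flatMap2(Either<PriceUpdate, LimitUpdate> update, List<String> out) {
        if (update.isLeft()) {
            prices.put(update.left.item, update.left.price);
        } else {
            limit = update.right.limit;
        }
    }
}
```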
Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error
I think we need to extend our own FileInputFormats as well to pass the credentials...

On Fri, Aug 21, 2015 at 12:44 PM, Robert Metzger wrote:
> I was able to reproduce the issue. This is the JIRA:
> https://issues.apache.org/jira/browse/FLINK-2555
> I've already opened a pull request with the fix.
>
> The problem was that our HadoopInputFormat wrapper was not correctly
> passing the security credentials from the Job object to the cluster.
>
> Consider this code posted by Arnaud in the initial message:
>
>     final Job job = Job.getInstance();
>     job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);
>
>     // Create the source
>     @SuppressWarnings({ "unchecked", "rawtypes" })
>     final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>         new HadoopInputFormat<NullWritable, DefaultHCatRecord>( // CHECKSTYLE:ON
>             (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
>             NullWritable.class, //
>             DefaultHCatRecord.class, //
>             job);
>
> In the "Job.getInstance()" call, the current authentication credentials of
> the user are stored.
>
> They are later passed to the HadoopInputFormat class (last line), but
> Flink was not properly making the credentials available again on the
> cluster.
>
> The pull request should resolve the issue (I've verified it on a secured
> CDH 5.3 setup).
>
> Thank you for reporting the bug!
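The failure mode described here (credentials stored in the Job object on the client were not available again on the cluster) is a general hazard when an input format is serialized and shipped to the workers: any state that is not part of the serialized form is silently lost. The plain-Java sketch below shows that effect only; it is not Flink's actual fix for FLINK-2555. `ShippedFormat`, its string-to-string credential map, and the `shipCredentials` switch between "buggy" and "fixed" behavior are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper that is serialized on the client and deserialized on a worker.
final class ShippedFormat implements Serializable {
    private static final long serialVersionUID = 1L;

    final String path;
    final boolean shipCredentials;             // false = credentials get lost, true = they are written
    transient Map<String, String> credentials; // transient: skipped by default serialization

    ShippedFormat(String path, Map<String, String> credentials, boolean shipCredentials) {
        this.path = path;
        this.credentials = credentials;
        this.shipCredentials = shipCredentials;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Write the credentials explicitly only if the wrapper is meant to carry them.
        out.writeObject(shipCredentials && credentials != null ? new HashMap<>(credentials) : null);
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        credentials = (Map<String, String>) in.readObject();
    }

    // Simulates shipping the format to the cluster with plain Java serialization.
    static ShippedFormat ship(ShippedFormat format) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(format);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ShippedFormat) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

The same reasoning would apply to any other input format that is shipped to the cluster and needs the user's credentials there.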
Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error
I was able to reproduce the issue. This is the JIRA: https://issues.apache.org/jira/browse/FLINK-2555
I've already opened a pull request with the fix.

The problem was that our HadoopInputFormat wrapper was not correctly passing the security credentials from the Job object to the cluster.

Consider this code posted by Arnaud in the initial message:

    final Job job = Job.getInstance();
    job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

    // Create the source
    @SuppressWarnings({ "unchecked", "rawtypes" })
    final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
        new HadoopInputFormat<NullWritable, DefaultHCatRecord>( // CHECKSTYLE:ON
            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
            NullWritable.class, //
            DefaultHCatRecord.class, //
            job);

In the "Job.getInstance()" call, the current authentication credentials of the user are stored.

They are later passed to the HadoopInputFormat class (last line), but Flink was not properly making the credentials available again on the cluster.

The pull request should resolve the issue (I've verified it on a secured CDH 5.3 setup).

Thank you for reporting the bug!

On Thu, Aug 20, 2015 at 5:21 PM, LINZ, Arnaud wrote:
> Hi Robert,
>
> Yes, it's an internal tool, which gets an HDFS FileSystem instance. I do
> some Kerberos-related operations, needed because I manipulate some HDFS
> files before executing the application.
>
> The local cluster mode is working fine with the same code, and it does
> some HCat reading / HDFS writing.
>
> What HdfsTools does, in a nutshell:
>
>     final Configuration cfg = new Configuration();
>     cfg.addResource(new Path("/home/hadoop/conf/core-site.xml"));
>     cfg.addResource(new Path("/home/hadoop/conf/hdfs-site.xml"));
>     cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/core-site.xml"));
>     cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
>     // Kerberos handling
>     if (isKerberosActive()) {
>         loginKerberos(cfg);
>     }
>     filesys = FileSystem.get(cfg);
>
> And the straightforward Kerberos stuff:
>
>     public static synchronized void loginKerberos(Configuration cfg) {
>         UserGroupInformation.setConfiguration(cfg);
>         if (!loggedIn) {
>             try {
>                 UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
>                 loggedIn = true;
>                 JournalUDF.logLocalFS("User " + UserGroupInformation.getLoginUser() + " : Kerberos login succeeded ");
>             } catch (IOException excep) {
>                 throw new GaneshRuntimeException("Unable to log (kerberos) : " + excep.toString(), excep);
>             }
>         }
>     }
>
> loggedIn being static to the class, and alinz having all the proper
> rights.
>
> From what I've seen on Google, Spark and Hive/Oozie ran into the same
> error and somewhat corrected it, but I don't know if it will help to see
> if it's really the same problem.
>
> I'm sending you the full trace in a private mail.
>
> Arnaud
>
> From: Robert Metzger [mailto:rmetz...@apache.org]
> Sent: Thursday, August 20, 2015 16:42
> To: user@flink.apache.org
> Subject: Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error
>
> Hi Arnaud,
>
> I suspect the "HdfsTools" are something internal from your company?
> Are they doing any Kerberos-related operations?
>
> Is the local cluster mode also reading files from the secured HDFS cluster?
>
> Flink is taking care of sending the authentication tokens from the client
> to the JobManager and to the TaskManagers.
> For HDFS, Flink should also use these user settings.
> I don't know whether the HCatalog code / Hadoop compatibility code is also
> doing some Kerberos operations which are interfering with our efforts.
>
> From the logs, you can see:
>
>     Secure Hadoop environment setup detected. Running in secure context.
>     15:04:18,005 INFO org.apache.hadoop.security.UserGroupInformation - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
>
> Is the user "alinz" authorized to access the files in HDFS?
>
> I have to admit that I didn't see this issue before.
> If possible, can you privately send the full log of the application,
> using "yarn logs -applicationId " ?
>
> On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud wrote:
>> Hello,
>>
>> My application handles as input and output some HDFS files in the jobs and
>> in the driver application.
>>
>> It works in local cluster mode, but when I'm trying to submit it to a yarn
>> client, when I try to use a HadoopInputFormat (that comes from a HCatalog
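Arnaud's `loginKerberos` guards the keytab login with a static `loggedIn` flag inside a `synchronized` method, so the login runs at most once per JVM even when several threads call it, and, because the flag is set only after the login call returns, a failed attempt can be retried. The same pattern, stripped of the Hadoop calls, is sketched below; `LoginOnce` and the `Runnable` standing in for `UserGroupInformation.loginUserFromKeytab(...)` are hypothetical. A static flag is per JVM (more precisely, per class loader), so a login performed in the client process does not carry over to the TaskManager processes, which is why the credentials have to be shipped to the cluster.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical once-per-JVM login guard mirroring the structure of loginKerberos(...).
final class LoginOnce {
    private static boolean loggedIn = false;
    static final AtomicInteger attempts = new AtomicInteger(); // counts real login attempts

    static synchronized void login(Runnable doLogin) {
        if (!loggedIn) {
            attempts.incrementAndGet();
            doLogin.run();   // may throw; then loggedIn stays false and a later call retries
            loggedIn = true;
        }
    }

    static synchronized boolean isLoggedIn() {
        return loggedIn;
    }
}
```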
Re: Gelly ran out of memory
One of the ideas behind memory management is to recognize when the memory runs out. Simply using the regular Java heap means that an operator would consume more than it should, and eventually cause an OOM error again.

The only way of reacting to the fact that the memory ran out is to reduce memory usage, for example by either spilling or compacting. The solution set cannot spill at this point, so if compaction does not help, it gives up.

On Thu, Aug 20, 2015 at 8:50 PM, Henry Saputra wrote:
> Hi Stephan, this looks like a bug to me. Shouldn't the memory manager
> switch to memory outside the managed area if it runs out of memory space?
>
> - Henry
>
> On Thu, Aug 20, 2015 at 3:09 AM, Stephan Ewen wrote:
>> Actually, you ran out of "Flink Managed Memory", not user memory. A user
>> memory shortage manifests itself as a Java OutOfMemoryError.
>>
>> At this point, the delta iterations cannot spill. They additionally suffer
>> a bit from memory fragmentation.
>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)"
>> on the iteration. That will eliminate the fragmentation issue, at least.
>>
>> Stephan
>>
>> On Thu, Aug 20, 2015 at 12:06 PM, Andra Lungu wrote:
>>> Hi Flavio,
>>>
>>> These kinds of exceptions generally arise from the fact that you ran out
>>> of `user` memory. You can try to increase that a bit.
>>> In your flink-conf.yaml try adding
>>>
>>>     # The memory fraction allocated to the system vs. the user
>>>     taskmanager.memory.fraction: 0.4
>>>
>>> This will give 0.6 of the unit of memory to the user and 0.4 to the
>>> system.
>>>
>>> Tell me if that helped.
>>> Andra
>>>
>>> On Thu, Aug 20, 2015 at 12:02 PM, Flavio Pompermaier wrote:
>>>> Hi to all,
>>>>
>>>> I tried to run my Gelly job on Flink 0.9-SNAPSHOT and I was having an
>>>> EOFException, so I tried on 0.10-SNAPSHOT and now I have the following
>>>> error:
>>>>
>>>>     Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 73 maxPartition: 80 number of overflow segments: 0 bucketSize: 570 Overall memory: 102367232 Partition memory: 81100800 Message: null
>>>>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:465)
>>>>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
>>>>         at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>>>>         at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:211)
>>>>         at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:272)
>>>>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Probably I'm doing something wrong, but I can't understand how to
>>>> estimate the required memory for my Gelly job.
>>>>
>>>> Best,
>>>> Flavio
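The byte counts in Flavio's error message can be decoded with simple arithmetic. The sketch assumes a memory segment size of 32 KiB, which both numbers in the message happen to divide evenly by; treat that value, and the reading of minPartition/maxPartition as per-partition segment counts, as assumptions rather than documented facts. It also shows, in simplified form, how `taskmanager.memory.fraction` splits the available heap between Flink managed memory and user code (the 1024 MiB heap is a made-up value): lowering the fraction to 0.4 gives user code more room but leaves less managed memory, which is the kind Stephan says ran out. `MemoryMath` is a hypothetical name.

```java
// Decodes the numbers from the CompactingHashTable error and sketches the memory-fraction split.
final class MemoryMath {
    static final long SEGMENT_SIZE = 32 * 1024; // assumption: 32 KiB memory segments

    static long segments(long bytes) {
        return bytes / SEGMENT_SIZE;
    }

    static double mib(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    // Simplified: managed memory = fraction * heap available to Flink; the rest stays for user code.
    static long managedBytes(long heapBytes, double fraction) {
        return (long) (heapBytes * fraction);
    }

    public static void main(String[] args) {
        long overall = 102_367_232L;   // "Overall memory" from the error message
        long partitions = 81_100_800L; // "Partition memory" from the error message
        System.out.printf("overall: %d segments (%.1f MiB)%n", segments(overall), mib(overall));
        System.out.printf("partitions: %d segments (%.1f MiB)%n", segments(partitions), mib(partitions));
        System.out.printf("avg per partition (32 partitions): %.1f segments%n", segments(partitions) / 32.0);

        long heap = 1024L * 1024 * 1024; // hypothetical 1024 MiB heap
        for (double f : new double[] {0.7, 0.4}) {
            long managed = managedBytes(heap, f);
            System.out.printf("fraction %.1f -> managed %.0f MiB, user %.0f MiB%n",
                    f, mib(managed), mib(heap - managed));
        }
    }
}
```

Under these assumptions the partitions hold about 2475 segments, roughly 77 per partition, which falls between the reported minPartition (73) and maxPartition (80).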