I think we need to extend our own FileInputFormats as well to pass the credentials...
On Fri, Aug 21, 2015 at 12:44 PM, Robert Metzger <rmetz...@apache.org> wrote: > I was able to reproduce the issue. This is the JIRA: > https://issues.apache.org/jira/browse/FLINK-2555 > I've already opened a pull request with the fix. > > The problem was that our HadoopInputFormat wrapper was not correctly > passing the security credentials from the Job object to the cluster. > > Consider this code posted by Arnaud in the initial message: > > *final* Job job = Job.*getInstance*(); > > job.setJobName("Flink source for Hive Table " + dbName + "." + > tableName); > > > > // Crée la source > > @SuppressWarnings({ "unchecked", "rawtypes" }) > > *final* HadoopInputFormat<NullWritable, DefaultHCatRecord> > inputFormat = *new* HadoopInputFormat<NullWritable, // CHECKSTYLE:OFF > > DefaultHCatRecord>(// CHECKSTYLE:ON > > (InputFormat) HCatInputFormat.*setInput*(job, dbName, > tableName, filter), // > > NullWritable.*class*, // > > DefaultHCatRecord.*class*, // > > job); > > > in the "Job.getInstance()" call, the current authentication credentials of > the user are stored. > > They are later passed to the HadoopInputFormat class (last line), but > Flink was not properly making the Credentials available again on the > cluster. > > > The pull request should resolve the issue (I've verified it on a secured > CDH 5.3 setup) > > > Thank you for reporting the bug! > > > > On Thu, Aug 20, 2015 at 5:21 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> > wrote: > >> Hi Robert, >> >> >> >> Yes, it’s an internal tool, which get an HDFS FileSystem instance. I do >> some Kerberos-related operations, needed because I manipulate some HDFS >> files before executing the application. >> >> The local cluster mode is working fine with the same code, and it does >> some HCat reading / HDFS writing. >> >> >> >> What HdfsTools does, in a nutshell : >> >> *final* Configuration cfg = *new* Configuration(); >> >> cfg.addResource(*new* Path("/home/hadoop/conf/core-site.xml")); >> >> cfg.addResource(*new* Path("/home/hadoop/conf/hdfs-site.xml")); >> >> cfg.addResource(*new* Path(Environnement.*getEnvVarQuietly*( >> "HADOOP_CONF_DIR") + "/core-site.xml")); >> >> cfg.addResource(*new* Path(Environnement.*getEnvVarQuietly*( >> "HADOOP_CONF_DIR") + "/hdfs-site.xml")); >> >> // Kerberos handling >> >> *if* (*isKerberosActive*()) { >> >> *loginKerberos*(cfg); >> >> } >> >> filesys = FileSystem.*get*(cfg); >> >> >> >> And the straightforward kerberos stuff: >> >> *public* *static* *synchronized* *void* loginKerberos(Configuration cfg) >> { >> >> UserGroupInformation.*setConfiguration*(cfg); >> >> *if* (!*loggedIn*) { >> >> *try* { >> >> UserGroupInformation.*loginUserFromKeytab*( >> *getKerberosPrincipal*(), *getKerberosKeytab*()); >> >> *loggedIn* = *true*; >> >> JournalUDF.*logLocalFS*("User " + UserGroupInformation. >> *getLoginUser*() + " : Kerberos login succeeded "); >> >> } >> >> *catch* (IOException excep) { >> >> *throw* *new* GaneshRuntimeException("Unable to log >> (kerberos) : " + excep.toString(), excep); >> >> } >> >> } >> >> } >> >> *loggedIn *being static to the class, and *alinz* having all the proper >> rights. >> >> >> >> From what I’ve seen on google, spark and hive/oozie ran into the same >> error and somewhat corrected that, but I don’t know if it will help to see >> if it’s really the same pb. >> >> I’m sending you the full trace on a private mail. >> >> >> >> Arnaud >> >> >> >> *De :* Robert Metzger [mailto:rmetz...@apache.org] >> *Envoyé :* jeudi 20 août 2015 16:42 >> *À :* user@flink.apache.org >> *Objet :* Re: Using HadoopInputFormat files from Flink/Yarn in a secure >> cluster gives an error >> >> >> >> Hi Arnaud, >> >> >> >> I suspect the "HdfsTools" are something internal from your company? >> >> Are they doing any kerberos-related operations? >> >> >> >> Is the local cluster mode also reading files from the secured HDFS >> cluster? >> >> >> >> Flink is taking care of sending the authentication tokens from the client >> to the jobManager and to the TaskManagers. >> >> For HDFS Flink should also use these user settings. >> >> I don't know whether the HCatalog code / Hadoop compatbililty code is >> also doing some kerberos operations which are interfering with our efforts. >> >> >> >> From the logs, you can see: >> >> Secure Hadoop environment setup detected. Running in secure context. >> 15:04:18,005 INFO >> org.apache.hadoop.security.UserGroupInformation - Login >> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab >> >> >> >> Is the user "alinz" authorized to access the files in HDFS? >> >> >> >> I have to admit that I didn't see this issue before. >> >> If possible, can you privately send the the full log of the application, >> using "yarn logs -applicationId <ID>" ? >> >> >> >> >> >> On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> >> wrote: >> >> Hello, >> >> >> >> My application handles as input and output some HDFS files in the jobs >> and in the driver application. >> >> It works in local cluster mode, but when I’m trying to submit it to a >> yarn client, when I try to use a HadoopInputFormat (that comes from a >> HCatalog request), I have the following error: *Delegation Token can be >> issued only with kerberos or web authentication *(full stack trace >> below). >> >> >> >> Code which I believe causes the error (It’s not clear in the stack trace, >> as the nearest point in my code is “execEnv.execute()”) : >> >> >> >> *public* *synchronized* DataSet<T> readTable(String dbName, String >> tableName, String filter, ExecutionEnvironment cluster, >> >> *final* HiveBeanFactory<T> factory) *throws* IOException { >> >> >> >> // login kerberos if needed (via >> UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), >> getKerberosKeytab());) >> >> HdfsTools.*getFileSystem*(); >> >> >> >> // Create M/R job and configure it >> >> *final* Job job = Job.*getInstance*(); >> >> job.setJobName("Flink source for Hive Table " + dbName + "." + >> tableName); >> >> >> >> // Crée la source >> >> @SuppressWarnings({ "unchecked", "rawtypes" }) >> >> *final* HadoopInputFormat<NullWritable, DefaultHCatRecord> >> inputFormat = *new* HadoopInputFormat<NullWritable, // CHECKSTYLE:OFF >> >> DefaultHCatRecord>(// CHECKSTYLE:ON >> >> (InputFormat) HCatInputFormat.*setInput*(job, dbName, >> tableName, filter), // >> >> NullWritable.*class*, // >> >> DefaultHCatRecord.*class*, // >> >> job); >> >> >> >> *final* HCatSchema inputSchema = HCatInputFormat.*getTableSchema* >> (job.getConfiguration()); >> >> @SuppressWarnings("serial") >> >> *final* DataSet<T> dataSet = cluster >> >> // Read the table >> >> .createInput(inputFormat) >> >> // map bean (key is useless) >> >> .flatMap(*new* FlatMapFunction<Tuple2<NullWritable, >> DefaultHCatRecord>, T>() { >> >> @Override >> >> *public* *void* flatMap(Tuple2<NullWritable, >> DefaultHCatRecord> value, Collector<T> out) *throws* Exception { // >> NOPMD >> >> *final* T record = factory.fromHive(value.f1, >> inputSchema); >> >> *if* (record != *null*) { >> >> out.collect(record); >> >> } >> >> } >> >> }).returns(beanClass); >> >> >> >> *return* dataSet; >> >> } >> >> >> >> Maybe I need to explicitely get a token on each node in the >> initialization of HadoopInputFormat() (overriding configure()) ? That >> would be difficult since the keyfile is on the driver’s local drive… >> >> >> >> StackTrace : >> >> >> >> Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties >> >> Using JobManager address from YARN properties >> bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494 >> >> Secure Hadoop environment setup detected. Running in secure context. >> >> 2015:08:20 15:04:17 (main) - INFO - >> com.bouygtel.kuberasdk.main.Application.mainMethod - Dï¿œbut traitement >> >> 15:04:18,005 INFO >> org.apache.hadoop.security.UserGroupInformation - Login >> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab >> >> 15:04:20,139 WARN >> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The >> short-circuit local reads feature cannot be used because libhadoop cannot >> be loaded. >> >> Error : Execution Kubera KO : java.lang.IllegalStateException: Error >> while executing Flink application >> >> >> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84) >> >> >> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68) >> >> >> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51) >> >> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81) >> >> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44) >> >> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> java.lang.reflect.Method.invoke(Method.java:606) >> >> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) >> >> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) >> >> org.apache.flink.client.program.Client.run(Client.java:315) >> >> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) >> >> org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) >> >> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873) >> >> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870) >> >> >> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50) >> >> java.security.AccessController.doPrivileged(Native Method) >> >> javax.security.auth.Subject.doAs(Subject.java:415) >> >> >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) >> >> >> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47) >> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870) >> >> org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) >> >> >> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: >> The program execution failed: Failed to submit job >> dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT) >> >> org.apache.flink.client.program.Client.run(Client.java:413) >> >> org.apache.flink.client.program.Client.run(Client.java:356) >> >> org.apache.flink.client.program.Client.run(Client.java:349) >> >> >> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63) >> >> >> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:80) >> >> >> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68) >> >> >> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51) >> >> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81) >> >> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44) >> >> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> java.lang.reflect.Method.invoke(Method.java:606) >> >> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) >> >> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) >> >> org.apache.flink.client.program.Client.run(Client.java:315) >> >> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) >> >> org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) >> >> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873) >> >> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870) >> >> >> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50) >> >> java.security.AccessController.doPrivileged(Native Method) >> >> javax.security.auth.Subject.doAs(Subject.java:415) >> >> >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) >> >> >> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47) >> >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870) >> >> org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) >> >> >> >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed >> to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT) >> >> org.apache.flink.runtime.jobmanager.JobManager.org >> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594) >> >> >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >> >> >> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100) >> >> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) >> >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) >> >> akka.actor.Actor$class.aroundReceive(Actor.scala:465) >> >> >> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) >> >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> >> akka.actor.ActorCell.invoke(ActorCell.scala:487) >> >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >> >> akka.dispatch.Mailbox.run(Mailbox.scala:221) >> >> akka.dispatch.Mailbox.exec(Mailbox.scala:231) >> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> >> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> >> >> *Caused by: org.apache.flink.runtime.JobException: Creating the input >> splits caused an error: Delegation Token can be issued only with kerberos >> or web authentication* >> >> * at >> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)* >> >> * at >> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)* >> >> * at >> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)* >> >> * at >> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)* >> >> * at >> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)* >> >> * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)* >> >> * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)* >> >> * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)* >> >> * at java.security.AccessController.doPrivileged(Native Method)* >> >> * at javax.security.auth.Subject.doAs(Subject.java:415)* >> >> * at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)* >> >> * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)* >> >> >> >> >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162) >> >> >> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469) >> >> org.apache.flink.runtime.jobmanager.JobManager.org >> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534) >> >> >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >> >> >> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100) >> >> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) >> >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) >> >> akka.actor.Actor$class.aroundReceive(Actor.scala:465) >> >> >> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) >> >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> >> akka.actor.ActorCell.invoke(ActorCell.scala:487) >> >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >> >> akka.dispatch.Mailbox.run(Mailbox.scala:221) >> >> akka.dispatch.Mailbox.exec(Mailbox.scala:231) >> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> >> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> >> >> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): >> Delegation Token can be issued only with kerberos or web authentication >> >> at >> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609) >> >> at >> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522) >> >> at >> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977) >> >> at >> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) >> >> at >> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) >> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) >> >> at java.security.AccessController.doPrivileged(Native Method) >> >> at javax.security.auth.Subject.doAs(Subject.java:415) >> >> at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) >> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) >> >> >> >> org.apache.hadoop.ipc.Client.call(Client.java:1468) >> >> org.apache.hadoop.ipc.Client.call(Client.java:1399) >> >> >> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) >> >> com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source) >> >> >> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:909) >> >> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> java.lang.reflect.Method.invoke(Method.java:606) >> >> >> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) >> >> >> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) >> >> com.sun.proxy.$Proxy15.getDelegationToken(Unknown Source) >> >> org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1029) >> >> >> org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1355) >> >> >> org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:529) >> >> org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:507) >> >> >> org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2041) >> >> >> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121) >> >> >> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100) >> >> >> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80) >> >> >> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205) >> >> >> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) >> >> >> org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat.getSplits(HCatBaseInputFormat.java:157) >> >> >> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140) >> >> >> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51) >> >> >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146) >> >> >> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469) >> >> org.apache.flink.runtime.jobmanager.JobManager.org >> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534) >> >> >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >> >> >> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >> >> >> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100) >> >> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) >> >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >> >> >> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) >> >> akka.actor.Actor$class.aroundReceive(Actor.scala:465) >> >> >> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) >> >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> >> akka.actor.ActorCell.invoke(ActorCell.scala:487) >> >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >> >> akka.dispatch.Mailbox.run(Mailbox.scala:221) >> >> akka.dispatch.Mailbox.exec(Mailbox.scala:231) >> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> >> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> >> >> >> >> >> >> Do you have any clue? >> >> >> >> Best regards, >> >> Arnaud >> >> >> >> >> >> >> ------------------------------ >> >> >> L'intégrité de ce message n'étant pas assurée sur internet, la société >> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces >> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si >> vous n'êtes pas destinataire de ce message, merci de le détruire et >> d'avertir l'expéditeur. >> >> The integrity of this message cannot be guaranteed on the Internet. The >> company that sent this message cannot therefore be held liable for its >> content nor attachments. Any unauthorized use or dissemination is >> prohibited. If you are not the intended recipient of this message, then >> please delete it and notify the sender. >> >> >> > >