Hi 叶贤勋, 我手头上没有kerberos的环境,从TokenCache的代码(2.7.5版本)看起来,这个异常可能是因为没有正确拿到RM的地址或者principal。请检查一下下面这几个配置: mapreduce.framework.name yarn.resourcemanager.address yarn.resourcemanager.principal 以及你的flink的作业是否能读到这些配置
On Fri, Feb 28, 2020 at 11:10 AM Kurt Young <ykt...@gmail.com> wrote: > cc @li...@apache.org <li...@apache.org> > > Best, > Kurt > > > On Thu, Feb 13, 2020 at 10:22 AM 叶贤勋 <yxx_c...@163.com> wrote: > >> Hi 大家好: >> 在做hive2.1.1 source带Kerberos认证有个异常请教下大家。 >> flink 版本1.9 >> hive 版本2.1.1,实现了HiveShimV211。 >> 代码: >> public class HiveCatalogTest { >> private static final Logger LOG = >> LoggerFactory.getLogger(HiveCatalogTest.class); >> private String hiveConfDir = "/Users/yexianxun/dev/env/test-hive"; // >> a local path >> private TableEnvironment tableEnv; >> private HiveCatalog hive; >> private String hiveName; >> private String hiveDB; >> private String version; >> >> >> @Before >> public void before() { >> EnvironmentSettings settings = >> EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inBatchMode() >> .build(); >> tableEnv = TableEnvironment.create(settings); >> hiveName = "myhive"; >> hiveDB = "sloth"; >> version = "2.1.1"; >> } >> >> >> @Test >> public void testCatalogQuerySink() throws Exception { >> hive = new HiveCatalog(hiveName, hiveDB, hiveConfDir, version); >> System.setProperty("java.security.krb5.conf", hiveConfDir + >> "/krb5.conf"); >> tableEnv.getConfig().getConfiguration().setString("stream_mode", >> "false"); >> tableEnv.registerCatalog(hiveName, hive); >> tableEnv.useCatalog(hiveName); >> String query = "select * from " + hiveName + "." + hiveDB + >> ".testtbl2 where id = 20200202"; >> Table table = tableEnv.sqlQuery(query); >> String newTableName = "testtbl2_1"; >> table.insertInto(hiveName, hiveDB, newTableName); >> tableEnv.execute("test"); >> } >> } >> >> >> HiveMetastoreClientFactory: >> public static HiveMetastoreClientWrapper create(HiveConf hiveConf, >> String hiveVersion) { >> Preconditions.checkNotNull(hiveVersion, "Hive version cannot be >> null"); >> if (System.getProperty("java.security.krb5.conf") != null) { >> if (System.getProperty("had_set_kerberos") == null) { >> String principal = "sloth/d...@bdms.163.com"; >> String keytab = >> "/Users/yexianxun/dev/env/mammut-test-hive/sloth.keytab"; >> try { >> sun.security.krb5.Config.refresh(); >> UserGroupInformation.setConfiguration(hiveConf); >> UserGroupInformation.loginUserFromKeytab(principal, >> keytab); >> System.setProperty("had_set_kerberos", "true"); >> } catch (Exception e) { >> LOG.error("", e); >> } >> } >> } >> return new HiveMetastoreClientWrapper(hiveConf, hiveVersion); >> } >> >> >> HiveCatalog: >> private static HiveConf createHiveConf(@Nullable String hiveConfDir) { >> LOG.info("Setting hive conf dir as {}", hiveConfDir); >> try { >> HiveConf.setHiveSiteLocation( >> hiveConfDir == null ? >> null : Paths.get(hiveConfDir, >> "hive-site.xml").toUri().toURL()); >> } catch (MalformedURLException e) { >> throw new CatalogException( >> String.format("Failed to get hive-site.xml from %s", >> hiveConfDir), e); >> } >> >> >> // create HiveConf from hadoop configuration >> HiveConf hiveConf = new >> HiveConf(HadoopUtils.getHadoopConfiguration(new >> org.apache.flink.configuration.Configuration()), >> HiveConf.class); >> try { >> hiveConf.addResource(Paths.get(hiveConfDir, >> "hdfs-site.xml").toUri().toURL()); >> hiveConf.addResource(Paths.get(hiveConfDir, >> "core-site.xml").toUri().toURL()); >> } catch (MalformedURLException e) { >> throw new CatalogException(String.format("Failed to get >> hdfs|core-site.xml from %s", hiveConfDir), e); >> } >> return hiveConf; >> } >> >> >> 在执行testCatalogQuerySink方法报以下错误: >> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve >> JobResult. >> >> >> at >> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:622) >> at >> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117) >> at >> org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410) >> at api.HiveCatalogTest.testCatalogQuerySink(HiveCatalogMumTest.java:234) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) >> at >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >> at >> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) >> at >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >> at >> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) >> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) >> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) >> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) >> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) >> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) >> at org.junit.runners.ParentRunner.run(ParentRunner.java:363) >> at org.junit.runner.JUnitCore.run(JUnitCore.java:137) >> at >> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) >> at >> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) >> at >> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) >> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) >> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed >> to submit job. >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333) >> at >> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) >> at >> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) >> at >> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) >> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> Caused by: java.lang.RuntimeException: >> org.apache.flink.runtime.client.JobExecutionException: Could not set up >> JobManager >> at >> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) >> at >> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) >> ... 6 more >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could >> not set up JobManager >> at >> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) >> at >> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) >> at >> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) >> ... 7 more >> Caused by: org.apache.flink.runtime.JobException: Creating the input >> splits caused an error: Can't get Master Kerberos principal for use as >> renewer >> at >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) >> at >> org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) >> at >> org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) >> at >> org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) >> at >> org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) >> at >> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278) >> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266) >> at >> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) >> at >> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) >> at >> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) >> ... 10 more >> Caused by: java.io.IOException: Can't get Master Kerberos principal for >> use as renewer >> at >> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116) >> at >> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100) >> at >> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80) >> at >> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206) >> at >> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315) >> at >> org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:159) >> at >> org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:63) >> at >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256) >> ... 22 more >> >> >> 测试sink的方法是能够正常插入数据,但是在hive source时报这个错误,感觉是获取deleg >> token时返回空导致的。不知道具体应该怎么解决 >> >> >> >> >> >> | | >> 叶贤勋 >> | >> | >> yxx_c...@163.com >> | >> 签名由网易邮箱大师定制 >> >>