cc @li...@apache.org <li...@apache.org> Best, Kurt
On Thu, Feb 13, 2020 at 10:22 AM 叶贤勋 <yxx_c...@163.com> wrote: > Hi 大家好: > 在做hive2.1.1 source带Kerberos认证有个异常请教下大家。 > flink 版本1.9 > hive 版本2.1.1,实现了HiveShimV211。 > 代码: > public class HiveCatalogTest { > private static final Logger LOG = > LoggerFactory.getLogger(HiveCatalogTest.class); > private String hiveConfDir = "/Users/yexianxun/dev/env/test-hive"; // a > local path > private TableEnvironment tableEnv; > private HiveCatalog hive; > private String hiveName; > private String hiveDB; > private String version; > > > @Before > public void before() { > EnvironmentSettings settings = > EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inBatchMode() > .build(); > tableEnv = TableEnvironment.create(settings); > hiveName = "myhive"; > hiveDB = "sloth"; > version = "2.1.1"; > } > > > @Test > public void testCatalogQuerySink() throws Exception { > hive = new HiveCatalog(hiveName, hiveDB, hiveConfDir, version); > System.setProperty("java.security.krb5.conf", hiveConfDir + > "/krb5.conf"); > tableEnv.getConfig().getConfiguration().setString("stream_mode", > "false"); > tableEnv.registerCatalog(hiveName, hive); > tableEnv.useCatalog(hiveName); > String query = "select * from " + hiveName + "." + hiveDB + > ".testtbl2 where id = 20200202"; > Table table = tableEnv.sqlQuery(query); > String newTableName = "testtbl2_1"; > table.insertInto(hiveName, hiveDB, newTableName); > tableEnv.execute("test"); > } > } > > > HiveMetastoreClientFactory: > public static HiveMetastoreClientWrapper create(HiveConf hiveConf, > String hiveVersion) { > Preconditions.checkNotNull(hiveVersion, "Hive version cannot be > null"); > if (System.getProperty("java.security.krb5.conf") != null) { > if (System.getProperty("had_set_kerberos") == null) { > String principal = "sloth/d...@bdms.163.com"; > String keytab = > "/Users/yexianxun/dev/env/mammut-test-hive/sloth.keytab"; > try { > sun.security.krb5.Config.refresh(); > UserGroupInformation.setConfiguration(hiveConf); > UserGroupInformation.loginUserFromKeytab(principal, keytab); > System.setProperty("had_set_kerberos", "true"); > } catch (Exception e) { > LOG.error("", e); > } > } > } > return new HiveMetastoreClientWrapper(hiveConf, hiveVersion); > } > > > HiveCatalog: > private static HiveConf createHiveConf(@Nullable String hiveConfDir) { > LOG.info("Setting hive conf dir as {}", hiveConfDir); > try { > HiveConf.setHiveSiteLocation( > hiveConfDir == null ? > null : Paths.get(hiveConfDir, > "hive-site.xml").toUri().toURL()); > } catch (MalformedURLException e) { > throw new CatalogException( > String.format("Failed to get hive-site.xml from %s", > hiveConfDir), e); > } > > > // create HiveConf from hadoop configuration > HiveConf hiveConf = new > HiveConf(HadoopUtils.getHadoopConfiguration(new > org.apache.flink.configuration.Configuration()), > HiveConf.class); > try { > hiveConf.addResource(Paths.get(hiveConfDir, > "hdfs-site.xml").toUri().toURL()); > hiveConf.addResource(Paths.get(hiveConfDir, > "core-site.xml").toUri().toURL()); > } catch (MalformedURLException e) { > throw new CatalogException(String.format("Failed to get > hdfs|core-site.xml from %s", hiveConfDir), e); > } > return hiveConf; > } > > > 在执行testCatalogQuerySink方法报以下错误: > org.apache.flink.runtime.client.JobExecutionException: Could not retrieve > JobResult. > > > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:622) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117) > at > org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410) > at api.HiveCatalogTest.testCatalogQuerySink(HiveCatalogMumTest.java:234) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed > to submit job. > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: > org.apache.flink.runtime.client.JobExecutionException: Could not set up > JobManager > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > ... 6 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could > not set up JobManager > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) > at > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) > ... 7 more > Caused by: org.apache.flink.runtime.JobException: Creating the input > splits caused an error: Can't get Master Kerberos principal for use as > renewer > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184) > at > org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) > at > org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278) > at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) > ... 10 more > Caused by: java.io.IOException: Can't get Master Kerberos principal for > use as renewer > at > org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116) > at > org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100) > at > org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80) > at > org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206) > at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315) > at > org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:159) > at > org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:63) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256) > ... 22 more > > > 测试sink的方法是能够正常插入数据,但是在hive source时报这个错误,感觉是获取deleg > token时返回空导致的。不知道具体应该怎么解决 > > > > > > | | > 叶贤勋 > | > | > yxx_c...@163.com > | > 签名由网易邮箱大师定制 > >