Hi 叶贤勋,

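I don't have a Kerberos environment at hand. Judging from the TokenCache code (version 2.7.5), this exception is probably caused by failing to obtain the RM address or principal. Please check the following configuration keys: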
mapreduce.framework.name
yarn.resourcemanager.address
yarn.resourcemanager.principal
and whether your Flink job can actually read these settings.
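
As a quick way to check the files themselves, here is a minimal sketch that uses only the JDK. It reads Hadoop-style site XML files and reports which of the three keys above are missing or empty. The class name RenewerConfigCheck and its methods are made up for illustration and are not part of Hadoop or Flink. It only looks at literal <property> entries, so it ignores Hadoop's built-in defaults and variable substitution, and it cannot tell you what the running job sees at runtime. Note that the createHiveConf you posted only adds hdfs-site.xml and core-site.xml, so it may be worth checking which file, if any, carries the yarn.* keys.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper, not a Hadoop or Flink class.
class RenewerConfigCheck {
    // The three keys named in this mail.
    static final String[] REQUIRED_KEYS = {
        "mapreduce.framework.name",
        "yarn.resourcemanager.address",
        "yarn.resourcemanager.principal"
    };

    // Reads the <property><name>..</name><value>..</value></property>
    // entries of one Hadoop-style site file into a map.
    static Map<String, String> readSiteXml(InputStream in) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
            .newDocumentBuilder()
            .parse(in);
        Map<String, String> props = new HashMap<>();
        NodeList properties = doc.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element property = (Element) properties.item(i);
            NodeList names = property.getElementsByTagName("name");
            NodeList values = property.getElementsByTagName("value");
            if (names.getLength() > 0 && values.getLength() > 0) {
                props.put(
                    names.item(0).getTextContent().trim(),
                    values.item(0).getTextContent().trim());
            }
        }
        return props;
    }

    // Returns the required keys that are absent or empty, in declaration order.
    static List<String> missingKeys(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.get(key);
            if (value == null || value.isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Usage: pass the site files to inspect; later files override earlier ones.
    public static void main(String[] args) throws Exception {
        Map<String, String> merged = new HashMap<>();
        for (String file : args) {
            try (InputStream in = Files.newInputStream(Paths.get(file))) {
                merged.putAll(readSiteXml(in));
            }
        }
        System.out.println("Missing keys: " + missingKeys(merged));
    }
}
```

You could run it against whatever site files sit in your hiveConfDir, for example `java RenewerConfigCheck.java yarn-site.xml mapred-site.xml` (the file names here are only a guess at what your directory contains). If a key shows up as missing, the file that defines it probably also needs to be added as a resource to the HiveConf.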

On Fri, Feb 28, 2020 at 11:10 AM Kurt Young <ykt...@gmail.com> wrote:

> cc @li...@apache.org <li...@apache.org>
>
> Best,
> Kurt
>
>
> On Thu, Feb 13, 2020 at 10:22 AM 叶贤勋 <yxx_c...@163.com> wrote:
>
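>> Hi all,
>>     I ran into an exception while using a Hive 2.1.1 source with Kerberos authentication and would like to ask for help.
>>     Flink version: 1.9
>>     Hive version: 2.1.1, with HiveShimV211 implemented.
>>     Code: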
>> public class HiveCatalogTest {
>>    private static final Logger LOG =
>> LoggerFactory.getLogger(HiveCatalogTest.class);
>>    private String hiveConfDir = "/Users/yexianxun/dev/env/test-hive"; // a local path
>>    private TableEnvironment tableEnv;
>>    private HiveCatalog hive;
>>    private String hiveName;
>>    private String hiveDB;
>>    private String version;
>>
>>
>>    @Before
>>    public void before() {
>>       EnvironmentSettings settings =
>>          EnvironmentSettings.newInstance()
>>             .useBlinkPlanner()
>>             .inBatchMode()
>>             .build();
>>       tableEnv = TableEnvironment.create(settings);
>>       hiveName = "myhive";
>>       hiveDB = "sloth";
>>       version = "2.1.1";
>>    }
>>
>>
>>    @Test
>>    public void testCatalogQuerySink() throws Exception {
>>       hive = new HiveCatalog(hiveName, hiveDB, hiveConfDir, version);
>>       System.setProperty("java.security.krb5.conf", hiveConfDir +
>> "/krb5.conf");
>>       tableEnv.getConfig().getConfiguration().setString("stream_mode",
>> "false");
>>       tableEnv.registerCatalog(hiveName, hive);
>>       tableEnv.useCatalog(hiveName);
>>       String query = "select * from " + hiveName + "." + hiveDB +
>> ".testtbl2 where id = 20200202";
>>       Table table = tableEnv.sqlQuery(query);
>>       String newTableName = "testtbl2_1";
>>       table.insertInto(hiveName, hiveDB, newTableName);
>>       tableEnv.execute("test");
>>    }
>> }
>>
>>
>> HiveMetastoreClientFactory:
>>    public static HiveMetastoreClientWrapper create(HiveConf hiveConf,
>> String hiveVersion) {
>>       Preconditions.checkNotNull(hiveVersion, "Hive version cannot be null");
>>       if (System.getProperty("java.security.krb5.conf") != null) {
>>          if (System.getProperty("had_set_kerberos") == null) {
>>             String principal = "sloth/d...@bdms.163.com";
>>             String keytab =
>> "/Users/yexianxun/dev/env/mammut-test-hive/sloth.keytab";
>>             try {
>>                sun.security.krb5.Config.refresh();
>>                UserGroupInformation.setConfiguration(hiveConf);
>>                UserGroupInformation.loginUserFromKeytab(principal,
>> keytab);
>>                System.setProperty("had_set_kerberos", "true");
>>             } catch (Exception e) {
>>                LOG.error("", e);
>>             }
>>          }
>>       }
>>       return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
>>    }
>>
>>
>> HiveCatalog:
>>    private static HiveConf createHiveConf(@Nullable String hiveConfDir) {
>>       LOG.info("Setting hive conf dir as {}", hiveConfDir);
>>       try {
>>          HiveConf.setHiveSiteLocation(
>>             hiveConfDir == null ?
>>                null : Paths.get(hiveConfDir,
>> "hive-site.xml").toUri().toURL());
>>       } catch (MalformedURLException e) {
>>          throw new CatalogException(
>>             String.format("Failed to get hive-site.xml from %s",
>> hiveConfDir), e);
>>       }
>>
>>
>>       // create HiveConf from hadoop configuration
>>       HiveConf hiveConf = new
>> HiveConf(HadoopUtils.getHadoopConfiguration(new
>> org.apache.flink.configuration.Configuration()),
>>          HiveConf.class);
>>       try {
>>          hiveConf.addResource(Paths.get(hiveConfDir,
>> "hdfs-site.xml").toUri().toURL());
>>          hiveConf.addResource(Paths.get(hiveConfDir,
>> "core-site.xml").toUri().toURL());
>>       } catch (MalformedURLException e) {
>>          throw new CatalogException(
>>             String.format("Failed to get hdfs|core-site.xml from %s", hiveConfDir), e);
>>       }
>>       return hiveConf;
>>    }
>>
>>
>>     Running the testCatalogQuerySink method fails with the following error:
>> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
>> JobResult.
>>
>>
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:622)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
>> at
>> org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
>> at api.HiveCatalogTest.testCatalogQuerySink(HiveCatalogMumTest.java:234)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> at
>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>> at
>> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>> at
>> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
>> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
>> to submit job.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
>> at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>> at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>> JobManager
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> ... 6 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
>> at
>> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> ... 7 more
>> Caused by: org.apache.flink.runtime.JobException: Creating the input
>> splits caused an error: Can't get Master Kerberos principal for use as
>> renewer
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
>> at
>> org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
>> at
>> org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
>> at
>> org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
>> at
>> org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
>> ... 10 more
>> Caused by: java.io.IOException: Can't get Master Kerberos principal for
>> use as renewer
>> at
>> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
>> at
>> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
>> at
>> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
>> at
>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206)
>> at
>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
>> at
>> org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:159)
>> at
>> org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256)
>> ... 22 more
>>
>>
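>>     The sink test inserts data without problems, but the Hive source reports this error. It looks like it is caused by
>> a null result when obtaining the delegation token. I don't know how to resolve this.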
>>
>>
>>
>>
>>
>> 叶贤勋
>> yxx_c...@163.com
>>
>>
