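Hi all,
    I ran into an exception while using a Hive 2.1.1 source with Kerberos authentication and would like to ask for your help.
    Flink version: 1.9
    Hive version: 2.1.1, with a HiveShimV211 implemented.
    Code: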
public class HiveCatalogTest {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogTest.class);
   private String hiveConfDir = "/Users/yexianxun/dev/env/test-hive"; // a local path
   private TableEnvironment tableEnv;
   private HiveCatalog hive;
   private String hiveName;
   private String hiveDB;
   private String version;


   @Before
   public void before() {
      EnvironmentSettings settings =
         EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
      tableEnv = TableEnvironment.create(settings);
      hiveName = "myhive";
      hiveDB = "sloth";
      version = "2.1.1";
   }


   @Test
   public void testCatalogQuerySink() throws Exception {
      hive = new HiveCatalog(hiveName, hiveDB, hiveConfDir, version);
      System.setProperty("java.security.krb5.conf", hiveConfDir + "/krb5.conf");
      tableEnv.getConfig().getConfiguration().setString("stream_mode", "false");
      tableEnv.registerCatalog(hiveName, hive);
      tableEnv.useCatalog(hiveName);
      String query = "select * from " + hiveName + "." + hiveDB + ".testtbl2 where id = 20200202";
      Table table = tableEnv.sqlQuery(query);
      String newTableName = "testtbl2_1";
      table.insertInto(hiveName, hiveDB, newTableName);
      tableEnv.execute("test");
   }
}


HiveMetastoreClientFactory:
   public static HiveMetastoreClientWrapper create(HiveConf hiveConf, String hiveVersion) {
      Preconditions.checkNotNull(hiveVersion, "Hive version cannot be null");
      if (System.getProperty("java.security.krb5.conf") != null) {
         if (System.getProperty("had_set_kerberos") == null) {
            String principal = "sloth/d...@bdms.163.com";
            String keytab = "/Users/yexianxun/dev/env/mammut-test-hive/sloth.keytab";
            try {
               sun.security.krb5.Config.refresh();
               UserGroupInformation.setConfiguration(hiveConf);
               UserGroupInformation.loginUserFromKeytab(principal, keytab);
               System.setProperty("had_set_kerberos", "true");
            } catch (Exception e) {
               LOG.error("", e);
            }
         }
      }
      return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
   }


HiveCatalog:
   private static HiveConf createHiveConf(@Nullable String hiveConfDir) {
      LOG.info("Setting hive conf dir as {}", hiveConfDir);
      try {
         HiveConf.setHiveSiteLocation(
            hiveConfDir == null ?
               null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
      } catch (MalformedURLException e) {
         throw new CatalogException(
            String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
      }


      // create HiveConf from hadoop configuration
      HiveConf hiveConf = new HiveConf(
         HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()),
         HiveConf.class);
      try {
         hiveConf.addResource(Paths.get(hiveConfDir, "hdfs-site.xml").toUri().toURL());
         hiveConf.addResource(Paths.get(hiveConfDir, "core-site.xml").toUri().toURL());
      } catch (MalformedURLException e) {
         throw new CatalogException(
            String.format("Failed to get hdfs|core-site.xml from %s", hiveConfDir), e);
      }
      return hiveConf;
   }


    Running the testCatalogQuerySink method fails with the following error:
org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:622)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
at org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
at api.HiveCatalogTest.testCatalogQuerySink(HiveCatalogMumTest.java:234)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Can't get Master Kerberos principal for use as renewer
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
... 10 more
Caused by: java.io.IOException: Can't get Master Kerberos principal for use as renewer
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:159)
at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256)
... 22 more


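    The sink test inserts data correctly, but reading from the Hive source throws this error. My guess is that obtaining the delegation token returns null, which then triggers the exception. I am not sure how to fix this.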
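
One way to reason about that suspicion, as a minimal sketch and not Hadoop's actual code: as far as I can tell, the MapReduce client code that throws in the trace (TokenCache.obtainTokensForNamenodesInternal) resolves the token renewer from a configuration key, yarn.resourcemanager.principal by default or mapreduce.jobtracker.kerberos.principal when mapreduce.framework.name is classic, and raises this IOException when the value is missing or empty. The key names reflect my reading of Hadoop 2.x and should be checked against the Hadoop version in use. The class RenewerLookupSketch, the plain Map standing in for Hadoop's Configuration, and the example principal value are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model of how the delegation-token renewer seems to be resolved.
 * A plain Map stands in for org.apache.hadoop.conf.Configuration; this is an
 * illustration of the lookup, NOT Hadoop's implementation.
 */
class RenewerLookupSketch {

    // Key names as understood from Hadoop 2.x; treat them as assumptions.
    static final String FRAMEWORK_KEY = "mapreduce.framework.name";
    static final String YARN_PRINCIPAL_KEY = "yarn.resourcemanager.principal";
    static final String CLASSIC_PRINCIPAL_KEY = "mapreduce.jobtracker.kerberos.principal";

    /** Returns the config key the renewer principal would be read from. */
    static String renewerKey(Map<String, String> conf) {
        String framework = conf.getOrDefault(FRAMEWORK_KEY, "yarn");
        return "classic".equals(framework) ? CLASSIC_PRINCIPAL_KEY : YARN_PRINCIPAL_KEY;
    }

    /** Mimics the failure mode: a missing or empty principal is rejected. */
    static String resolveRenewer(Map<String, String> conf) {
        String key = renewerKey(conf);
        String principal = conf.get(key);
        if (principal == null || principal.isEmpty()) {
            throw new IllegalStateException(
                "Can't get Master Kerberos principal for use as renewer (no value for " + key + ")");
        }
        return principal;
    }

    public static void main(String[] args) {
        // Models a configuration that carries no YARN principal.
        Map<String, String> conf = new HashMap<>();
        try {
            resolveRenewer(conf);
        } catch (IllegalStateException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Hypothetical principal value, for illustration only.
        conf.put(YARN_PRINCIPAL_KEY, "rm/_HOST@EXAMPLE.COM");
        System.out.println("renewer: " + resolveRenewer(conf));
    }
}
```

If this model holds, the createHiveConf shown earlier, which adds only hdfs-site.xml and core-site.xml, would leave the renewer key unset unless one of those files defines it. Loading yarn-site.xml as well, or setting the key explicitly on the HiveConf, may be worth trying.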





叶贤勋
yxx_c...@163.com