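Hi 苏欣,

I suggest first checking whether the jar you finally built actually contains com.hadoop.compression.lzo.LzoCodec. The root cause at the bottom of your stack trace is a ClassNotFoundException for that class.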
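A minimal sketch of that check in Java, in case it helps. The class name `JarClassCheck` and the jar path passed as the first argument are illustrative and not from this thread. It only looks for the `.class` entry inside one jar, so it says nothing about what else is on the cluster classpath.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks whether a jar file contains a given class.
public class JarClassCheck {

    /** Converts a fully qualified class name to its entry path inside a jar. */
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath has a .class entry for className. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java JarClassCheck <path-to-jar>");
            return;
        }
        // The class named in the ClassNotFoundException from the stack trace.
        String codec = "com.hadoop.compression.lzo.LzoCodec";
        boolean found = containsClass(args[0], codec);
        System.out.println(codec + (found ? " found in " : " NOT found in ") + args[0]);
    }
}
```

Listing the jar with `jar tf <path-to-jar>` and searching the output for `LzoCodec` gives the same information without writing any code.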
苏 欣 <sean...@live.com> wrote on Fri, Aug 9, 2019 at 5:41 PM:

> I am using Flink 1.10-SNAPSHOT and connecting to Hive 1.1.0-cdh5.4.7. The big data cluster has Kerberos authentication enabled.
>
> I connect to Hive using the 1.2.1 mode. HiveCatalog can fetch the table schema, but the job fails at startup, and standalone mode and YARN mode both report the same error.
> Has anyone run into this problem?
>
> The error message is as follows:
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>     at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
>     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 7 more
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Error in configuring object
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
>     at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
>     ... 10 more
> Caused by: java.lang.RuntimeException: Error in configuring object
>     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
>     at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161)
>     at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256)
>     ... 22 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
>     ... 26 more
> Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
>     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
>     at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
>     at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
>     ... 31 more
> Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
>     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
>     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
>     ... 33 more
>
> End of exception on server side>]
>     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>     ... 4 more
>
>
> The code is as follows:
>
> ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
> execEnv.setParallelism(1);
> BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
>
> HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, "/home/sean/120_conf", "1.2.1");
> hiveCatalog.open();
>
> tableEnv.registerCatalog("myhive", hiveCatalog);
>
> Optional<Catalog> myHive = tableEnv.getCatalog("myhive");
>
> ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01");
> // The schema prints fine here
> System.out.println(myHive.get().getTable(myTablePath).getSchema());
>
> tableEnv.useCatalog("myhive");
> Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01");
> List<Row> result = tableEnv.toDataSet(table, Row.class).collect();
> System.out.println(result);
> tableEnv.execute("");
>
>
> The Hive-related pom configuration is as follows:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.hive</groupId>
>     <artifactId>hive-metastore</artifactId>
>     <version>1.2.1</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.hive</groupId>
>     <artifactId>hive-exec</artifactId>
>     <version>1.2.1</version>
>     <exclusions>
>         <exclusion>
>             <groupId>org.apache.thrift</groupId>
>             <artifactId>libfb303</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-shaded-hadoop-2-uber</artifactId>
>     <version>2.6.5-7.0</version>
>     <scope>provided</scope>
> </dependency>