First, a question: are the files on your HDFS compressed with LZO?
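
If they are, one way to confirm the diagnosis is to check whether the codec class named in the stack trace can be loaded at all. The sketch below is not from the original thread: the class name `CodecClasspathCheck` and its helper `missingClasses` are hypothetical, and it assumes you run it with the same jars on the classpath that the JobManager sees. Hadoop's `CompressionCodecFactory` loads every class listed in the `io.compression.codecs` property, so that comma-separated value can be passed as the argument.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or Hadoop: reports which of the
// given class names cannot be loaded from the current classpath.
final class CodecClasspathCheck {

    /** Returns the class names from a comma-separated list that fail to load. */
    static List<String> missingClasses(String commaSeparatedNames, ClassLoader loader) {
        List<String> missing = new ArrayList<>();
        for (String name : commaSeparatedNames.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            try {
                // 'false' = look the class up without running its static initializers
                Class.forName(trimmed, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(trimmed);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Default to the codec named in the stack trace; alternatively pass the
        // value of io.compression.codecs from core-site.xml as the first argument.
        String codecs = args.length > 0 ? args[0] : "com.hadoop.compression.lzo.LzoCodec";
        List<String> missing =
                missingClasses(codecs, CodecClasspathCheck.class.getClassLoader());
        if (missing.isEmpty()) {
            System.out.println("All listed codec classes can be loaded.");
        } else {
            System.out.println("Not found on the classpath: " + missing);
        }
    }
}
```

If the class is reported as missing, the likely options are to put the hadoop-lzo jar where the JobManager can load it, or, if the table data is not actually LZO-compressed, to remove the LZO entries from `io.compression.codecs` in the Hadoop configuration that Flink picks up.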


athlon...@gmail.com
 
From: 苏 欣
Sent: 2019-08-09 17:40
To: user-zh@flink.apache.org
Subject: Error when connecting to Hive with Flink 1.10
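I am using Flink 1.10-snapshot to connect to Hive 1.1.0-cdh5.4.7, and the big data cluster uses Kerberos authentication.
I connect to Hive the same way as for Hive 1.2.1. HiveCatalog can retrieve the table schema, but the job fails at startup, with the same error in both Standalone and YARN modes.
Has anyone run into this problem?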
 
The error message is as follows:
------------------------------------------------------------
The program finished with the following exception:
 
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
        at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
        ... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Error in configuring object
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:897)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
        at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
        ... 10 more
Caused by: java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:161)
        at org.apache.flink.connectors.hive.HiveTableInputFormat.createInputSplits(HiveTableInputFormat.java:67)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:256)
        ... 22 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
        ... 26 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 31 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
        ... 33 more
 
End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        ... 4 more
 
 
The code is as follows:
 
ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(1);
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
 
HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", null, "/home/sean/120_conf", "1.2.1");
hiveCatalog.open();
 
tableEnv.registerCatalog("myhive", hiveCatalog);
 
Optional<Catalog> myHive = tableEnv.getCatalog("myhive");
 
ObjectPath myTablePath = new ObjectPath("sean_test", "flink_test_01");
// The table schema prints successfully here
System.out.println(myHive.get().getTable(myTablePath).getSchema());
 
tableEnv.useCatalog("myhive");
Table table = tableEnv.sqlQuery("select * from sean_test.flink_test_01");
List<Row> result = tableEnv.toDataSet(table, Row.class).collect();
System.out.println(result);
tableEnv.execute("");
 
 
 
 
 
The Hive-related pom configuration is as follows:
 
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
 
<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-metastore</artifactId>
   <version>1.2.1</version>
</dependency>
 
<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>1.2.1</version>
   <exclusions>
      <exclusion>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libfb303</artifactId>
      </exclusion>
   </exclusions>
</dependency>
 
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
 
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-shaded-hadoop-2-uber</artifactId>
   <version>2.6.5-7.0</version>
   <scope>provided</scope>
</dependency>
 
 
 
 