Check whether your table is a transactional table. When creating the table in Hive, add 'transactional' = 'false' to its TBLPROPERTIES.
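
For example, a minimal sketch based on the DDL quoted below; whether a newly created ORC table ends up transactional depends on your Hive configuration, so treat the extra property as illustrative:

create table dest_orc (
  i int
) partitioned by (ts string)
stored as orc
TBLPROPERTIES(
  'orc.compress' = 'SNAPPY',
  -- explicitly mark the table as non-transactional
  'transactional' = 'false'
);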

On 2020/9/8 16:26, 大罗 wrote:
Hi, I'm using the Hive catalog feature of Flink SQL 1.11.1 to insert data into a Hive ORC table.

The versions I'm using are as follows:

Hadoop 3.0.0+cdh6.3.2

HDFS 3.0.0+cdh6.3.2

HBase 2.1.0+cdh6.3.2

Hive 2.1.1+cdh6.3.2

Flink 1.11.1

The Hive ORC table is defined as follows:
create table dest_orc (
  i int
) partitioned by (ts string)
stored as orc
TBLPROPERTIES(
  'orc.compress' = 'SNAPPY'
);

Inserting data from the Flink SQL CLI:
Flink SQL> insert into dest_orc select 1,  '2020-09-08 10:11:00' ;
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a2c96bcaf23abc24de8e5405ec2bb7c6

The job then fails with the following error:
2020-09-08 16:16:39
org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
        at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
        at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
        at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
        at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
        at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at StreamExecCalc$183.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
        at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58)
        at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151)
        ... 25 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55)
        ... 26 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getOptions(OrcOutputFormat.java:161)
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:189)
        at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:67)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:284)
        ... 30 more

However, I do have "/opt/flink-1.11.1/lib/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar" in Flink's lib directory, as shown below:

[econ@dev-hadoop-node-c ~]$ ll /opt/flink-1.11.1/lib/*jar
-rw-r--r--+ 1 econ econ     92494 Aug 26 16:38 /opt/flink-1.11.1/lib/flink-connector-hbase_2.11-1.11.1.jar
-rw-r--r--+ 1 econ econ    197131 Aug 25 18:06 /opt/flink-1.11.1/lib/flink-connector-jdbc_2.11-1.11.0.jar
-rw-r--r--+ 1 econ econ     90782 Jul 15 17:24 /opt/flink-1.11.1/lib/flink-csv-1.11.1.jar
-rw-r--r--+ 1 econ econ 108350618 Jul 15 17:30 /opt/flink-1.11.1/lib/flink-dist_2.11-1.11.1.jar
-rw-r--r--+ 1 econ econ     94865 Jul 15 17:24 /opt/flink-1.11.1/lib/flink-json-1.11.1.jar
-rw-r--r--+ 1 econ econ  58920258 Aug 25 10:20 /opt/flink-1.11.1/lib/flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.2-9.0.jar
-rw-r--r--+ 1 econ econ   7712156 Jun 18 10:42 /opt/flink-1.11.1/lib/flink-shaded-zookeeper-3.4.14.jar
-rwxrwxr-x+ 1 econ econ  20837927 Aug 25 18:04 /opt/flink-1.11.1/lib/flink-sql-connector-elasticsearch6_2.11-1.11.0.jar
-rwxrwxr-x+ 1 econ econ  26053409 Aug 25 18:04 /opt/flink-1.11.1/lib/flink-sql-connector-elasticsearch7_2.11-1.11.0.jar
-rwxrwxrwx+ 1 econ econ  38108015 Aug 25 18:04 /opt/flink-1.11.1/lib/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar
-rw-r--r--+ 1 econ econ   3541645 Aug 26 16:01 /opt/flink-1.11.1/lib/flink-sql-connector-kafka_2.11-1.11.1.jar
-rwxrwxr-x+ 1 econ econ   2100826 Sep  5 13:23 /opt/flink-1.11.1/lib/flink-sql-orc_2.11-1.11.0.jar
-rwxrwxr-x+ 1 econ econ   4581283 Sep  5 13:23 /opt/flink-1.11.1/lib/flink-sql-parquet_2.11-1.11.0.jar
-rw-r--r--+ 1 econ econ  33327194 Jul 15 17:28 /opt/flink-1.11.1/lib/flink-table_2.11-1.11.1.jar
-rw-r--r--+ 1 econ econ  37331759 Jul 15 17:28 /opt/flink-1.11.1/lib/flink-table-blink_2.11-1.11.1.jar
-rw-r--r--+ 1 econ econ     67114 Apr 20 20:47 /opt/flink-1.11.1/lib/log4j-1.2-api-2.12.1.jar
-rw-r--r--+ 1 econ econ    276771 Apr 20 20:47 /opt/flink-1.11.1/lib/log4j-api-2.12.1.jar
-rw-r--r--+ 1 econ econ   1674433 Apr 20 20:47 /opt/flink-1.11.1/lib/log4j-core-2.12.1.jar
-rw-r--r--+ 1 econ econ     23518 Apr 20 20:47 /opt/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar
-rw-r--r--+ 1 econ econ   1007502 Aug 26 11:45 /opt/flink-1.11.1/lib/mysql-connector-java-5.1.47.jar
[econ@dev-hadoop-node-c ~]$
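
For reference, a check like the following shows which of these jars actually bundle the ORC classes (a sketch, assuming unzip is available on the box):

for jar in /opt/flink-1.11.1/lib/*.jar; do
  # list each jar's entries and keep the jars that ship the ORC input format
  if unzip -l "$jar" 2>/dev/null | grep -q 'org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.class'; then
    echo "$jar"
  fi
done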


In the same scenario, inserting into a Parquet table works fine:

create table dest_par (
  i int
) partitioned by (ts string)
stored as parquet
TBLPROPERTIES(
  'parquet.compress' = 'SNAPPY'
);

Flink SQL> insert into dest_par select 132,  '2020-09-08 10:11:00' ;
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: dd258840b5cb9c6f12d60291b31dac3f


Flink SQL> select * from dest_par where i = 132 ;
+-----+-------------+----------------------+
| +/- |           i |                   ts |
+-----+-------------+----------------------+
|   + |         132 |  2020-09-08 10:11:00 |
+-----+-------------+----------------------+
Received a total of 1 rows

Flink SQL>

The deepest "Caused by" is "java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", yet the class 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' is present in flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar.

And even if Flink loads my system's 'hive-exec-2.1.1-cdh6.3.2.jar' at runtime (from the dependency 'org.apache.hive:hive-exec:2.1.1-cdh6.3.2'), that jar also contains 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'.

So where does the problem lie?
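
For what it's worth, "NoClassDefFoundError: Could not initialize class X" generally means the JVM did find class X but its static initializer had already failed once; the original failure (often an ExceptionInInitializerError from a dependency conflict) should appear earlier in the TaskManager log. A sketch of how to look for it, with an illustrative log path pattern:

# search the TaskManager logs for the first static-initialization failure
grep -B 5 'ExceptionInInitializerError' /opt/flink-1.11.1/log/flink-*-taskexecutor-*.log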





--
Sent from: http://apache-flink.147419.n8.nabble.com/
