Does anyone know the cause of this exception when using the Hive Sink, and how
to fix it?
The Hive Sink managed to write data into the Hive table for a few minutes
(which I can confirm by querying the table), but then the exception below
appears in the log file (/var/log/flume/flume-<streamname>.log) on all the
nodes.
05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
    at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
    at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
    at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
    at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
    at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
The same exception, identical apart from its timestamp, is logged again at 05 Jul 2016 04:24:27,891.
My flume.conf file:
# acidstream - streaming data from Kafka into Hive transactional table
acidstream.sources = kafka-source
acidstream.sinks = hive-sink
acidstream.channels = gutter
acidstream.sources.kafka-source.channels = gutter
acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181
acidstream.sources.kafka-source.topic = lan
acidstream.sources.kafka-source.groupId = acid
acidstream.sources.kafka-source.batchSize = 10000
acidstream.sources.kafka-source.batchDurationMillis = 60000
acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200
acidstream.sources.kafka-source.interceptors = i1
acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor
acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})
acidstream.sources.kafka-source.interceptors.i1.serializers = s1
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
acidstream.sinks.hive-sink.channel = gutter
acidstream.sinks.hive-sink.type = hive
acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083
acidstream.sinks.hive-sink.hive.database = default
acidstream.sinks.hive-sink.hive.table = acid
acidstream.sinks.hive-sink.hive.partition = %m%d
acidstream.sinks.hive-sink.heartBeatInterval = 10
acidstream.sinks.hive-sink.useLocalTimeStamp = false
acidstream.sinks.hive-sink.round = false
acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000
acidstream.sinks.hive-sink.batchSize = 10000
acidstream.sinks.hive-sink.callTimeout = 30000
acidstream.sinks.hive-sink.serializer = DELIMITED
acidstream.sinks.hive-sink.serializer.delimiter = "\t"
acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'
acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data
acidstream.channels.gutter.type = memory
acidstream.channels.gutter.capacity = 100000
acidstream.channels.gutter.transactionCapacity = 50000
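
For reference on how these settings fit together: the pipeline assumes each Kafka message is a tab-separated line beginning with a timestamp, for example (the field values here are placeholders, not real data):

2016-07-05 04:24:22	<id>	<data>

The regex_extractor interceptor copies the leading timestamp into the event's timestamp header, which the sink's %m%d partition pattern uses (since useLocalTimeStamp is false), and the DELIMITED serializer splits the event body on "\t" into the timestamp, id, and data columns.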
My flume-env file has this line added:
export JAVA_OPTS="-Xms100m -Xmx3g"
My Hive table was created with the following properties:
PARTITIONED BY (md string)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
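Putting that together, the full CREATE TABLE looks roughly like this (the column names are taken from the sink's fieldnames setting above; the column types shown are assumptions for illustration):

CREATE TABLE acid (
  `timestamp` STRING,  -- assumed type; filled from the first tab-separated field
  id INT,              -- assumed type; the bucketing column
  data STRING          -- assumed type; the remaining payload
)
PARTITIONED BY (md STRING)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');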
Hive has the Tez engine set as the default execution engine.
Could this error be caused by a low number of threads? (The NameNode has 100
server threads available.)
Best regards,
Thanh Hong.