I forgot to include the version information. I'm currently using Flume 1.5.2 from HDP 2.4.2.
Looking at the changelog of Flume 1.6.0, the latest version, there appear to be some improvements to Hive support. This makes me wonder: does Flume 1.5.2 support Hive streaming to ACID tables?

Best regards,
Thanh Hong.

From: Thanh Hong Dai [mailto:[email protected]]
Sent: Tuesday, 5 July, 2016 11:47 AM
To: [email protected]
Subject: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()

Does anyone know the cause of this exception when using the Hive Sink, and how to fix it?

The Hive Sink manages to write data into the Hive table for a few minutes (which I can confirm by querying the table), but then the exception below appears in the log file (/var/log/flume/flume-<streamname>.log) on all the nodes:

05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
        at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
[same stack trace as above, repeated]

My flume.conf file:

# acidstream - streaming data from Kafka into a Hive transactional table
acidstream.sources = kafka-source
acidstream.sinks = hive-sink
acidstream.channels = gutter

acidstream.sources.kafka-source.channels = gutter
acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181
acidstream.sources.kafka-source.topic = lan
acidstream.sources.kafka-source.groupId = acid
acidstream.sources.kafka-source.batchSize = 10000
acidstream.sources.kafka-source.batchDurationMillis = 60000
acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200

acidstream.sources.kafka-source.interceptors = i1
acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor
acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})
acidstream.sources.kafka-source.interceptors.i1.serializers = s1
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

acidstream.sinks.hive-sink.channel = gutter
acidstream.sinks.hive-sink.type = hive
acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083
acidstream.sinks.hive-sink.hive.database = default
acidstream.sinks.hive-sink.hive.table = acid
acidstream.sinks.hive-sink.hive.partition = %m%d
acidstream.sinks.hive-sink.heartBeatInterval = 10
acidstream.sinks.hive-sink.useLocalTimeStamp = false
acidstream.sinks.hive-sink.round = false
acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000
acidstream.sinks.hive-sink.batchSize = 10000
acidstream.sinks.hive-sink.callTimeout = 30000
acidstream.sinks.hive-sink.serializer = DELIMITED
acidstream.sinks.hive-sink.serializer.delimiter = "\t"
acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'
acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data

acidstream.channels.gutter.type = memory
acidstream.channels.gutter.capacity = 100000
acidstream.channels.gutter.transactionCapacity = 50000

My flume-env file has this line added:

export JAVA_OPTS="-Xms100m -Xmx3g"

My table in Hive has the following properties:

PARTITIONED BY (md string)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

Hive has the Tez engine set as the default execution engine.

Could this error be caused by a low number of threads? (The NameNode has 100 server threads available.)

Best regards,
Thanh Hong.
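P.S. In case it helps anyone reproduce the setup: the regex_extractor interceptor with RegexExtractorInterceptorMillisSerializer is configured to pull the leading timestamp out of each Kafka event and store it in the "timestamp" header as epoch milliseconds, which the sink's %m%d partition escape then uses (since useLocalTimeStamp is false). This is my own standalone approximation of that step, not Flume source code; the sample body and the UTC time zone are assumptions for the demo:

```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of what the interceptor config above does to each event body:
// match the leading "yyyy-MM-dd HH:mm:ss" timestamp, parse it, and emit
// epoch millis (the value Flume would place in the "timestamp" header).
public class TimestampHeaderSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical event body in the timestamp,id,data layout
        String body = "2016-07-05 04:24:22\t42\tsome payload";

        // Same regex as interceptors.i1.regex in flume.conf
        Pattern p = Pattern.compile("^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})");
        Matcher m = p.matcher(body);
        if (m.find()) {
            // Same pattern as serializers.s1.pattern in flume.conf
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumed zone for the demo
            long millis = fmt.parse(m.group(1)).getTime();
            System.out.println(millis); // 1467692662000 when parsed as UTC
        }
    }
}
```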
