[ 
https://issues.apache.org/jira/browse/HIVE-15658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Klempa updated HIVE-15658:
---------------------------------
    Description: 
Method start() in hive.ql.session.SessionState is supposed to set up the needed 
preconditions, such as the HDFS scratch directories for the session.
This is not atomic with setting the thread-local variable, 
which can later be obtained by calling SessionState.get().
Therefore, even if the start() method itself fails, SessionState.get() does 
not return null, and further re-use of the thread which previously invoked 
start() may lead to obtaining a SessionState object in an inconsistent state.
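
The ordering problem can be illustrated in isolation. The following is a minimal sketch, not Hive code: FakeSession, its fields and the scratchDirWritable flag are hypothetical stand-ins, assumed only to mimic "register the thread-local first, initialize second".

```java
// Hypothetical stand-in for SessionState; the names are illustrative, not Hive's API.
final class FakeSession {
    private static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();

    private String hdfsSessionPath; // stays null when initialization fails

    static FakeSession get() {
        return CURRENT.get();
    }

    static void reset() {
        CURRENT.remove();
    }

    // Mirrors the problematic order: register first, initialize second.
    static FakeSession start(FakeSession session, boolean scratchDirWritable) {
        CURRENT.set(session);
        if (!scratchDirWritable) {
            throw new RuntimeException("The root scratch dir should be writable");
        }
        session.hdfsSessionPath = "/tmp/hive/session";
        return session;
    }

    boolean isInitialized() {
        return hdfsSessionPath != null;
    }
}

final class NonAtomicStartDemo {
    // Returns true when a failed start() leaves a half-initialized session behind.
    static boolean leaksBrokenSession() {
        FakeSession.reset();
        try {
            FakeSession.start(new FakeSession(), false);
        } catch (RuntimeException expected) {
            // The caller only logs the failure and retries later on the same thread.
        }
        FakeSession leftover = FakeSession.get();
        return leftover != null && !leftover.isInitialized();
    }

    public static void main(String[] args) {
        System.out.println("broken session visible after failure: " + leaksBrokenSession());
    }
}
```

In this sketch, a later call on the same thread that checks get() for null would skip start() and work with the uninitialized object.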

I have observed this using the Flume Hive Sink, which uses the Hive Streaming 
interface. When the directory /tmp/hive is not writable by the session user, the 
start() method fails (throwing RuntimeException). If the thread is re-used 
(as it is in Flume), further executions work with a wrongly initialized 
SessionState object (the HDFS dirs do not exist). In Flume, this happens to me 
when Flume should create a partition if it does not exist (but the code doing 
this is in Hive Streaming).

Steps to reproduce:
0. create test spooldir and allow flume to write to it, in my case 
/home/ubuntu/flume_test, 775, ubuntu:flume
1. create Flume config (see attachment)
2. create Hive table
{code}
create table default.flume_test (column1 string, column2 string) partitioned by 
(dt string) clustered by (column1) INTO 2 BUCKETS STORED AS ORC;
{code}
3. start flume agent:
{code}
bin/flume-ng agent -n a1 -c conf -f conf/flume-config.txt
{code}
4. hdfs dfs -chmod 600 /tmp/hive
5. put this file into spooldir:
{code}
echo value1,value2 > file1
{code}

Expected behavior:
The exception regarding scratch dir permissions is thrown repeatedly.
Example (note that the line numbers are off, as Cloudera maintains its own 
clones of the source code here https://github.com/cloudera/flume-ng/ and here 
https://github.com/cloudera/hive):
{code}
2017-01-18 12:39:38,926 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : 
Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
database='default', table='flume_test', partitionVals=[20170118] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
table='flume_test', partitionVals=[20170118] } 
        at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
        at 
org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed 
connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
database='default', table='flume_test', partitionVals=[20170118] }
        at 
org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:380)
        at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
        ... 6 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root 
scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: 
rw-------
        at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:540)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS 
should be writable. Current permissions are: rw-------
        at 
org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
        at 
org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
        at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
        ... 13 more
{code}

Actual behavior:
The exception regarding scratch dir permissions is thrown once. After that, 
meaningless exceptions from code that should be unreachable are thrown again 
and again, hiding the source of the problem from the user.
Exceptions thrown repeatedly:
{code}
java.lang.NullPointerException: Non-local session path expected to be non-null
        at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at 
org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
        at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
        at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
        at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{code}
{code}
2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : 
Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
database='default', table='flume_test', partitionVals=[20170118] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
table='flume_test', partitionVals=[20170118] }
        at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
        at 
org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
values=[20170118]. Unable to get path for end point: [20170118]
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
        at 
org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
        at 
org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
        at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
        ... 6 more
Caused by: NoSuchObjectException(message:partition values=[20170118])
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
        at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
        ... 10 more
{code}

Detailed description of what is going on:
Flume, as the Hive Streaming client, does the streaming in the HiveSink class. 
The main part is done on line
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L253
where one "Batch" is drained (a batch in the sense of a Flume batch of incoming 
messages from the channel).
The main for loop of the batch drain is:
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L282
Flume creates a Hive endpoint for each line it tries to insert into Hive 
(https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L290).
This is not very efficient, but the .equals in HiveEndPoint is properly 
written, so everything works.
Then it creates the helper HiveWriter 
(https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L295),
which is cached, one per HiveEndPoint. If no HiveWriter exists for the 
endpoint, it is created on line
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L343

Inspecting the constructor of HiveWriter brings us to the creation of a new 
connection to Hive using the Streaming API:
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
The connection is created in a separate thread:
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L376
as a Future submitted 
(https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L425)
to the thread pool callTimeoutPool. The pool is created in HiveSink 
(https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L493)
and has a constant size of 1, so it seems that Flume uses 1 thread per Hive 
Sink to talk to Hive.
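
Why a pool of size 1 matters here can be sketched with the JDK alone. This is an illustration under assumptions, not Flume code: SingleThreadReuseDemo and its STATE thread-local are hypothetical names. Two tasks submitted to a fixed pool of one thread run on the same worker, so whatever the first task leaves in a thread-local is visible to the second.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SingleThreadReuseDemo {
    // Hypothetical thread-local standing in for the one inside SessionState.
    private static final ThreadLocal<String> STATE = new ThreadLocal<>();

    // Submits two tasks to a pool of size 1 and returns what the second task sees.
    static String valueSeenBySecondTask() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Runnable firstCall = () -> STATE.set("left over from first call");
            Callable<String> secondCall = () -> STATE.get();
            pool.submit(firstCall).get();          // wait until the first task is done
            return pool.submit(secondCall).get();  // runs on the same worker thread
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenBySecondTask());
    }
}
```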

When creating a new connection via newConnection 
(https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379)
with autoCreatePartitions=true, HiveEndPoint, the entry 
point to Hive Streaming, is called: 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L105
As I was testing without authentication, it boils down to 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L192
and finally to 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L215

The constructor of the inner private class ConnectionImpl then tries to create 
the partition if it does not exist, on the line 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L318
The trouble starts in the method createPartitionIfNotExists on line
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L455
As SessionState.get() returns null (we have not started a session yet), we 
try to create a new one.
In SessionState.start(), the first thing done is registering the object itself 
as the thread-local variable:
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526

Thereafter, start() tries to create the directories (scratchdir and subdirs):
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L548
but if this fails, the RuntimeException (from 
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L619
 and 
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L677)
 is not caught in the catch blocks (nor is there any finally block).

So basically, SessionState.start() has failed to initialize properly (e.g. 
the HDFS dirs are not created, nor is SessionState.hdfsSessionPath set to 
non-null), and yet execution continues.
With the RuntimeException thrown from the start() method, the caller 
(HiveEndPoint) propagates the exception back to HiveWriter 
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379

The exception is caught 
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L442
 but is handled only by logging it and going on: 
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L456
This is the moment this exception is logged:
{code}
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS 
should be writable. Current permissions are: rw-------
        at 
org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
        at 
org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
        at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
        ... 13 more
{code}

What happens next? Flume retries the delivery, calling HiveSink.process, 
which boils down to newConnection again. But Flume uses the very SAME thread 
it used before to do this.
This time, the if clause: 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L454
returns true, as SessionState.get() returns the incorrectly initialized 
SessionState from the previous attempt.
Then, it goes into 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L466
 and down to 
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L738
 which fails on the null value of hdfsSessionPath in SessionState.

But this RuntimeException (NullPointerException) is not caught by 
https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L470
 and so it is logged:
{code}
2017-01-18 12:39:44,194 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: 
NullPointerException Non-local session path expected to be non-null
java.lang.NullPointerException: Non-local session path expected to be non-null
        at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at 
org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
        at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
        at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
        at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{code}

Sometimes Flume manages to get past 
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
 because newConnection runs in a separate thread, and Flume rushes on into 
https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L89
 producing another meaningless exception:
{code}
2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : 
Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
database='default', table='flume_test', partitionVals=[20170118] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
table='flume_test', partitionVals=[20170118] }
        at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
        at 
org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
        at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
values=[20170118]. Unable to get path for end point: [20170118]
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
        at 
org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
        at 
org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
        at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
        ... 6 more
Caused by: NoSuchObjectException(message:partition values=[20170118])
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
        at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
        at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
        ... 10 more
{code}

Proposed solution:
If the Hive Streaming API is allowed to be used from the same thread again 
(which it probably is), then the thread-local set in 
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
has to be unset in case of any exception in the subsequent blocks:
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L539
so set the thread-local back to null before rethrowing exceptions here:
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L568
and here:
https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L602
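
The shape of the proposed fix might look roughly like the sketch below. It is not a patch against SessionState: SafeSession, createSessionDirs and the scratchDirWritable flag are hypothetical names, assumed only to show the thread-local being unset before the exception is rethrown.

```java
// Hypothetical stand-in for a fixed SessionState; not Hive's actual code.
final class SafeSession {
    private static final ThreadLocal<SafeSession> CURRENT = new ThreadLocal<>();

    private String hdfsSessionPath;

    static SafeSession get() {
        return CURRENT.get();
    }

    static SafeSession start(SafeSession session, boolean scratchDirWritable) {
        CURRENT.set(session);
        try {
            session.createSessionDirs(scratchDirWritable);
        } catch (RuntimeException e) {
            // Unset the thread-local before rethrowing, so get() returns null again.
            CURRENT.remove();
            throw e;
        }
        return session;
    }

    // Simulates the directory setup that fails when /tmp/hive is not writable.
    private void createSessionDirs(boolean scratchDirWritable) {
        if (!scratchDirWritable) {
            throw new RuntimeException("The root scratch dir should be writable");
        }
        hdfsSessionPath = "/tmp/hive/session";
    }
}

final class SafeStartDemo {
    // Returns true when a failed start() leaves no session in the thread-local.
    static boolean threadLocalClearedAfterFailure() {
        try {
            SafeSession.start(new SafeSession(), false);
        } catch (RuntimeException expected) {
            // Logged and retried later by the caller, as Flume does.
        }
        return SafeSession.get() == null;
    }

    public static void main(String[] args) {
        System.out.println("thread-local cleared: " + threadLocalClearedAfterFailure());
    }
}
```

With this ordering, a retry on the same thread sees get() == null and calls start() again, so the original scratch dir exception would be reported on every attempt. Whether a real fix should instead restore a previously registered session is left open here.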

The links to the source code point to the latest version, although I have been 
testing on Hive 1.1.0. From the code, it seems that the bug is also present in 
recent versions.

If the Hive Streaming API is not allowed to be called from re-used threads, 
then not only Flume but probably also the NiFi client 
(https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java#L237)
 has to be fixed (well, NiFi just copied and pasted the Flume codebase; is 
there any other copy of this HiveWriter out there?).


  was:
Method start() in hive.ql.session.SessionState is supposed to setup needed 
preconditions, like HDFS scratch directories for session.
This happens to be not an atomic operation with setting thread local variable, 
which can later be obtained by calling SessionState.get().
Therefore, even if the start() method itself fails, the SessionState.get() does 
not return null and further re-use of the thread which previously invoked 
start() may lead to obtaining SessionState object in inconsistent state.

I have observed this using Flume Hive Sink, which uses Hive Streaming 
interface. When the directory /tmp/hive is not writable by session user, the 
start() method fails (throwing RuntimeException). If the thread is re-used 
(like it is in Flume), further executions work with wrongly initialized 
SessionState object (HDFS dirs are non-existent).

Steps to reproduce:
1. hdfs dfs -chmod 600 /tmp/hive
2. create Flume
First 


> hive.ql.session.SessionState start() is not atomic, SessionState thread local 
> variable can get into inconsistent state
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-15658
>                 URL: https://issues.apache.org/jira/browse/HIVE-15658
>             Project: Hive
>          Issue Type: Bug
>          Components: API, HCatalog
>    Affects Versions: 1.1.0, 1.2.1, 2.0.0, 2.0.1
>         Environment: CDH5.8.0, Flume 1.6.0, Hive 1.1.0
>            Reporter: Michal Klempa
>
> Method start() in hive.ql.session.SessionState is supposed to setup needed 
> preconditions, like HDFS scratch directories for session.
> This happens to be not an atomic operation with setting thread local 
> variable, which can later be obtained by calling SessionState.get().
> Therefore, even if the start() method itself fails, the SessionState.get() 
> does not return null and further re-use of the thread which previously 
> invoked start() may lead to obtaining SessionState object in inconsistent 
> state.
> I have observed this using Flume Hive Sink, which uses Hive Streaming 
> interface. When the directory /tmp/hive is not writable by session user, the 
> start() method fails (throwing RuntimeException). If the thread is re-used 
> (like it is in Flume), further executions work with wrongly initialized 
> SessionState object (HDFS dirs are non-existent). In Flume, this happens to 
> me when Flume should create partition if not exists (but the code doing this 
> is in Hive Streaming).
> Steps to reproduce:
> 0. create test spooldir and allow flume to write to it, in my case 
> /home/ubuntu/flume_test, 775, ubuntu:flume
> 1. create Flume config (see attachment)
> 2. create Hive table
> {code}
> create table default.flume_test (column1 string, column2 string) partitioned 
> by (dt string) clustered by (column1) INTO 2 BUCKETS STORED AS ORC;
> {code}
> 3. start flume agent:
> {code}
> bin/flume-ng agent -n a1 -c conf -f conf/flume-config.txt
> {code}
> 4. hdfs dfs -chmod 600 /tmp/hive
> 5. put this file into spooldir:
> {code}
> echo value1,value2 > file1
> {code}
> Expected behavior:
> Exception regarding scratch dir permissions to be thrown repeatedly.
> example (note that the line numbers are wrong as Cloudera is cloning the 
> source codes here https://github.com/cloudera/flume-ng/ and here 
> https://github.com/cloudera/hive):
> {code}
> 2017-01-18 12:39:38,926 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 
> : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
> EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
> table='flume_test', partitionVals=[20170118] } 
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at 
> org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at 
> org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed 
> connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
>         at 
> org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:380)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
>         ... 6 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root 
> scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: 
> rw-------
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:540)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         ... 1 more
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on 
> HDFS should be writable. Current permissions are: rw-------
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
>         ... 13 more
> {code}
> Actual behavior:
> Exception regarding scratch dir permissions thrown once, meaningless 
> exceptions from code, which should be unreachable, are re-thrown again and 
> again, obfuscating the
> source of the problem to the user.
> exceptions thrown repeatedly:
> {code}
> java.lang.NullPointerException: Non-local session path expected to be non-null
>         at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> 2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 
> : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
> EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
> table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at 
> org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at 
> org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
> values=[20170118]. Unable to get path for end point: [20170118]
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
>         at 
> org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
>         at 
> org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
>         ... 6 more
> Caused by: NoSuchObjectException(message:partition values=[20170118])
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
>         at 
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
>         at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
>         ... 10 more
> {code}
> Detailed description of what is going on:
> Flume, as the Hive Streaming client, does the streaming in the HiveSink 
> class. The main part is done on line
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L253
> where one "batch" is drained (a batch in the Flume sense: a batch of incoming 
> messages from the channel).
> The main for loop for the batch drain is:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L282
> Flume creates a Hive endpoint for each line it tries to insert into Hive 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L290),
>  which is not very efficient, but the .equals in HiveEndPoint is properly 
> written, so everything works.
> Then it creates the helper HiveWriter 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L295),
>  which is cached, one for each HiveEndPoint. If no HiveWriter exists for the 
> endpoint, it is created on line
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L343
> Inspecting the constructor of HiveWriter brings us to the creation of a new 
> connection to Hive using the Streaming API:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
> The connection is created in a separate thread:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L376
> as a Future submitted 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L425)
> to the thread pool callTimeoutPool (the pool comes from HiveWriter 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L493
>  and is of constant size 1), which suggests that Flume uses 1 thread per Hive 
> Sink to talk to Hive.
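> To illustrate why a pool of constant size 1 matters here, a minimal sketch 
> follows. It assumes the pool behaves like Executors.newFixedThreadPool(1); 
> the class and method names are hypothetical and are not part of Flume or Hive.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of Flume or Hive.
class SingleThreadPoolDemo {
    // Returns true when two successive tasks ran on the same worker thread.
    static boolean sameThreadTwice() {
        // Assumption: the pool is created like this, with a fixed size of 1.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Thread> whoAmI = Thread::currentThread;
            Thread first = pool.submit(whoAmI).get();
            Thread second = pool.submit(whoAmI).get();
            return first == second;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same thread reused: " + sameThreadTwice());
    }
}
```

> Because every task lands on the same worker thread, any thread-local state 
> left behind by one task is visible to the next one.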
> When creating newConnection 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379),
> with the request of autoCreatePartitions=true, HiveEndPoint, the entry 
> point to Hive Streaming, is called: 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L105
> As I was testing non-authenticated, it boils down to 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L192
> and finally to 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L215
> The constructor of the inner private class ConnectionImpl then tries to 
> create the partition if it does not exist, on the line 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L318
> And the trouble starts in method createPartitionIfNotExists on line
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L455
> as SessionState.get() returns null: we have not started a session yet, so 
> we try to create a new one.
> In SessionState.start(), the first thing done is registering the object 
> itself as the thread-local variable:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
> Thereafter, an attempt is made to create the directories (scratchdir and subdirs):
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L548
> but if this fails, the RuntimeException (from 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L619
>  and 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L677)
>  is not caught in the catch blocks (nor is there any finally block).
> So basically, SessionState.start() has failed to initialize properly 
> (e.g. HDFS dirs are not created, nor is SessionState.hdfsSessionPath set 
> to non-null), and yet the thread-local variable stays set and execution 
> continues.
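> The failure mode described above can be reproduced in isolation. The sketch 
> below uses a hypothetical FakeSession class as a stand-in for SessionState 
> (it is not the real Hive code) and assumes a single-thread pool like the one 
> Flume uses: start() registers the thread-local first, then fails, and the 
> next task on the same thread still sees the half-initialized object.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for SessionState; not the real Hive class.
class FakeSession {
    private static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();
    String hdfsSessionPath; // stays null when directory creation fails

    static FakeSession get() {
        return CURRENT.get();
    }

    // Same order as described above: register first, then create directories.
    static void start(FakeSession s, boolean scratchDirWritable) {
        CURRENT.set(s);
        if (!scratchDirWritable) {
            throw new RuntimeException("The root scratch dir should be writable");
        }
        s.hdfsSessionPath = "/tmp/hive/session";
    }
}

class StaleSessionDemo {
    // Runs a failing start() on the single worker thread, then inspects that thread again.
    static String observe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<?> first = pool.submit(() -> FakeSession.start(new FakeSession(), false));
            try {
                first.get();
            } catch (Exception expected) {
                // start() failed, as in the scratch dir permission case
            }
            Future<String> second = pool.submit(() -> {
                FakeSession s = FakeSession.get();
                return s == null ? "no session" : "stale session, path=" + s.hdfsSessionPath;
            });
            return second.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```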
> With the RuntimeException thrown from the .start() method, the caller (HiveEndPoint) 
> propagates the exception back to the HiveWriter 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379
> The exception is caught 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L442
>  but it is handled only by logging it and carrying on: 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L456
> This is the moment this exception is logged:
> {code}
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on 
> HDFS should be writable. Current permissions are: rw-------
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
>         ... 13 more
> {code}
> What happens next? Flume retries the delivery, calling HiveSink.process, 
> which boils down to newConnection again. But Flume uses exactly the SAME 
> thread it used before to do this.
> This time, the if clause: 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L454
> returns true, as SessionState.get() returns the incorrectly initialized 
> SessionState from the previous attempt.
> Then it goes into 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L466
>  and down to 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L738
>  which fails on the null value of hdfsSessionPath in SessionState.
> But this RuntimeException (NullPointerException) is not caught by 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L470
>  and so it is logged:
> {code}
> 2017-01-18 12:39:44,194 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: 
> NullPointerException Non-local session path expected to be non-null
> java.lang.NullPointerException: Non-local session path expected to be non-null
>         at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Sometimes Flume manages to get past 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
>  because newConnection is created in a separate thread, and it then rushes into 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L89
>  producing another meaningless exception:
> {code}
> 2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 
> : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
> EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
> table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at 
> org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at 
> org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
> values=[20170118]. Unable to get path for end point: [20170118]
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
>         at 
> org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
>         at 
> org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
>         ... 6 more
> Caused by: NoSuchObjectException(message:partition values=[20170118])
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
>         at 
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
>         at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
>         ... 10 more
> {code}
> Proposed solution:
> If the Hive Streaming API is allowed to be used from the same thread again 
> (which it probably is), then the thread-local set in 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
> has to be unset in case of any exception in the subsequent blocks:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L539
> so set the thread-local back to null before rethrowing exceptions here:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L568
> and here:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L602
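> A minimal sketch of the proposed fix follows, again with a hypothetical 
> stand-in class (GuardedSession) rather than the real SessionState, and with 
> a made-up session path. The only point it shows is clearing the thread-local 
> before the exception is rethrown; it is not a patch against Hive.

```java
// Hypothetical stand-in for SessionState; not the real Hive class.
class GuardedSession {
    private static final ThreadLocal<GuardedSession> CURRENT = new ThreadLocal<>();
    String hdfsSessionPath;

    static GuardedSession get() {
        return CURRENT.get();
    }

    static void start(GuardedSession s, boolean scratchDirWritable) {
        CURRENT.set(s);
        try {
            if (!scratchDirWritable) {
                throw new RuntimeException("The root scratch dir should be writable");
            }
            s.hdfsSessionPath = "/tmp/hive/session"; // made-up path for illustration
        } catch (RuntimeException e) {
            // Undo the registration so that a reused thread does not see this object.
            CURRENT.remove();
            throw e;
        }
    }

    // Returns true when a failed start() leaves no session behind on this thread.
    static boolean clearedAfterFailure() {
        try {
            start(new GuardedSession(), false);
        } catch (RuntimeException expected) {
            // start() failed, which is the case under test
        }
        return get() == null;
    }

    public static void main(String[] args) {
        System.out.println("cleared after failure: " + clearedAfterFailure());
    }
}
```

> With this in place, the next call on the same thread sees get() == null and 
> retries start(), so the original scratch dir exception is reported again 
> instead of the NullPointerException.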
> The links to the source code point to the latest version, although I have 
> been testing on Hive 1.1.0. From the code, it seems that the
> bug must also be present in recent versions.
> If the Hive Streaming API is not allowed to be called from reused threads, 
> then not only Flume, but probably also the NiFi client 
> (https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java#L237)
>  has to be fixed (well, NiFi just copied and pasted the Flume codebase; is 
> there any other copy of this HiveWriter out there?).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)