Repository: hive
Updated Branches:
  refs/heads/master a2394c5bf -> 95a1538ae


HIVE-17098 : Race condition in Hbase tables (Oleksiy Sayankin via Zoltan Haindrich)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95a1538a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95a1538a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95a1538a

Branch: refs/heads/master
Commit: 95a1538aed11d0f027c5fe8ac9d99bde6f69501a
Parents: a2394c5
Author: Oleksiy Sayankin <oleksiy.sayan...@gmail.com>
Authored: Mon Jul 17 02:39:00 2017 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Tue Mar 27 08:45:35 2018 -0700

----------------------------------------------------------------------
 .../hive/hbase/HiveHBaseTableInputFormat.java   | 161 ++++++++-----------
 1 file changed, 63 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/95a1538a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 80c6485..069c9b9 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -19,13 +19,13 @@ package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -35,36 +35,18 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
-import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -87,71 +69,58 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     implements InputFormat<ImmutableBytesWritable, ResultWritable> {
 
   static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableInputFormat.class);
-  private static final Object hbaseTableMonitor = new Object();
-  private Connection conn = null;
+  private static final Object HBASE_TABLE_MONITOR = new Object();
 
-  @Override
-  public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
-    InputSplit split,
-    JobConf jobConf,
-    final Reporter reporter) throws IOException {
+  @Override public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(InputSplit split,
+      JobConf jobConf, final Reporter reporter) throws IOException {
 
     HBaseSplit hbaseSplit = (HBaseSplit) split;
     TableSplit tableSplit = hbaseSplit.getTableSplit();
 
-    if (conn == null) {
-      conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
-    }
-    initializeTable(conn, tableSplit.getTable());
-
-    setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
+    final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result> recordReader;
 
     Job job = new Job(jobConf);
-    TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
-        job.getConfiguration(), reporter);
+    TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(job.getConfiguration(), reporter);
 
-    final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result>
-    recordReader = createRecordReader(tableSplit, tac);
-    try {
-      recordReader.initialize(tableSplit, tac);
-    } catch (InterruptedException e) {
-      closeTable(); // Free up the HTable connections
-      if (conn != null) {
+    final Connection conn;
+
+    synchronized (HBASE_TABLE_MONITOR) {
+      conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
+      initializeTable(conn, tableSplit.getTable());
+      setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
+      recordReader = createRecordReader(tableSplit, tac);
+      try {
+        recordReader.initialize(tableSplit, tac);
+      } catch (InterruptedException e) {
+        closeTable(); // Free up the HTable connections
         conn.close();
-        conn = null;
+        throw new IOException("Failed to initialize RecordReader", e);
       }
-      throw new IOException("Failed to initialize RecordReader", e);
     }
 
     return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
 
-      @Override
-      public void close() throws IOException {
-        recordReader.close();
-        closeTable();
-        if (conn != null) {
+      @Override public void close() throws IOException {
+        synchronized (HBASE_TABLE_MONITOR) {
+          recordReader.close();
+          closeTable();
           conn.close();
-          conn = null;
         }
       }
 
-      @Override
-      public ImmutableBytesWritable createKey() {
+      @Override public ImmutableBytesWritable createKey() {
         return new ImmutableBytesWritable();
       }
 
-      @Override
-      public ResultWritable createValue() {
+      @Override public ResultWritable createValue() {
         return new ResultWritable(new Result());
       }
 
-      @Override
-      public long getPos() throws IOException {
+      @Override public long getPos() throws IOException {
        return 0;
      }
 
-      @Override
-      public float getProgress() throws IOException {
+      @Override public float getProgress() throws IOException {
        float progress = 0.0F;
 
        try {
@@ -163,8 +132,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
         return progress;
       }
 
-      @Override
-      public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
+      @Override public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
 
         boolean next = false;
 
@@ -195,8 +163,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
    *
    * @return converted table split if any
    */
-  private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
-    throws IOException {
+  private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary) throws IOException {
 
     // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
 
@@ -212,21 +179,19 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
       return scan;
     }
 
-    ExprNodeGenericFuncDesc filterExpr =
-        SerializationUtilities.deserializeExpression(filterExprSerialized);
+    ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
 
     String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
     ArrayList<TypeInfo> cols = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
     String colType = cols.get(iKey).getTypeName();
-    boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
+    boolean isKeyComparable = isKeyBinary || "string".equalsIgnoreCase(colType);
 
     String tsColName = null;
    if (iTimestamp >= 0) {
      tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
    }
 
-    IndexPredicateAnalyzer analyzer =
-        newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
+    IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
 
     List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
     ExprNodeDesc residualPredicate = analyzer.analyzePredicate(filterExpr, conditions);
@@ -263,8 +228,8 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
    *
    * @return preconfigured predicate analyzer
    */
-  static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
-    String keyColumnName, boolean isKeyComparable, String timestampColumn) {
+  static IndexPredicateAnalyzer newIndexPredicateAnalyzer(String keyColumnName, boolean isKeyComparable,
+      String timestampColumn) {
 
     IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
 
@@ -273,20 +238,17 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     // We can do other comparisons only if storage format in hbase is either binary
     // or we are dealing with string types since there lexicographic ordering will suffice.
     if (isKeyComparable) {
-      analyzer.addComparisonOp(keyColumnName,
-        "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+      analyzer.addComparisonOp(keyColumnName, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
     } else {
-      analyzer.addComparisonOp(keyColumnName,
-        "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
+      analyzer.addComparisonOp(keyColumnName, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
     }
 
     if (timestampColumn != null) {
-      analyzer.addComparisonOp(timestampColumn,
-        "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+      analyzer.addComparisonOp(timestampColumn, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
         "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
@@ -296,10 +258,22 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     return analyzer;
   }
 
-  @Override
-  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
-    synchronized (hbaseTableMonitor) {
-      return getSplitsInternal(jobConf, numSplits);
+  @Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
+    synchronized (HBASE_TABLE_MONITOR) {
+      final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      if (ugi == null) {
+        return getSplitsInternal(jobConf, numSplits);
+      }
+
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<InputSplit[]>() {
+          @Override public InputSplit[] run() throws IOException {
+            return getSplitsInternal(jobConf, numSplits);
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     }
   }
 
@@ -311,9 +285,9 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     }
 
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    if (conn == null) {
-      conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
-    }
+
+    Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
+
     TableName tableName = TableName.valueOf(hbaseTableName);
     initializeTable(conn, tableName);
 
@@ -342,8 +316,8 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
       // split per region, the implementation actually takes the scan
       // definition into account and excludes regions which don't satisfy
       // the start/stop row conditions (HBASE-1829).
-      Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
-          HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
+      Scan scan = createFilterScan(jobConf, iKey, iTimestamp, HiveHBaseInputFormatUtil
+          .getStorageFormatOfKey(keyMapping.mappingSpec,
               jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
 
       // The list of families that have been added to the scan
@@ -360,7 +334,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
           scan.addFamily(colMap.familyNameBytes);
           addedFamilies.add(colMap.familyName);
         } else {
-          if(!addedFamilies.contains(colMap.familyName)){
+          if (!addedFamilies.contains(colMap.familyName)) {
             // add the column only if the family has not already been added
             scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
           }
@@ -370,11 +344,10 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
 
       Job job = new Job(jobConf);
       JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
-      Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+      Path[] tablePaths = FileInputFormat.getInputPaths(jobContext);
 
-      List<org.apache.hadoop.mapreduce.InputSplit> splits =
-          super.getSplits(jobContext);
-      InputSplit [] results = new InputSplit[splits.size()];
+      List<org.apache.hadoop.mapreduce.InputSplit> splits = super.getSplits(jobContext);
+      InputSplit[] results = new InputSplit[splits.size()];
 
       for (int i = 0; i < splits.size(); i++) {
         results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
@@ -383,21 +356,13 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
       return results;
     } finally {
       closeTable();
-      if (conn != null) {
-        conn.close();
-        conn = null;
-      }
+      conn.close();
    }
  }
 
-  @Override
-  protected void finalize() throws Throwable {
+  @Override protected void finalize() throws Throwable {
    try {
      closeTable();
-      if (conn != null) {
-        conn.close();
-        conn = null;
-      }
    } finally {
      super.finalize();
    }
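
For context on the fix itself: the previous code lazily created a single Connection in the instance field conn behind an unsynchronized if (conn == null) check, and every record reader shared it. With several tasks running in one JVM, two threads could race through that check and leak a connection, and one reader's close() could close and null out the connection another reader was still scanning with; initializeTable() and setScan() also write state inherited from TableInputFormatBase without a lock. The patch removes the shared field: each record reader now creates and owns its own Connection, and every section that touches the inherited mutable state runs under the class-wide monitor HBASE_TABLE_MONITOR.

A minimal, self-contained sketch of that ownership-plus-monitor pattern (simplified: the nested Connection interface, connect() factory, and sharedScanState field below are stand-ins for the HBase client API and the inherited table/scan state, not code from the patch):

    import java.io.Closeable;
    import java.io.IOException;

    public class PerReaderConnectionSketch {

      // One monitor for the whole class, mirroring HBASE_TABLE_MONITOR.
      private static final Object MONITOR = new Object();

      // Stand-in for org.apache.hadoop.hbase.client.Connection.
      interface Connection extends Closeable {
      }

      // Stand-in for ConnectionFactory.createConnection(conf).
      private static Connection connect() {
        return new Connection() {
          @Override public void close() {
          }
        };
      }

      // Stand-in for the table/scan fields inherited from
      // TableInputFormatBase; only ever written while holding MONITOR.
      private String sharedScanState;

      public Closeable openReader(final String split) throws IOException {
        final Connection conn;
        synchronized (MONITOR) {
          conn = connect();        // each reader gets its own connection
          sharedScanState = split; // shared state is written inside the lock
        }
        return new Closeable() {
          @Override public void close() throws IOException {
            synchronized (MONITOR) {
              conn.close();        // the reader closes only what it owns
            }
          }
        };
      }
    }

Opening a connection per reader costs more than caching one, but it removes the cross-thread lifecycle coupling that caused the race; the monitor then only has to cover the short window in which the shared superclass state is written.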
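Separately, getSplits() now wraps the split computation in UserGroupInformation.doAs(), so when getCurrentUser() returns a login context the HBase metadata lookups run with the calling user's credentials (relevant on secured/Kerberized clusters). A hedged sketch of the same wrapping, where fetchSplits() is a hypothetical stand-in for getSplitsInternal():

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    public final class DoAsSketch {

      // Hypothetical stand-in for getSplitsInternal(jobConf, numSplits).
      static String[] fetchSplits() throws IOException {
        return new String[] { "split-0" };
      }

      public static String[] splitsAsCurrentUser() throws IOException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        if (ugi == null) {
          return fetchSplits(); // no user context; call directly
        }
        try {
          return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
            @Override public String[] run() throws IOException {
              return fetchSplits();
            }
          });
        } catch (InterruptedException e) {
          // doAs() declares InterruptedException; convert it to fit the
          // InputFormat.getSplits() contract, as the patch does.
          throw new IOException(e);
        }
      }
    }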