DRILL-4199: Add Support for HBase 1.X Highlights of the changes:
* Replaced the old HBase APIs (HBaseAdmin/HTable) with the new HBase 1.1 APIs (Connection/Admin/Table). * Added HBaseConnectionManager class which manages the life-cycle of HBase connections inside a Drillbit process. * Updated HBase dependencies version to 1.1.3 and 1.1.1-mapr-1602-m7-5.1.0 for default and "mapr" profiles respectively. * Added `commons-logging` dependency in the `provided` scope to allow HBase test cluster to come up for Unit tests. * Relaxed banned dependency rule for `commons-logging` library for `storage-hbase` module alone, in provided scope only. * Removed the use of many deprecated APIs throughout the modules code. * Added some missing tests to HBase storage plugin's test suite. * Move the GuavaPatcher code to main code execution path. * Log a message if GuavaPatcher fails instead of exiting. All unit tests are green. Closes #443 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c2d9959e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c2d9959e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c2d9959e Branch: refs/heads/master Commit: c2d9959e07f47a09a4a11c250f84f4874b7e1db4 Parents: 6286c0a Author: Aditya Kishore <a...@apache.org> Authored: Sun Jun 12 16:28:52 2016 -0700 Committer: Aditya Kishore <a...@apache.org> Committed: Tue Jun 14 13:37:20 2016 -0700 ---------------------------------------------------------------------- contrib/storage-hbase/pom.xml | 30 ++ .../conv/OrderedBytesBigIntConvertFrom.java | 2 +- .../impl/conv/OrderedBytesBigIntConvertTo.java | 2 +- .../conv/OrderedBytesBigIntDescConvertTo.java | 2 +- .../conv/OrderedBytesDoubleConvertFrom.java | 2 +- .../impl/conv/OrderedBytesDoubleConvertTo.java | 2 +- .../conv/OrderedBytesDoubleDescConvertTo.java | 2 +- .../impl/conv/OrderedBytesFloatConvertFrom.java | 2 +- .../impl/conv/OrderedBytesFloatConvertTo.java | 2 +- .../conv/OrderedBytesFloatDescConvertTo.java | 2 +- 
.../impl/conv/OrderedBytesIntConvertFrom.java | 2 +- .../fn/impl/conv/OrderedBytesIntConvertTo.java | 2 +- .../impl/conv/OrderedBytesIntDescConvertTo.java | 2 +- .../store/hbase/CompareFunctionsProcessor.java | 10 +- .../drill/exec/store/hbase/DrillHBaseTable.java | 17 +- .../store/hbase/HBaseConnectionManager.java | 109 ++++++ .../exec/store/hbase/HBaseFilterBuilder.java | 10 +- .../drill/exec/store/hbase/HBaseGroupScan.java | 29 +- .../store/hbase/HBasePushFilterIntoScan.java | 9 +- .../exec/store/hbase/HBaseRecordReader.java | 26 +- .../exec/store/hbase/HBaseScanBatchCreator.java | 2 +- .../exec/store/hbase/HBaseSchemaFactory.java | 8 +- .../exec/store/hbase/HBaseStoragePlugin.java | 93 ++++- .../drill/exec/store/hbase/HBaseSubScan.java | 4 +- .../exec/store/hbase/TableStatsCalculator.java | 32 +- .../hbase/config/HBasePersistentStore.java | 59 +-- .../config/HBasePersistentStoreProvider.java | 46 +-- .../org/apache/drill/hbase/BaseHBaseTest.java | 15 +- .../org/apache/drill/hbase/GuavaPatcher.java | 90 ----- .../drill/hbase/HBaseRecordReaderTest.java | 6 +- .../org/apache/drill/hbase/HBaseTestsSuite.java | 92 +++-- .../drill/hbase/TestHBaseConnectionManager.java | 58 +++ .../drill/hbase/TestHBaseFilterPushDown.java | 1 - .../apache/drill/hbase/TestHBaseQueries.java | 52 ++- .../hbase/TestOrderedBytesConvertFunctions.java | 22 -- .../apache/drill/hbase/TestTableGenerator.java | 372 +++++++++---------- ...30StorageHBaseHamcrestConfigurationTest.java | 8 +- .../src/test/resources/logback.xml | 10 +- .../org/apache/drill/exec/server/Drillbit.java | 7 + .../apache/drill/exec/util/GuavaPatcher.java | 89 +++++ pom.xml | 10 +- 41 files changed, 806 insertions(+), 534 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/pom.xml 
b/contrib/storage-hbase/pom.xml index fecb5ba..be38dd8 100644 --- a/contrib/storage-hbase/pom.xml +++ b/contrib/storage-hbase/pom.xml @@ -77,6 +77,14 @@ <version>2.1.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>commons-logging</groupId> + <!-- Needed by HBase test cluster, set to provided + scope to avoid dependency propagation --> + <artifactId>commons-logging</artifactId> + <version>1.2</version> + <scope>provided</scope> + </dependency> </dependencies> <build> @@ -130,6 +138,28 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>avoid_bad_dependencies</id> + <phase>verify</phase> + <goals> + <goal>enforce</goal> + </goals> + <configuration combine.self="override"> + <rules> + <bannedDependencies> + <includes> + <!-- Needed by HBase test cluster --> + <include>commons-logging:commons-logging:*:jar:provided</include> + </includes> + </bannedDependencies> + </rules> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java index 3b8391d..fdef364 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesBigIntConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[9]; - br = new 
org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java index d012531..16dfa14 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesBigIntConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java index 463483c..1614524 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesBigIntDescConvertTo.java @@ -45,7 +45,7 @@ public class 
OrderedBytesBigIntDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java index b2ae268..6fbd046 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesDoubleConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java index d90b620..a0276ce 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java +++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesDoubleConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java index 944b1d1..29122c7 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesDoubleDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesDoubleDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(9); bytes = new byte[9]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java index 
a66e580..11cff0f 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesFloatConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java index e41469c..2aa37ff 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesFloatConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java index 5c40e79..318c9b3 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesFloatDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesFloatDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java index 6c15947..60a660a 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertFrom.java @@ -40,7 +40,7 @@ public class OrderedBytesIntConvertFrom implements DrillSimpleFunc { @Override public void setup() { bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java index d703318..63e1570 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesIntConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java index 6ed4fbf..0835328 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/OrderedBytesIntDescConvertTo.java @@ -45,7 +45,7 @@ public class OrderedBytesIntDescConvertTo implements DrillSimpleFunc { public void setup() { buffer = buffer.reallocIfNeeded(5); bytes = new byte[5]; - br = new org.apache.hadoop.hbase.util.SimplePositionedByteRange(); + br = new org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange(); } @Override 
http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java index 2527e8d..09e7be7 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java @@ -40,7 +40,7 @@ import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.hadoop.hbase.util.Order; import org.apache.hadoop.hbase.util.PositionedByteRange; -import org.apache.hadoop.hbase.util.SimplePositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.Filter; @@ -254,7 +254,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "DOUBLE_OBD": if (valueArg instanceof DoubleExpression) { bb = newByteBuf(9, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING); @@ -269,7 +269,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "FLOAT_OBD": if (valueArg instanceof FloatExpression) { bb = newByteBuf(5, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + PositionedByteRange br = new 
SimplePositionedMutableByteRange(bb.array(), 0, 5); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, ((FloatExpression)valueArg).getFloat(), Order.DESCENDING); @@ -284,7 +284,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "BIGINT_OBD": if (valueArg instanceof LongExpression) { bb = newByteBuf(9, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, ((LongExpression)valueArg).getLong(), Order.DESCENDING); @@ -299,7 +299,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr case "INT_OBD": if (valueArg instanceof IntExpression) { bb = newByteBuf(5, true); - PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5); if (encodingType.endsWith("_OBD")) { org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, ((IntExpression)valueArg).getInt(), Order.DESCENDING); http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java index e98aed2..b916ae7 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java @@ -21,24 +21,25 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Set; +import org.apache.calcite.rel.type.RelDataType; +import 
org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.sql.type.SqlTypeName; public class DrillHBaseTable extends DrillTable implements DrillHBaseConstants { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHBaseTable.class); - private HTableDescriptor table; + private HTableDescriptor tableDesc; public DrillHBaseTable(String storageEngineName, HBaseStoragePlugin plugin, HBaseScanSpec scanSpec) { super(storageEngineName, plugin, scanSpec); - try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().getHBaseConf())) { - table = admin.getTableDescriptor(HBaseUtils.getBytes(scanSpec.getTableName())); + try(Admin admin = plugin.getConnection().getAdmin()) { + tableDesc = admin.getTableDescriptor(TableName.valueOf(scanSpec.getTableName())); } catch (IOException e) { throw UserException.dataReadError() .message("Failure while loading table %s in database %s.", scanSpec.getTableName(), storageEngineName) @@ -55,7 +56,7 @@ public class DrillHBaseTable extends DrillTable implements DrillHBaseConstants { fieldNameList.add(ROW_KEY); typeList.add(typeFactory.createSqlType(SqlTypeName.ANY)); - Set<byte[]> families = table.getFamiliesKeys(); + Set<byte[]> families = tableDesc.getFamiliesKeys(); for (byte[] family : families) { fieldNameList.add(Bytes.toString(family)); typeList.add(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.ANY))); 
http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java new file mode 100644 index 0000000..2dd0673 --- /dev/null +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hbase; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.hbase.HBaseStoragePlugin.HBaseConnectionKey; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.UncheckedExecutionException; + +/** + * <p>A singleton class which manages the lifecycle of HBase connections.</p> + * <p>One connection per storage plugin instance is maintained.</p> + */ +public final class HBaseConnectionManager + extends CacheLoader<HBaseConnectionKey, Connection> implements RemovalListener<HBaseConnectionKey, Connection> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseConnectionManager.class); + + public static final HBaseConnectionManager INSTANCE = new HBaseConnectionManager(); + + private final LoadingCache<HBaseConnectionKey, Connection> connectionCache; + + private HBaseConnectionManager() { + this.connectionCache = CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.HOURS) // Connections will be closed after 1 hour of inactivity + .removalListener(this) + .build(this); + } + + private boolean isValid(Connection conn) { + return conn != null + && !conn.isAborted() + && !conn.isClosed(); + } + + @Override + public Connection load(HBaseConnectionKey key) throws Exception { + Connection connection = ConnectionFactory.createConnection(key.getHBaseConf()); + logger.info("HBase connection '{}' created.", connection); + return connection; + } + + @Override + public void 
onRemoval(RemovalNotification<HBaseConnectionKey, Connection> notification) { + try { + Connection conn = notification.getValue(); + if (isValid(conn)) { + conn.close(); + } + logger.info("HBase connection '{}' closed.", conn); + } catch (Throwable t) { + logger.warn("Error while closing HBase connection.", t); + } + } + + public Connection getConnection(HBaseConnectionKey key) { + checkNotNull(key); + try { + Connection conn = connectionCache.get(key); + if (!isValid(conn)) { + key.lock(); // invalidate the connection with a per storage plugin lock + try { + conn = connectionCache.get(key); + if (!isValid(conn)) { + connectionCache.invalidate(key); + conn = connectionCache.get(key); + } + } finally { + key.unlock(); + } + } + return conn; + } catch (ExecutionException | UncheckedExecutionException e) { + throw UserException.dataReadError(e.getCause()).build(logger); + } + } + + public void closeConnection(HBaseConnectionKey key) { + checkNotNull(key); + connectionCache.invalidate(key); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java index 0e25fa6..394e128 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.NullComparator; -import org.apache.hadoop.hbase.filter.PrefixFilter; import 
org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -332,20 +331,19 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, return null; } -private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call, - CompareFunctionsProcessor processor) { + private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call, CompareFunctionsProcessor processor) { byte[] startRow = processor.getRowKeyPrefixStartRow(); byte[] stopRow = processor.getRowKeyPrefixStopRow(); Filter filter = processor.getRowKeyPrefixFilter(); if (startRow != HConstants.EMPTY_START_ROW || - stopRow != HConstants.EMPTY_END_ROW || - filter != null) { + stopRow != HConstants.EMPTY_END_ROW || + filter != null) { return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter); } // else return null; -} + } } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index ae243b3..e474c11 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -33,7 +33,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import com.fasterxml.jackson.annotation.JsonCreator; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -49,11 +48,16 @@ import org.apache.drill.exec.store.StoragePluginRegistry; 
import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -146,28 +150,29 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst private void init() { logger.debug("Getting region locations"); - try { - HTable table = new HTable(storagePluginConfig.getHBaseConf(), hbaseScanSpec.getTableName()); - this.hTableDesc = table.getTableDescriptor(); - NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); - statsCalculator = new TableStatsCalculator(table, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig); + TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName()); + Connection conn = storagePlugin.getConnection(); + + try (Admin admin = conn.getAdmin(); + RegionLocator locator = conn.getRegionLocator(tableName)) { + this.hTableDesc = admin.getTableDescriptor(tableName); + List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); + statsCalculator = new TableStatsCalculator(conn, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig); boolean foundStartRegion = false; regionsToScan = new TreeMap<HRegionInfo, ServerName>(); - for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) { - 
HRegionInfo regionInfo = mapEntry.getKey(); + for (HRegionLocation regionLocation : regionLocations) { + HRegionInfo regionInfo = regionLocation.getRegionInfo(); if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) { continue; } foundStartRegion = true; - regionsToScan.put(regionInfo, mapEntry.getValue()); + regionsToScan.put(regionInfo, regionLocation.getServerName()); scanSizeInBytes += statsCalculator.getRegionSizeInBytes(regionInfo.getRegionName()); if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) { break; } } - - table.close(); } catch (IOException e) { throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java index a1cd9c9..172a547 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java @@ -18,21 +18,20 @@ package org.apache.drill.exec.store.hbase; +import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.planner.logical.DrillOptiq; import 
org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.FilterPrel; -import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.rex.RexNode; import com.google.common.collect.ImmutableList; @@ -95,7 +94,7 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule } // convert the filter to one that references the child of the project - final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project); + final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project); doPushFilterToScan(call, filter, project, scan, groupScan, condition); } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 0af0009..b3a7039 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -43,14 +43,13 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.complex.MapVector; -import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import com.google.common.base.Preconditions; @@ -67,20 +66,22 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas private Map<String, MapVector> familyVectorMap; private VarBinaryVector rowKeyVector; - private HTable hTable; + private Table hTable; private ResultScanner resultScanner; - private String hbaseTableName; + private TableName hbaseTableName; private Scan hbaseScan; - private Configuration hbaseConf; private OperatorContext operatorContext; private boolean rowKeyOnly; - public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec, + private final Connection connection; + + public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) { - hbaseConf = conf; - hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName(); + this.connection = connection; + hbaseTableName = TableName.valueOf( + Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName()); hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); hbaseScan .setFilter(subScanSpec.getScanFilter()) @@ -136,10 +137,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas familyVectorMap = new HashMap<String, MapVector>(); try { - logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode 
'{}'.", - hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), - hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - hTable = new HTable(hbaseConf, hbaseTableName); + hTable = connection.getTable(hbaseTableName); // Add top-level column-family map vectors to output in the order specified // when creating reader (order of first appearance in query). http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java index 7f779df..3a098fc 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java @@ -45,7 +45,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{ if ((columns = subScan.getColumns())==null) { columns = GroupScan.ALL_COLUMNS; } - readers.add(new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, columns, context)); + readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, context)); } catch (Exception e1) { throw new ExecutionSetupException(e1); } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java index 866a2dc..56dfc10 100644 --- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java @@ -21,15 +21,13 @@ import java.io.IOException; import java.util.Collections; import java.util.Set; -import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; - import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.SchemaFactory; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; @@ -79,11 +77,11 @@ public class HBaseSchemaFactory implements SchemaFactory { @Override public Set<String> getTableNames() { - try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().getHBaseConf())) { + try(Admin admin = plugin.getConnection().getAdmin()) { HTableDescriptor[] tables = admin.listTables(); Set<String> tableNames = Sets.newHashSet(); for (HTableDescriptor table : tables) { - tableNames.add(new String(table.getName())); + tableNames.add(new String(table.getTableName().getNameAsString())); } return tableNames; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java index 08fd8e1..81899cf 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java +++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java @@ -19,36 +19,39 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.apache.calcite.schema.SchemaPlus; - import org.apache.drill.common.JSONOptions; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; public class HBaseStoragePlugin extends AbstractStoragePlugin { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePlugin.class); + private static final HBaseConnectionManager hbaseConnectionManager = HBaseConnectionManager.INSTANCE; private final DrillbitContext context; - private final HBaseStoragePluginConfig engineConfig; + private final HBaseStoragePluginConfig storeConfig; private final HBaseSchemaFactory schemaFactory; + private final HBaseConnectionKey connectionKey; - @SuppressWarnings("unused") private final String name; - public HBaseStoragePlugin(HBaseStoragePluginConfig configuration, DrillbitContext context, String name) + public HBaseStoragePlugin(HBaseStoragePluginConfig storeConfig, DrillbitContext context, String name) throws IOException { this.context = context; this.schemaFactory = new HBaseSchemaFactory(this, name); - this.engineConfig = configuration; + this.storeConfig = storeConfig; this.name = name; + this.connectionKey = new HBaseConnectionKey(); } public DrillbitContext getContext() { @@ -73,11 +76,85 @@ public class HBaseStoragePlugin 
extends AbstractStoragePlugin { @Override public HBaseStoragePluginConfig getConfig() { - return engineConfig; + return storeConfig; } @Override public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) { return ImmutableSet.of(HBasePushFilterIntoScan.FILTER_ON_SCAN, HBasePushFilterIntoScan.FILTER_ON_PROJECT); } -} \ No newline at end of file + + @Override + public void close() throws Exception { + hbaseConnectionManager.closeConnection(connectionKey); + } + + public Connection getConnection() { + return hbaseConnectionManager.getConnection(connectionKey); + } + + /** + * An internal class which serves the key in a map of {@link HBaseStoragePlugin} => {@link Connection}. + */ + class HBaseConnectionKey { + + private final ReentrantLock lock = new ReentrantLock(); + + private HBaseConnectionKey() {} + + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); + } + + public Configuration getHBaseConf() { + return storeConfig.getHBaseConf(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((storeConfig == null) ? 
0 : storeConfig.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj == null) { + return false; + } else if (getClass() != obj.getClass()) { + return false; + } + + HBaseStoragePlugin other = ((HBaseConnectionKey) obj).getHBaseStoragePlugin(); + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + if (storeConfig == null) { + if (other.storeConfig != null) { + return false; + } + } else if (!storeConfig.equals(other.storeConfig)) { + return false; + } + return true; + } + + private HBaseStoragePlugin getHBaseStoragePlugin() { + return HBaseStoragePlugin.this; + } + + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index 08ea7f1..302ccca 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.hbase; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -39,7 +40,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; // Class containing information for reading a single HBase region @JsonTypeName("hbase-region-scan") @@ -111,7 +111,7 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan { @Override public Iterator<PhysicalOperator> iterator() { - return Iterators.emptyIterator(); + return Collections.emptyIterator(); } public static class HBaseSubScanSpec { http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java index bfd04fe..4e0365c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -29,15 +30,18 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import 
org.apache.hadoop.hbase.util.Bytes; /** @@ -64,14 +68,16 @@ public class TableStatsCalculator { /** * Computes size of each region for table. * - * @param table + * @param conn * @param hbaseScanSpec * @param config * @throws IOException */ - public TableStatsCalculator(HTable table, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException { - HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); - try { + public TableStatsCalculator(Connection conn, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException { + TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName()); + try (Admin admin = conn.getAdmin(); + Table table = conn.getTable(tableName); + RegionLocator locator = conn.getRegionLocator(tableName)) { int rowsToSample = rowsToSample(config); if (rowsToSample > 0) { Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow()); @@ -89,7 +95,7 @@ public class TableStatsCalculator { Cell[] cells = row.rawCells(); if (cells != null) { for (Cell cell : cells) { - rowSizeSum += CellUtil.estimatedSizeOf(cell); + rowSizeSum += CellUtil.estimatedSerializedSizeOf(cell); } } } @@ -105,13 +111,13 @@ public class TableStatsCalculator { return; } - logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); + logger.info("Calculating region sizes for table '{}'.", tableName.getNameAsString()); //get regions for table - Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet(); + List<HRegionLocation> tableRegionInfos = locator.getAllRegionLocations(); Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); - for (HRegionInfo regionInfo : tableRegionInfos) { - tableRegions.add(regionInfo.getRegionName()); + for (HRegionLocation regionInfo : tableRegionInfos) { + tableRegions.add(regionInfo.getRegionInfo().getRegionName()); } ClusterStatus clusterStatus = null; @@ -145,8 
+151,6 @@ public class TableStatsCalculator { } } logger.debug("Region sizes calculated"); - } finally { - admin.close(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index ac78eb0..2d329a8 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -25,36 +25,40 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.NoSuchElementException; -import com.google.common.collect.Iterators; -import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreMode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.collect.Iterators; + public class HBasePersistentStore<V> extends BasePersistentStore<V> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStore.class); private final PersistentStoreConfig<V> config; - private 
final HTableInterface table; + private final Table hbaseTable; + private final String hbaseTableName; + private final String tableName; private final byte[] tableNameStartKey; private final byte[] tableNameStopKey; - public HBasePersistentStore(PersistentStoreConfig<V> config, HTableInterface table) { + public HBasePersistentStore(PersistentStoreConfig<V> config, Table table) { this.tableName = config.getName() + '\0'; this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00" this.tableNameStopKey = this.tableNameStartKey.clone(); this.tableNameStopKey[tableNameStartKey.length-1] = 1; this.config = config; - this.table = table; + this.hbaseTable = table; + this.hbaseTableName = table.getName().getNameAsString(); } @Override @@ -71,13 +75,15 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { try { Get get = new Get(row(key)); get.addColumn(family, QUALIFIER); - Result r = table.get(get); + Result r = hbaseTable.get(get); if(r.isEmpty()){ return null; } return value(r); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while getting row '%s' from for table '%s'", key, hbaseTableName) + .build(logger); } } @@ -89,10 +95,12 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { protected synchronized void put(String key, byte[] family, V value) { try { Put put = new Put(row(key)); - put.add(family, QUALIFIER, bytes(value)); - table.put(put); + put.addColumn(family, QUALIFIER, bytes(value)); + hbaseTable.put(put); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) + .build(logger); } } @@ -100,10 +108,12 
@@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { public synchronized boolean putIfAbsent(String key, V value) { try { Put put = new Put(row(key)); - put.add(FAMILY, QUALIFIER, bytes(value)); - return table.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); + put.addColumn(FAMILY, QUALIFIER, bytes(value)); + return hbaseTable.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) + .build(logger); } } @@ -127,7 +137,7 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { try { return config.getSerializer().serialize(value); } catch (IOException e) { - throw new DrillRuntimeException(e); + throw UserException.dataReadError(e).build(logger); } } @@ -135,17 +145,18 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { try { return config.getSerializer().deserialize(result.value()); } catch (IOException e) { - throw new DrillRuntimeException(e); + throw UserException.dataReadError(e).build(logger); } } private void delete(byte[] row) { try { Delete del = new Delete(row); - table.delete(del); + hbaseTable.delete(del); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row) - + "' from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while deleting row '%s' from for table '%s'", Bytes.toStringBinary(row), hbaseTableName) + .build(logger); } } @@ -161,9 +172,11 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { scan.addColumn(FAMILY, QUALIFIER); scan.setCaching(Math.min(take, 100)); scan.setBatch(take); // set batch size - scanner = 
table.getScanner(scan); + scanner = hbaseTable.getScanner(scan); } catch (IOException e) { - throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while creating HBase scanner for table '%s'" + hbaseTableName) + .build(logger); } } @@ -175,7 +188,9 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { done = true; } } catch (IOException e) { - throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e); + throw UserException.dataReadError(e) + .message("Caught error while fetching rows from for table '%s'", hbaseTableName) + .build(logger); } } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java index 6e379c6..1dd44cd 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.store.hbase.config; import java.io.IOException; import java.util.Map; -import com.google.common.annotations.VisibleForTesting; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.store.hbase.DrillHBaseConstants; @@ -33,12 +32,15 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; 
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.annotations.VisibleForTesting; + public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class); @@ -46,13 +48,13 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { static final byte[] QUALIFIER = Bytes.toBytes("d"); - private final String storeTableName; + private final TableName hbaseTableName; private Configuration hbaseConf; - private HConnection connection; + private Connection connection; - private HTableInterface table; + private Table hbaseTable; public HBasePersistentStoreProvider(PersistentStoreRegistry registry) { @SuppressWarnings("unchecked") @@ -64,13 +66,13 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); } } - this.storeTableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE); + this.hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); } @VisibleForTesting public HBasePersistentStoreProvider(Configuration conf, String storeTableName) { this.hbaseConf = conf; - this.storeTableName = storeTableName; + this.hbaseTableName = TableName.valueOf(storeTableName); } @@ -80,7 
+82,7 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { switch(config.getMode()){ case BLOB_PERSISTENT: case PERSISTENT: - return new HBasePersistentStore<>(config, this.table); + return new HBasePersistentStore<>(config, this.hbaseTable); default: throw new IllegalStateException(); @@ -89,35 +91,33 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { @Override - @SuppressWarnings("deprecation") public void start() throws IOException { - this.connection = HConnectionManager.createConnection(hbaseConf); + this.connection = ConnectionFactory.createConnection(hbaseConf); - try(HBaseAdmin admin = new HBaseAdmin(connection)) { - if (!admin.tableExists(storeTableName)) { - HTableDescriptor desc = new HTableDescriptor(storeTableName); + try(Admin admin = connection.getAdmin()) { + if (!admin.tableExists(hbaseTableName)) { + HTableDescriptor desc = new HTableDescriptor(hbaseTableName); desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); admin.createTable(desc); } else { - HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName)); + HTableDescriptor desc = admin.getTableDescriptor(hbaseTableName); if (!desc.hasFamily(FAMILY)) { - throw new DrillRuntimeException("The HBase table " + storeTableName + throw new DrillRuntimeException("The HBase table " + hbaseTableName + " specified as persistent store exists but does not contain column family: " + (Bytes.toString(FAMILY))); } } } - this.table = connection.getTable(storeTableName); - this.table.setAutoFlush(true); + this.hbaseTable = connection.getTable(hbaseTableName); } @Override public synchronized void close() { - if (this.table != null) { + if (this.hbaseTable != null) { try { - this.table.close(); - this.table = null; + this.hbaseTable.close(); + this.hbaseTable = null; } catch (IOException e) { logger.warn("Caught exception while closing HBase table.", e); } 
http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index 0ccd8d6..5ce823c 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.hbase.HBaseStoragePlugin; import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig; +import org.apache.drill.exec.util.GuavaPatcher; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.junit.AfterClass; @@ -38,6 +39,10 @@ import com.google.common.io.Files; public class BaseHBaseTest extends BaseTestQuery { + static { + GuavaPatcher.patch(); + } + private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase"; protected static Configuration conf = HBaseConfiguration.create(); @@ -46,16 +51,13 @@ public class BaseHBaseTest extends BaseTestQuery { protected static HBaseStoragePluginConfig storagePluginConfig; - @BeforeClass public static void setupDefaultTestCluster() throws Exception { - GuavaPatcher.patch(); - /* * Change the following to HBaseTestsSuite.configure(false, true) * if you want to test against an externally running HBase cluster. 
*/ - HBaseTestsSuite.configure(true, true); + HBaseTestsSuite.configure(true /*manageHBaseCluster*/, true /*createTables*/); HBaseTestsSuite.initCluster(); BaseTestQuery.setupDefaultTestCluster(); @@ -66,7 +68,6 @@ public class BaseHBaseTest extends BaseTestQuery { storagePluginConfig.setEnabled(true); storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort()); pluginRegistry.createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, storagePluginConfig, true); - } @AfterClass @@ -105,9 +106,7 @@ public class BaseHBaseTest extends BaseTestQuery { } protected String canonizeHBaseSQL(String sql) { - return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1); + return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1.getNameAsString()); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java deleted file mode 100644 index 8f24da8..0000000 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/GuavaPatcher.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.hbase; - -import java.lang.reflect.Modifier; - -import javassist.ClassPool; -import javassist.CtClass; -import javassist.CtConstructor; -import javassist.CtMethod; -import javassist.CtNewMethod; - -import org.apache.drill.common.CatastrophicFailure; - -public class GuavaPatcher { - - private static boolean patched; - - public static synchronized void patch() { - if (!patched) { - try { - patchStopwatch(); - patchCloseables(); - patched = true; - } catch (Exception e) { - CatastrophicFailure.exit(e, "Unable to patch Guava classes.", -100); - } - } - } - - /** - * Makes Guava stopwatch look like the old version for compatibility with hbase-server (for test purposes). - */ - private static void patchStopwatch() throws Exception { - - ClassPool cp = ClassPool.getDefault(); - CtClass cc = cp.get("com.google.common.base.Stopwatch"); - - // Expose the constructor for Stopwatch for old libraries who use the pattern new Stopwatch().start(). - for (CtConstructor c : cc.getConstructors()) { - if (!Modifier.isStatic(c.getModifiers())) { - c.setModifiers(Modifier.PUBLIC); - } - } - - // Add back the Stopwatch.elapsedMillis() method for old consumers. - CtMethod newmethod = CtNewMethod.make( - "public long elapsedMillis() { return elapsed(java.util.concurrent.TimeUnit.MILLISECONDS); }", cc); - cc.addMethod(newmethod); - - // Load the modified class instead of the original. 
- cc.toClass(); - - System.out.println("Google's Stopwatch patched for old HBase Guava version."); - } - - private static void patchCloseables() throws Exception { - - ClassPool cp = ClassPool.getDefault(); - CtClass cc = cp.get("com.google.common.io.Closeables"); - - - // Add back the Closeables.closeQuietly() method for old consumers. - CtMethod newmethod = CtNewMethod.make( - "public static void closeQuietly(java.io.Closeable closeable) { try{closeable.close();}catch(Exception e){} }", - cc); - cc.addMethod(newmethod); - - // Load the modified class instead of the original. - cc.toClass(); - - System.out.println("Google's Closeables patched for old HBase Guava version."); - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java index 6414f8b..b297b41 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java @@ -24,19 +24,19 @@ public class HBaseRecordReaderTest extends BaseHBaseTest { @Test public void testLocalDistributed() throws Exception { String planName = "/hbase/hbase_scan_screen_physical.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 8); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 8); } @Test public void testLocalDistributedColumnSelect() throws Exception { String planName = "/hbase/hbase_scan_screen_physical_column_select.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 3); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 3); } @Test 
public void testLocalDistributedFamilySelect() throws Exception { String planName = "/hbase/hbase_scan_screen_physical_family_select.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 4); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 4); } } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 8f59b67..3dd3608 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -21,11 +21,16 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.exec.util.GuavaPatcher; +import org.apache.drill.hbase.test.Drill2130StorageHBaseHamcrestConfigurationTest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runner.RunWith; @@ -34,38 +39,46 @@ import org.junit.runners.Suite.SuiteClasses; @RunWith(Suite.class) @SuiteClasses({ - TestHBaseQueries.class, - TestHBaseRegexParser.class, + Drill2130StorageHBaseHamcrestConfigurationTest.class, HBaseRecordReaderTest.class, + TestHBaseCFAsJSONString.class, + 
TestHBaseConnectionManager.class, TestHBaseFilterPushDown.class, TestHBaseProjectPushDown.class, + TestHBaseQueries.class, + TestHBaseRegexParser.class, TestHBaseRegionScanAssignments.class, TestHBaseTableProvider.class, - TestHBaseCFAsJSONString.class + TestOrderedBytesConvertFunctions.class }) public class HBaseTestsSuite { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class); + static { + GuavaPatcher.patch(); + } + private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0; - protected static final String TEST_TABLE_1 = "TestTable1"; - protected static final String TEST_TABLE_3 = "TestTable3"; - protected static final String TEST_TABLE_COMPOSITE_DATE = "TestTableCompositeDate"; - protected static final String TEST_TABLE_COMPOSITE_TIME = "TestTableCompositeTime"; - protected static final String TEST_TABLE_COMPOSITE_INT = "TestTableCompositeInt"; - protected static final String TEST_TABLE_DOUBLE_OB = "TestTableDoubleOB"; - protected static final String TEST_TABLE_FLOAT_OB = "TestTableFloatOB"; - protected static final String TEST_TABLE_BIGINT_OB = "TestTableBigIntOB"; - protected static final String TEST_TABLE_INT_OB = "TestTableIntOB"; - protected static final String TEST_TABLE_DOUBLE_OB_DESC = "TestTableDoubleOBDesc"; - protected static final String TEST_TABLE_FLOAT_OB_DESC = "TestTableFloatOBDesc"; - protected static final String TEST_TABLE_BIGINT_OB_DESC = "TestTableBigIntOBDesc"; - protected static final String TEST_TABLE_INT_OB_DESC = "TestTableIntOBDesc"; - protected static final String TEST_TABLE_NULL_STR = "TestTableNullStr"; + protected static final TableName TEST_TABLE_1 = TableName.valueOf("TestTable1"); + protected static final TableName TEST_TABLE_3 = TableName.valueOf("TestTable3"); + protected static final TableName TEST_TABLE_COMPOSITE_DATE = TableName.valueOf("TestTableCompositeDate"); + protected static final TableName 
TEST_TABLE_COMPOSITE_TIME = TableName.valueOf("TestTableCompositeTime"); + protected static final TableName TEST_TABLE_COMPOSITE_INT = TableName.valueOf("TestTableCompositeInt"); + protected static final TableName TEST_TABLE_DOUBLE_OB = TableName.valueOf("TestTableDoubleOB"); + protected static final TableName TEST_TABLE_FLOAT_OB = TableName.valueOf("TestTableFloatOB"); + protected static final TableName TEST_TABLE_BIGINT_OB = TableName.valueOf("TestTableBigIntOB"); + protected static final TableName TEST_TABLE_INT_OB = TableName.valueOf("TestTableIntOB"); + protected static final TableName TEST_TABLE_DOUBLE_OB_DESC = TableName.valueOf("TestTableDoubleOBDesc"); + protected static final TableName TEST_TABLE_FLOAT_OB_DESC = TableName.valueOf("TestTableFloatOBDesc"); + protected static final TableName TEST_TABLE_BIGINT_OB_DESC = TableName.valueOf("TestTableBigIntOBDesc"); + protected static final TableName TEST_TABLE_INT_OB_DESC = TableName.valueOf("TestTableIntOBDesc"); + protected static final TableName TEST_TABLE_NULL_STR = TableName.valueOf("TestTableNullStr"); private static Configuration conf; - private static HBaseAdmin admin; + private static Connection conn; + private static Admin admin; private static HBaseTestingUtility UTIL; @@ -82,15 +95,13 @@ public class HBaseTestsSuite { @BeforeClass public static void initCluster() throws Exception { - GuavaPatcher.patch(); - if (initCount.get() == 0) { synchronized (HBaseTestsSuite.class) { if (initCount.get() == 0) { conf = HBaseConfiguration.create(); conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-unit-tests-client"); if (IS_DEBUG) { - conf.set("hbase.regionserver.lease.period","10000000"); + conf.set("hbase.client.scanner.timeout.period","10000000"); } if (manageHBaseCluster) { @@ -105,7 +116,8 @@ public class HBaseTestsSuite { logger.info("HBase mini cluster started. 
Zookeeper port: '{}'", getZookeeperPort()); } - admin = new HBaseAdmin(conf); + conn = ConnectionFactory.createConnection(conf); + admin = conn.getAdmin(); if (createTables || !tablesExist()) { createTestTables(); @@ -172,20 +184,20 @@ public class HBaseTestsSuite { * multiple fragments. Hence the number of regions in the HBase table is set to 1. * Will revert to multiple region once the issue is resolved. */ - TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 2); - TestTableGenerator.generateHBaseDataset3(admin, TEST_TABLE_3, 1); - TestTableGenerator.generateHBaseDatasetCompositeKeyDate(admin, TEST_TABLE_COMPOSITE_DATE, 1); - TestTableGenerator.generateHBaseDatasetCompositeKeyTime(admin, TEST_TABLE_COMPOSITE_TIME, 1); - TestTableGenerator.generateHBaseDatasetCompositeKeyInt(admin, TEST_TABLE_COMPOSITE_INT, 1); - TestTableGenerator.generateHBaseDatasetDoubleOB(admin, TEST_TABLE_DOUBLE_OB, 1); - TestTableGenerator.generateHBaseDatasetFloatOB(admin, TEST_TABLE_FLOAT_OB, 1); - TestTableGenerator.generateHBaseDatasetBigIntOB(admin, TEST_TABLE_BIGINT_OB, 1); - TestTableGenerator.generateHBaseDatasetIntOB(admin, TEST_TABLE_INT_OB, 1); - TestTableGenerator.generateHBaseDatasetDoubleOBDesc(admin, TEST_TABLE_DOUBLE_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetFloatOBDesc(admin, TEST_TABLE_FLOAT_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetBigIntOBDesc(admin, TEST_TABLE_BIGINT_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetIntOBDesc(admin, TEST_TABLE_INT_OB_DESC, 1); - TestTableGenerator.generateHBaseDatasetNullStr(admin, TEST_TABLE_NULL_STR, 1); + TestTableGenerator.generateHBaseDataset1(conn, admin, TEST_TABLE_1, 2); + TestTableGenerator.generateHBaseDataset3(conn, admin, TEST_TABLE_3, 1); + TestTableGenerator.generateHBaseDatasetCompositeKeyDate(conn, admin, TEST_TABLE_COMPOSITE_DATE, 1); + TestTableGenerator.generateHBaseDatasetCompositeKeyTime(conn, admin, TEST_TABLE_COMPOSITE_TIME, 1); + 
TestTableGenerator.generateHBaseDatasetCompositeKeyInt(conn, admin, TEST_TABLE_COMPOSITE_INT, 1); + TestTableGenerator.generateHBaseDatasetDoubleOB(conn, admin, TEST_TABLE_DOUBLE_OB, 1); + TestTableGenerator.generateHBaseDatasetFloatOB(conn, admin, TEST_TABLE_FLOAT_OB, 1); + TestTableGenerator.generateHBaseDatasetBigIntOB(conn, admin, TEST_TABLE_BIGINT_OB, 1); + TestTableGenerator.generateHBaseDatasetIntOB(conn, admin, TEST_TABLE_INT_OB, 1); + TestTableGenerator.generateHBaseDatasetDoubleOBDesc(conn, admin, TEST_TABLE_DOUBLE_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetFloatOBDesc(conn, admin, TEST_TABLE_FLOAT_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetBigIntOBDesc(conn, admin, TEST_TABLE_BIGINT_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetIntOBDesc(conn, admin, TEST_TABLE_INT_OB_DESC, 1); + TestTableGenerator.generateHBaseDatasetNullStr(conn, admin, TEST_TABLE_NULL_STR, 1); } private static void cleanupTestTables() throws IOException { @@ -228,8 +240,12 @@ public class HBaseTestsSuite { HBaseTestsSuite.createTables = createTables; } - public static HBaseAdmin getAdmin() { + public static Admin getAdmin() { return admin; } + public static Connection getConnection() { + return conn; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java new file mode 100644 index 0000000..9b3daf0 --- /dev/null +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.hbase; + +import org.junit.Test; + +public class TestHBaseConnectionManager extends BaseHBaseTest { + + @Test + public void testHBaseConnectionManager() throws Exception{ + setColumnWidth(8); + runHBaseSQLVerifyCount("SELECT\n" + + "row_key\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 8); + + /* + * Simulate HBase connection close and ensure that the connection + * will be reestablished automatically. + */ + storagePlugin.getConnection().close(); + runHBaseSQLVerifyCount("SELECT\n" + + "row_key\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 8); + + /* + * Simulate HBase cluster restart and ensure that running query against + * HBase does not require Drill cluster restart. 
+ */ + HBaseTestsSuite.getHBaseTestingUtility().shutdownMiniHBaseCluster(); + HBaseTestsSuite.getHBaseTestingUtility().restartHBaseCluster(1); + runHBaseSQLVerifyCount("SELECT\n" + + "row_key\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 8); + + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c2d9959e/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index 7ef7954..56386a0 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -18,7 +18,6 @@ package org.apache.drill.hbase; import org.apache.drill.PlanTestBase; -import org.junit.Ignore; import org.junit.Test; public class TestHBaseFilterPushDown extends BaseHBaseTest {