DRILL-754: Scan of HBase table timed out + RAT: Ignore "*.patch" files in the project root directory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cdd2ce90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cdd2ce90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cdd2ce90 Branch: refs/heads/master Commit: cdd2ce90578cca21974f343b87513d1fa0a33f38 Parents: cc99b97 Author: Aditya Kishore <[email protected]> Authored: Thu May 15 15:48:21 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 15 17:59:50 2014 -0700 ---------------------------------------------------------------------- .../exec/store/hbase/HBaseRecordReader.java | 89 ++++++++++---------- .../org/apache/drill/hbase/BaseHBaseTest.java | 6 +- .../org/apache/drill/hbase/HBaseTestsSuite.java | 1 + pom.xml | 1 + 4 files changed, 50 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 58285db..dbf8123 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -53,22 +53,30 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class); 
private static final int TARGET_RECORD_COUNT = 4000; private List<SchemaPath> columns; private OutputMutator outputMutator; - private ResultScanner resultScanner; - Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap; - private Result leftOver; + private Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap; private VarBinaryVector rowKeyVector; private SchemaPath rowKeySchemaPath; + private HTable hTable; + private ResultScanner resultScanner; + + private String hbaseTable; + private Scan hbaseScan; + private Configuration hbaseConf; + private Result leftOver; + public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException { - Scan scan= new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); + hbaseConf = conf; + hbaseTable = subScanSpec.getTableName(); + hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); boolean rowKeyOnly = true; if (projectedColumns != null && projectedColumns.size() != 0) { /* @@ -90,10 +98,10 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { PathSegment child = root.getChild(); if (child != null && child.isNamed()) { byte[] qualifier = child.getNameSegment().getPath().toString().getBytes(); - scan.addColumn(family, qualifier); + hbaseScan.addColumn(family, qualifier); } else { columnIterator.remove(); - scan.addFamily(family); + hbaseScan.addFamily(family); } } } else { @@ -103,33 +111,18 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { this.columns.add(rowKeySchemaPath); } - try { - scan.setFilter(subScanSpec.getScanFilter()); - if (rowKeyOnly) { - /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan - * to fetch only one KV from each row. If a filter is already part of this - * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL - * FilterList. 
- */ - scan.setFilter( - HBaseUtils.andFilterAtIndex(scan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()) - ); - } - scan.setCaching(TARGET_RECORD_COUNT); - - logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", - subScanSpec.getTableName(), conf.get(HConstants.ZOOKEEPER_QUORUM), - conf.get(HBASE_ZOOKEEPER_PORT), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - HTable table = new HTable(conf, subScanSpec.getTableName()); - resultScanner = table.getScanner(scan); - try { - table.close(); - } catch (IOException e) { - logger.warn("Failure while closing HBase table: " + subScanSpec.getTableName(), e); - } - } catch (IOException e1) { - throw new DrillRuntimeException(e1); + hbaseScan.setFilter(subScanSpec.getScanFilter()); + if (rowKeyOnly) { + /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan + * to fetch only one KV from each row. If a filter is already part of this + * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL + * FilterList. 
+ */ + hbaseScan.setFilter( + HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()) + ); } + hbaseScan.setCaching(TARGET_RECORD_COUNT); } @Override @@ -138,23 +131,24 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { output.removeAllFields(); vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>(); - // Add Vectors to output in the order specified when creating reader - for (SchemaPath column : columns) { - try { + try { + // Add Vectors to output in the order specified when creating reader + for (SchemaPath column : columns) { if (column.equals(rowKeySchemaPath)) { MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY)); rowKeyVector = output.addField(field, VarBinaryVector.class); } else if (column.getRootSegment().getChild() != null) { getOrCreateColumnVector(new FamilyQualifierWrapper(column), false); } - } catch (SchemaChangeException e) { - throw new ExecutionSetupException(e); } - } - - try { output.setNewSchema(); - } catch (SchemaChangeException e) { + + logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", + hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), + hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + hTable = new HTable(hbaseConf, hbaseTable); + resultScanner = hTable.getScanner(hbaseScan); + } catch (SchemaChangeException | IOException e) { throw new ExecutionSetupException(e); } } @@ -243,8 +237,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { @Override public void cleanup() { - if (resultScanner != null) { - resultScanner.close(); + try { + if (resultScanner != null) { + resultScanner.close(); + } + if (hTable != null) { + hTable.close(); + } + } catch (IOException e) { + logger.warn("Failure while closing HBase table: " + hbaseTable, e); } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index f170025..48193eb 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -55,10 +55,10 @@ public class BaseHBaseTest extends BaseTestQuery { @BeforeClass public static void setUpBeforeClass() throws Exception { /* - * Change the following to HBaseTestsSuite.configure(true, true) if you want - * to test against a mini HBase cluster running within unit test environment + * Change the following to HBaseTestsSuite.configure(false, false) + * if you want to test against an externally running HBase cluster. 
*/ - HBaseTestsSuite.configure(false, false); + HBaseTestsSuite.configure(true, true); HBaseTestsSuite.initCluster(); HBaseStoragePlugin plugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 22a76cb..3e91361 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -66,6 +66,7 @@ public class HBaseTestsSuite { synchronized (HBaseTestsSuite.class) { if (initCount.get() == 0) { conf = HBaseConfiguration.create(); + conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-unit-tests-client"); if (IS_DEBUG) { conf.set("hbase.regionserver.lease.period","10000000"); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0f748b1..d4c077a 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ <exclude>**/*.tdd</exclude> <exclude>**/*.project</exclude> <exclude>.*/**</exclude> + <exclude>*.patch</exclude> </excludes> </configuration> </plugin>
