DRILL-754: Scan of HBase table timed out

+ RAT: Ignore "*.patch" file in the project root directory.
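
The patch below moves the HTable/ResultScanner setup out of the record reader's constructor and stops closing the HTable immediately after the scanner is created: both are now held as fields and closed together in cleanup(). What follows is a minimal sketch of that lifecycle, assuming the HBase 0.94-era client API in use at the time; the class name and method signatures other than the HBase client types are hypothetical, and Drill's RecordReader/OutputMutator plumbing is omitted.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

// Sketch only: illustrates the open-in-setup / close-in-cleanup pattern adopted by the patch.
public class ScannerLifecycleSketch {
  private final Configuration hbaseConf;
  private final String hbaseTable;
  private final Scan hbaseScan;

  private HTable hTable;               // kept open for the life of the scan
  private ResultScanner resultScanner;

  public ScannerLifecycleSketch(Configuration conf, String tableName, byte[] start, byte[] stop) {
    // The constructor only records what to scan; no HBase RPCs are issued here.
    this.hbaseConf = conf;
    this.hbaseTable = tableName;
    this.hbaseScan = new Scan(start, stop);
  }

  public void setup() throws IOException {
    // Open the table and scanner lazily, and keep the table open so the
    // scanner remains usable until cleanup() is called.
    hTable = new HTable(hbaseConf, hbaseTable);
    resultScanner = hTable.getScanner(hbaseScan);
  }

  public void cleanup() throws IOException {
    // Close the scanner first, then the table it was obtained from.
    if (resultScanner != null) {
      resultScanner.close();
    }
    if (hTable != null) {
      hTable.close();
    }
  }
}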


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cdd2ce90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cdd2ce90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cdd2ce90

Branch: refs/heads/master
Commit: cdd2ce90578cca21974f343b87513d1fa0a33f38
Parents: cc99b97
Author: Aditya Kishore <[email protected]>
Authored: Thu May 15 15:48:21 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu May 15 17:59:50 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 89 ++++++++++----------
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  6 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java |  1 +
 pom.xml                                         |  1 +
 4 files changed, 50 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 58285db..dbf8123 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -53,22 +53,30 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
 public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
 
   private List<SchemaPath> columns;
   private OutputMutator outputMutator;
 
-  private ResultScanner resultScanner;
-  Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
-  private Result leftOver;
+  private Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
 
+  private HTable hTable;
+  private ResultScanner resultScanner;
+
+  private String hbaseTable;
+  private Scan hbaseScan;
+  private Configuration hbaseConf;
+  private Result leftOver;
+
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
-    Scan scan= new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
+    hbaseConf = conf;
+    hbaseTable = subScanSpec.getTableName();
+    hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
     boolean rowKeyOnly = true;
     if (projectedColumns != null && projectedColumns.size() != 0) {
       /*
@@ -90,10 +98,10 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
         PathSegment child = root.getChild();
         if (child != null && child.isNamed()) {
           byte[] qualifier = child.getNameSegment().getPath().toString().getBytes();
-          scan.addColumn(family, qualifier);
+          hbaseScan.addColumn(family, qualifier);
         } else {
           columnIterator.remove();
-          scan.addFamily(family);
+          hbaseScan.addFamily(family);
         }
       }
     } else {
@@ -103,33 +111,18 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       this.columns.add(rowKeySchemaPath);
     }
 
-    try {
-      scan.setFilter(subScanSpec.getScanFilter());
-      if (rowKeyOnly) {
-        /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
-         * to fetch only one KV from each row. If a filter is already part of this
-         * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
-         * FilterList.
-         */
-        scan.setFilter(
-            HBaseUtils.andFilterAtIndex(scan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())
-        );
-      }
-      scan.setCaching(TARGET_RECORD_COUNT);
-
-      logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
-                   subScanSpec.getTableName(), conf.get(HConstants.ZOOKEEPER_QUORUM),
-                   conf.get(HBASE_ZOOKEEPER_PORT), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-      HTable table = new HTable(conf, subScanSpec.getTableName());
-      resultScanner = table.getScanner(scan);
-      try {
-        table.close();
-      } catch (IOException e) {
-        logger.warn("Failure while closing HBase table: " + subScanSpec.getTableName(), e);
-      }
-    } catch (IOException e1) {
-      throw new DrillRuntimeException(e1);
+    hbaseScan.setFilter(subScanSpec.getScanFilter());
+    if (rowKeyOnly) {
+      /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
+       * to fetch only one KV from each row. If a filter is already part of this
+       * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
+       * FilterList.
+       */
+      hbaseScan.setFilter(
+          HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())
+          );
     }
+    hbaseScan.setCaching(TARGET_RECORD_COUNT);
   }
 
   @Override
@@ -138,23 +131,24 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     output.removeAllFields();
     vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
 
-    // Add Vectors to output in the order specified when creating reader
-    for (SchemaPath column : columns) {
-      try {
+    try {
+      // Add Vectors to output in the order specified when creating reader
+      for (SchemaPath column : columns) {
         if (column.equals(rowKeySchemaPath)) {
           MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
           rowKeyVector = output.addField(field, VarBinaryVector.class);
         } else if (column.getRootSegment().getChild() != null) {
           getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
         }
-      } catch (SchemaChangeException e) {
-        throw new ExecutionSetupException(e);
       }
-    }
-
-    try {
       output.setNewSchema();
-    } catch (SchemaChangeException e) {
+
+      logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
+          hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
+          hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      hTable = new HTable(hbaseConf, hbaseTable);
+      resultScanner = hTable.getScanner(hbaseScan);
+    } catch (SchemaChangeException | IOException e) {
       throw new ExecutionSetupException(e);
     }
   }
@@ -243,8 +237,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
 
   @Override
   public void cleanup() {
-    if (resultScanner != null) {
-      resultScanner.close();
+    try {
+      if (resultScanner != null) {
+        resultScanner.close();
+      }
+      if (hTable != null) {
+        hTable.close();
+      }
+    } catch (IOException e) {
+      logger.warn("Failure while closing HBase table: " + hbaseTable, e);
     }
   }
 

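The row-key-only optimization retained above composes the user's scan filter with a FirstKeyOnlyFilter via HBaseUtils.andFilterAtIndex, so that only one KeyValue per matching row is fetched. The snippet below is a rough illustration of that composition using the stock HBase FilterList API rather than Drill's actual helper; the restrictToRowKeys method is hypothetical.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class RowKeyOnlyScanSketch {

  // Append a FirstKeyOnlyFilter after any existing filter in a MUST_PASS_ALL list:
  // the existing filter still decides which rows match, while FirstKeyOnlyFilter
  // limits each matching row to a single KeyValue.
  static Scan restrictToRowKeys(Scan scan) {
    Filter existing = scan.getFilter();
    if (existing == null) {
      scan.setFilter(new FirstKeyOnlyFilter());
    } else {
      FilterList all = new FilterList(FilterList.Operator.MUST_PASS_ALL);
      all.addFilter(existing);
      all.addFilter(new FirstKeyOnlyFilter());  // added last, mirroring HBaseUtils.LAST_FILTER
      scan.setFilter(all);
    }
    return scan;
  }
}
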
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index f170025..48193eb 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -55,10 +55,10 @@ public class BaseHBaseTest extends BaseTestQuery {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     /*
-     * Change the following to HBaseTestsSuite.configure(true, true) if you want
-     * to test against a mini HBase cluster running within unit test environment
+     * Change the following to HBaseTestsSuite.configure(false, false)
+     * if you want to test against an externally running HBase cluster.
      */
-    HBaseTestsSuite.configure(false, false);
+    HBaseTestsSuite.configure(true, true);
 
     HBaseTestsSuite.initCluster();
     HBaseStoragePlugin plugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 22a76cb..3e91361 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -66,6 +66,7 @@ public class HBaseTestsSuite {
       synchronized (HBaseTestsSuite.class) {
         if (initCount.get() == 0) {
           conf = HBaseConfiguration.create();
+          conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-unit-tests-client");
           if (IS_DEBUG) {
             conf.set("hbase.regionserver.lease.period","10000000");
           }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdd2ce90/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0f748b1..d4c077a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,7 @@
             <exclude>**/*.tdd</exclude>
             <exclude>**/*.project</exclude>
             <exclude>.*/**</exclude>
+            <exclude>*.patch</exclude>
           </excludes>
         </configuration>
       </plugin>
