DRILL-5896: Handle HBase columns vector creation in the HBaseRecordReader

closes #1005

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dfd43d02
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dfd43d02
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dfd43d02

Branch: refs/heads/master
Commit: dfd43d020498c09dcb2c3fed4e8c6df23d755d55
Parents: 59c7447
Author: Prasad Nagaraj Subramanya <prasadn...@gmail.com>
Authored: Wed Oct 25 21:20:07 2017 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Mon Nov 13 11:06:20 2017 +0200

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java | 34 +++++++++++++-------
 1 file changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/dfd43d02/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 4e822df..631c44d 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -75,6 +75,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
 
   private TableName hbaseTableName;
   private Scan hbaseScan;
+  // scan instance to capture columns for vector creation
+  private Scan hbaseScanColumnsOnly;
+  private Set<String> completeFamilies;
   private OperatorContext operatorContext;
 
   private boolean rowKeyOnly;
@@ -87,6 +90,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     hbaseTableName = TableName.valueOf(
         Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName());
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
+    hbaseScanColumnsOnly = new Scan();
     hbaseScan
         .setFilter(subScanSpec.getScanFilter())
         .setCaching(TARGET_RECORD_COUNT);
@@ -108,7 +112,8 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
 
   @Override
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
-    Set<String> completeFamilies = Sets.newHashSet();
+    completeFamilies = Sets.newHashSet();
+    rowKeyOnly = true;
     if (!isStarQuery()) {
       for (SchemaPath column : columns) {
@@ -121,16 +126,18 @@
         byte[] family = root.getPath().getBytes();
         transformed.add(SchemaPath.getSimplePath(root.getPath()));
         PathSegment child = root.getChild();
-        if (!completeFamilies.contains(new String(family, StandardCharsets.UTF_8).toLowerCase())) {
-          if (child != null && child.isNamed()) {
-            byte[] qualifier = child.getNameSegment().getPath().getBytes();
+        if (child != null && child.isNamed()) {
+          byte[] qualifier = child.getNameSegment().getPath().getBytes();
+          hbaseScanColumnsOnly.addColumn(family, qualifier);
+          if (!completeFamilies.contains(root.getPath())) {
             hbaseScan.addColumn(family, qualifier);
-          } else {
-            hbaseScan.addFamily(family);
-            completeFamilies.add(new String(family, StandardCharsets.UTF_8).toLowerCase());
           }
+        } else {
+          hbaseScan.addFamily(family);
+          completeFamilies.add(root.getPath());
         }
       }
+
       /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
        * to fetch only one KV from each row. If a filter is already part of this
        * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
@@ -168,11 +175,10 @@
         }
       }
 
-      // Add map and child vectors for any HBase column families and/or HBase
-      // columns that are requested (in order to avoid later creation of dummy
-      // NullableIntVectors for them).
+      // Add map and child vectors for any HBase columns that are requested (in
+      // order to avoid later creation of dummy NullableIntVectors for them).
      final Set<Map.Entry<byte[], NavigableSet<byte []>>> familiesEntries =
-          hbaseScan.getFamilyMap().entrySet();
+          hbaseScanColumnsOnly.getFamilyMap().entrySet();
      for (Map.Entry<byte[], NavigableSet<byte []>> familyEntry : familiesEntries) {
        final String familyName = new String(familyEntry.getKey(),
            StandardCharsets.UTF_8);
@@ -186,6 +192,12 @@
          }
        }
      }
+
+      // Add map vectors for any HBase column families that are requested.
+      for (String familyName : completeFamilies) {
+        getOrCreateFamilyVector(familyName, false);
+      }
+
      resultScanner = hTable.getScanner(hbaseScan);
    } catch (SchemaChangeException | IOException e) {
      throw new ExecutionSetupException(e);
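----------------------------------------------------------------------
Note on the separate columns-only scan: one way to see why setup() can no longer
read requested qualifiers out of hbaseScan.getFamilyMap() is that the HBase client's
Scan.addFamily() overrides earlier Scan.addColumn() calls for that family, and the
patch also skips addColumn() on hbaseScan for families already in completeFamilies.
The minimal sketch below illustrates that Scan behavior; the class name FamilyMapDemo
and the sample family/qualifier names are made up for illustration, while addColumn,
addFamily, and getFamilyMap are the standard org.apache.hadoop.hbase.client.Scan API.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.NavigableSet;

import org.apache.hadoop.hbase.client.Scan;

public class FamilyMapDemo {

  public static void main(String[] args) {
    byte[] family = "f".getBytes(StandardCharsets.UTF_8);
    byte[] qualifier = "c1".getBytes(StandardCharsets.UTF_8);

    // Scan sent to the region server: requesting the whole family overrides the
    // earlier per-column request, so the qualifier set for "f" becomes null.
    Scan serverScan = new Scan();
    serverScan.addColumn(family, qualifier);
    serverScan.addFamily(family);

    // Columns-only scan, mirroring hbaseScanColumnsOnly in the patch: it records
    // only explicit column requests, so the qualifiers stay visible for vector creation.
    Scan columnsOnly = new Scan();
    columnsOnly.addColumn(family, qualifier);

    print("serverScan", serverScan.getFamilyMap());
    print("columnsOnly", columnsOnly.getFamilyMap());
  }

  private static void print(String label, Map<byte[], NavigableSet<byte[]>> familyMap) {
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
      String familyName = new String(entry.getKey(), StandardCharsets.UTF_8);
      NavigableSet<byte[]> qualifiers = entry.getValue();
      System.out.println(label + ": family=" + familyName
          + ", qualifiers=" + (qualifiers == null ? "ALL (null)" : qualifiers.size()));
    }
  }
}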