Port DRILL-3492 and DRILL-3364 to maprdb plugin + Fix build issues due to API changes in DRILL-3535
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/df19a019 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/df19a019 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/df19a019 Branch: refs/heads/master Commit: df19a019117db3448a01f34daf804df747ca94b8 Parents: ba8a785 Author: Smidth Panchamia <spancha...@mapr.com> Authored: Thu Aug 6 15:50:44 2015 -0700 Committer: Aditya Kishore <a...@apache.org> Committed: Fri Sep 9 10:08:28 2016 -0700 ---------------------------------------------------------------------- .../store/maprdb/CompareFunctionsProcessor.java | 421 ++++++++++++++++--- .../exec/store/maprdb/MapRDBFilterBuilder.java | 79 +++- .../exec/store/maprdb/MapRDBFormatMatcher.java | 14 +- .../exec/store/maprdb/MapRDBFormatPlugin.java | 2 +- .../store/maprdb/MapRDBPushFilterIntoScan.java | 130 ++++-- 5 files changed, 529 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java index de8e080..c6c2504 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.maprdb; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import org.apache.drill.common.expression.CastExpression; @@ -35,7 +36,16 @@ import org.apache.drill.common.expression.ValueExpressions.IntExpression; import org.apache.drill.common.expression.ValueExpressions.LongExpression; import org.apache.drill.common.expression.ValueExpressions.QuotedString; import org.apache.drill.common.expression.ValueExpressions.TimeExpression; +import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.hadoop.hbase.util.Order; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; + +import org.apache.drill.exec.store.hbase.DrillHBaseConstants; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.PrefixFilter; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; @@ -47,6 +57,15 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr private boolean isEqualityFn; private SchemaPath path; private String functionName; + private boolean sortOrderAscending; + + // Fields for row-key prefix comparison + // If the query is on row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter + // Hence, we use these local variables(set depending upon the encoding type in user query) + private boolean isRowKeyPrefixComparison; + byte[] rowKeyPrefixStartRow; + byte[] rowKeyPrefixStopRow; + Filter rowKeyPrefixFilter; public static boolean isCompareFunction(String functionName) { return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName); @@ -79,6 +98,8 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr this.functionName = functionName; this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName) && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName); + this.isRowKeyPrefixComparison = false; + this.sortOrderAscending = true; } public byte[] getValue() { @@ -97,6 +118,26 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr return functionName; } + public boolean isRowKeyPrefixComparison() { + return isRowKeyPrefixComparison; + } + + public byte[] getRowKeyPrefixStartRow() { + return rowKeyPrefixStartRow; + } + + public byte[] getRowKeyPrefixStopRow() { + return rowKeyPrefixStopRow; + } + + public Filter getRowKeyPrefixFilter() { + return rowKeyPrefixFilter; + } + + public boolean isSortOrderAscending() { + return sortOrderAscending; + } + @Override public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException { if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) { @@ -107,75 +148,341 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr @Override public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException { - if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM && e.getInput() instanceof SchemaPath) { - ByteBuf bb = null; + if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) { + String encodingType = e.getEncodingType(); - switch (encodingType) { - case "INT_BE": - case "INT": - case "UINT_BE": - case "UINT": - case "UINT4_BE": - case "UINT4": - if (valueArg instanceof IntExpression - && (isEqualityFn || encodingType.startsWith("U"))) { - bb = newByteBuf(4, encodingType.endsWith("_BE")); - bb.writeInt(((IntExpression)valueArg).getInt()); + int prefixLength = 0; + + // Handle scan pruning in the following scenario: + // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is + // querying for the first few bytes of the row-key(start-offset 1) + // Example WHERE clause: + // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17' + if (e.getInput() instanceof FunctionCall) { + + // We can prune scan range only for big-endian encoded data + if (encodingType.endsWith("_BE") == false) { + return false; } - break; - case "BIGINT_BE": - case "BIGINT": - case "UINT8_BE": - case "UINT8": - if (valueArg instanceof LongExpression - && (isEqualityFn || encodingType.startsWith("U"))) { - bb = newByteBuf(8, encodingType.endsWith("_BE")); - bb.writeLong(((LongExpression)valueArg).getLong()); + + FunctionCall call = (FunctionCall)e.getInput(); + String functionName = call.getName(); + if (!functionName.equalsIgnoreCase("byte_substr")) { + return false; } - break; - case "FLOAT": - if (valueArg instanceof FloatExpression && isEqualityFn) { - bb = newByteBuf(4, true); - bb.writeFloat(((FloatExpression)valueArg).getFloat()); + + LogicalExpression nameArg = call.args.get(0); + LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null; + LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null; + + if (((nameArg instanceof SchemaPath) == false) || + (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) || + (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) { + return false; } - break; - case "DOUBLE": - if (valueArg instanceof DoubleExpression && isEqualityFn) { - bb = newByteBuf(8, true); - bb.writeDouble(((DoubleExpression)valueArg).getDouble()); + + boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY); + int offset = ((IntExpression)valueArg1).getInt(); + + if (!isRowKey || (offset != 1)) { + return false; + } + + this.path = (SchemaPath)nameArg; + prefixLength = ((IntExpression)valueArg2).getInt(); + this.isRowKeyPrefixComparison = true; + return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg); + } + + if (e.getInput() instanceof SchemaPath) { + ByteBuf bb = null; + + switch (encodingType) { + case "INT_BE": + case "INT": + case "UINT_BE": + case "UINT": + case "UINT4_BE": + case "UINT4": + if (valueArg instanceof IntExpression + && (isEqualityFn || encodingType.startsWith("U"))) { + bb = newByteBuf(4, encodingType.endsWith("_BE")); + bb.writeInt(((IntExpression)valueArg).getInt()); + } + break; + case "BIGINT_BE": + case "BIGINT": + case "UINT8_BE": + case "UINT8": + if (valueArg instanceof LongExpression + && (isEqualityFn || encodingType.startsWith("U"))) { + bb = newByteBuf(8, encodingType.endsWith("_BE")); + bb.writeLong(((LongExpression)valueArg).getLong()); + } + break; + case "FLOAT": + if (valueArg instanceof FloatExpression && isEqualityFn) { + bb = newByteBuf(4, true); + bb.writeFloat(((FloatExpression)valueArg).getFloat()); + } + break; + case "DOUBLE": + if (valueArg instanceof DoubleExpression && isEqualityFn) { + bb = newByteBuf(8, true); + bb.writeDouble(((DoubleExpression)valueArg).getDouble()); + } + break; + case "TIME_EPOCH": + case "TIME_EPOCH_BE": + if (valueArg instanceof TimeExpression) { + bb = newByteBuf(8, encodingType.endsWith("_BE")); + bb.writeLong(((TimeExpression)valueArg).getTime()); + } + break; + case "DATE_EPOCH": + case "DATE_EPOCH_BE": + if (valueArg instanceof DateExpression) { + bb = newByteBuf(8, encodingType.endsWith("_BE")); + bb.writeLong(((DateExpression)valueArg).getDate()); + } + break; + case "BOOLEAN_BYTE": + if (valueArg instanceof BooleanExpression) { + bb = newByteBuf(1, false /* does not matter */); + bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0); + } + break; + case "DOUBLE_OB": + case "DOUBLE_OBD": + if (valueArg instanceof DoubleExpression) { + bb = newByteBuf(9, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, + ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, + ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING); + } + } + break; + case "FLOAT_OB": + case "FLOAT_OBD": + if (valueArg instanceof FloatExpression) { + bb = newByteBuf(5, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, + ((FloatExpression)valueArg).getFloat(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, + ((FloatExpression)valueArg).getFloat(), Order.ASCENDING); + } + } + break; + case "BIGINT_OB": + case "BIGINT_OBD": + if (valueArg instanceof LongExpression) { + bb = newByteBuf(9, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, + ((LongExpression)valueArg).getLong(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, + ((LongExpression)valueArg).getLong(), Order.ASCENDING); + } + } + break; + case "INT_OB": + case "INT_OBD": + if (valueArg instanceof IntExpression) { + bb = newByteBuf(5, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, + ((IntExpression)valueArg).getInt(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, + ((IntExpression)valueArg).getInt(), Order.ASCENDING); + } + } + break; + case "UTF8_OB": + case "UTF8_OBD": + if (valueArg instanceof QuotedString) { + int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length; + bb = newByteBuf(stringLen + 2, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br, + ((QuotedString)valueArg).value, Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br, + ((QuotedString)valueArg).value, Order.ASCENDING); + } + } + break; + case "UTF8": + // let visitSchemaPath() handle this. + return e.getInput().accept(this, valueArg); + } + + if (bb != null) { + this.value = bb.array(); + this.path = (SchemaPath)e.getInput(); + return true; } - break; - case "TIME_EPOCH": - case "TIME_EPOCH_BE": - if (valueArg instanceof TimeExpression) { - bb = newByteBuf(8, encodingType.endsWith("_BE")); - bb.writeLong(((TimeExpression)valueArg).getTime()); + } + } + return false; + } + + private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e, + int prefixLength, LogicalExpression valueArg) { + String encodingType = e.getEncodingType(); + rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW; + rowKeyPrefixStopRow = HConstants.EMPTY_START_ROW; + rowKeyPrefixFilter = null; + + if ((encodingType.compareTo("UINT4_BE") == 0) || + (encodingType.compareTo("UINT_BE") == 0)) { + if (prefixLength != 4) { + throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix"); + } + + int val; + if ((valueArg instanceof IntExpression) == false) { + return false; + } + + val = ((IntExpression)valueArg).getInt(); + + // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >= + switch (functionName) { + case "equal": + rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array()); + rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array(); + rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array(); + return true; + case "greater_than_or_equal_to": + rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array(); + return true; + case "greater_than": + rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array(); + return true; + case "less_than_or_equal_to": + rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array(); + return true; + case "less_than": + rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array(); + return true; + } + + return false; + } + + if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) || + (encodingType.compareTo("TIME_EPOCH_BE") == 0) || + (encodingType.compareTo("UINT8_BE") == 0)) { + + if (prefixLength != 8) { + throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix"); + } + + long val; + if (encodingType.compareTo("TIME_EPOCH_BE") == 0) { + if ((valueArg instanceof TimeExpression) == false) { + return false; } - break; - case "DATE_EPOCH": - case "DATE_EPOCH_BE": - if (valueArg instanceof DateExpression) { - bb = newByteBuf(8, encodingType.endsWith("_BE")); - bb.writeLong(((DateExpression)valueArg).getDate()); + + val = ((TimeExpression)valueArg).getTime(); + } else if (encodingType.compareTo("UINT8_BE") == 0){ + if ((valueArg instanceof LongExpression) == false) { + return false; } - break; - case "BOOLEAN_BYTE": - if (valueArg instanceof BooleanExpression) { - bb = newByteBuf(1, false /* does not matter */); - bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0); + + val = ((LongExpression)valueArg).getLong(); + } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) { + if ((valueArg instanceof TimeStampExpression) == false) { + return false; } - break; - case "UTF8": - // let visitSchemaPath() handle this. - return e.getInput().accept(this, valueArg); + + val = ((TimeStampExpression)valueArg).getTimeStamp(); + } else { + // Should not reach here. + return false; } - if (bb != null) { - this.value = bb.array(); - this.path = (SchemaPath)e.getInput(); + // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >= + switch (functionName) { + case "equal": + rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array()); + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array(); + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array(); + return true; + case "greater_than_or_equal_to": + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array(); return true; + case "greater_than": + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array(); + return true; + case "less_than_or_equal_to": + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array(); + return true; + case "less_than": + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array(); + return true; + } + + return false; + } + + if (encodingType.compareTo("DATE_EPOCH_BE") == 0) { + if ((valueArg instanceof DateExpression) == false) { + return false; + } + + if (prefixLength != 8) { + throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix"); } + + final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24; + long dateToSet; + // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >= + switch (functionName) { + case "equal": + long startDate = ((DateExpression)valueArg).getDate(); + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array(); + long stopDate = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY; + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array(); + return true; + case "greater_than_or_equal_to": + dateToSet = ((DateExpression)valueArg).getDate(); + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + case "greater_than": + dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY; + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + case "less_than_or_equal_to": + dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY; + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + case "less_than": + dateToSet = ((DateExpression)valueArg).getDate(); + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + } + + return false; } + return false; } @@ -237,4 +544,4 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr .build(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java index 45fe22f..a4a3938 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.NullComparator; +import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -159,6 +160,7 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void String functionName = processor.getFunctionName(); SchemaPath field = processor.getPath(); byte[] fieldValue = processor.getValue(); + boolean sortOrderAscending = processor.isSortOrderAscending(); boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY); if (!(isRowKey || (!field.getRootSegment().isLastPath() @@ -172,6 +174,10 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void return null; } + if (processor.isRowKeyPrefixComparison()) { + return createRowKeyPrefixScanSpec(call, processor); + } + CompareOp compareOp = null; boolean isNullTest = false; ByteArrayComparable comparator = new BinaryComparator(fieldValue); @@ -189,29 +195,59 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void compareOp = CompareOp.NOT_EQUAL; break; case "greater_than_or_equal_to": - compareOp = CompareOp.GREATER_OR_EQUAL; - if (isRowKey) { - startRow = fieldValue; + if (sortOrderAscending) { + compareOp = CompareOp.GREATER_OR_EQUAL; + if (isRowKey) { + startRow = fieldValue; + } + } else { + compareOp = CompareOp.LESS_OR_EQUAL; + if (isRowKey) { + // stopRow should be just greater than 'value' + stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } } break; case "greater_than": - compareOp = CompareOp.GREATER; - if (isRowKey) { - // startRow should be just greater than 'value' - startRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + if (sortOrderAscending) { + compareOp = CompareOp.GREATER; + if (isRowKey) { + // startRow should be just greater than 'value' + startRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } + } else { + compareOp = CompareOp.LESS; + if (isRowKey) { + stopRow = fieldValue; + } } break; case "less_than_or_equal_to": - compareOp = CompareOp.LESS_OR_EQUAL; - if (isRowKey) { - // stopRow should be just greater than 'value' - stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + if (sortOrderAscending) { + compareOp = CompareOp.LESS_OR_EQUAL; + if (isRowKey) { + // stopRow should be just greater than 'value' + stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } + } else { + compareOp = CompareOp.GREATER_OR_EQUAL; + if (isRowKey) { + startRow = fieldValue; + } } break; case "less_than": - compareOp = CompareOp.LESS; - if (isRowKey) { - stopRow = fieldValue; + if (sortOrderAscending) { + compareOp = CompareOp.LESS; + if (isRowKey) { + stopRow = fieldValue; + } + } else { + compareOp = CompareOp.GREATER; + if (isRowKey) { + // startRow should be just greater than 'value' + startRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } } break; case "isnull": @@ -299,4 +335,19 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void return null; } + private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call, + CompareFunctionsProcessor processor) { + byte[] startRow = processor.getRowKeyPrefixStartRow(); + byte[] stopRow = processor.getRowKeyPrefixStopRow(); + Filter filter = processor.getRowKeyPrefixFilter(); + + if (startRow != HConstants.EMPTY_START_ROW || + stopRow != HConstants.EMPTY_END_ROW || + filter != null) { + return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter); + } + + // else + return null; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java index f2e6ceb..b048e37 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java @@ -43,12 +43,16 @@ public class MapRDBFormatMatcher extends FormatMatcher { @Override public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException { FileStatus status = selection.getFirstPath(fs); - if (status instanceof MapRFileStatus) { - if (((MapRFileStatus) status).isTable()) { - return new FormatSelection(getFormatPlugin().getConfig(), selection); - } + if (!isFileReadable(fs, status)) { + return null; } - return null; + + return new FormatSelection(getFormatPlugin().getConfig(), selection); + } + + @Override + public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException { + return (status instanceof MapRFileStatus) && ((MapRFileStatus) status).isTable(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java index eb72db0..6a10d22 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java @@ -99,7 +99,7 @@ public class MapRDBFormatPlugin implements FormatPlugin { @Override @JsonIgnore public Set<StoragePluginOptimizerRule> getOptimizerRules() { - return ImmutableSet.of(MapRDBPushFilterIntoScan.INSTANCE); + return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java index 7d79a8e..50f3d95 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java @@ -24,71 +24,121 @@ import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.FilterPrel; import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.hbase.HBaseScanSpec; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rex.RexNode; import com.google.common.collect.ImmutableList; -public class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule { +public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class); - public static final StoragePluginOptimizerRule INSTANCE = new MapRDBPushFilterIntoScan(); +// public static final StoragePluginOptimizerRule INSTANCE = new MapRDBPushFilterIntoScan(); - private MapRDBPushFilterIntoScan() { - super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan"); + private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) { + super(operand, description); } - @Override - public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - final FilterPrel filter = (FilterPrel) call.rel(0); - final RexNode condition = filter.getCondition(); - - MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan(); - if (groupScan.isFilterPushedDown()) { - /* - * The rule can get triggered again due to the transformed "scan => filter" sequence - * created by the earlier execution of this rule when we could not do a complete - * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon - * this flag to not do a re-processing of the rule on the already transformed call. - */ - return; + public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") { + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(1); + final FilterPrel filter = (FilterPrel) call.rel(0); + final RexNode condition = filter.getCondition(); + + MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan(); + if (groupScan.isFilterPushedDown()) { + /* + * The rule can get triggered again due to the transformed "scan => filter" sequence + * created by the earlier execution of this rule when we could not do a complete + * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon + * this flag to not do a re-processing of the rule on the already transformed call. + */ + return; + } + + doPushFilterToScan(call, filter, null, scan, groupScan, condition); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(1); + if (scan.getGroupScan() instanceof MapRDBGroupScan) { + return super.matches(call); + } + return false; } + }; - LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); - MapRDBFilterBuilder hbaseFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp); - HBaseScanSpec newScanSpec = hbaseFilterBuilder.parseTree(); + public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") { + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(2); + final ProjectPrel project = (ProjectPrel) call.rel(1); + final FilterPrel filter = (FilterPrel) call.rel(0); + + MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan(); + if (groupScan.isFilterPushedDown()) { + /* + * The rule can get triggered again due to the transformed "scan => filter" sequence + * created by the earlier execution of this rule when we could not do a complete + * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon + * this flag to not do a re-processing of the rule on the already transformed call. + */ + return; + } + + // convert the filter to one that references the child of the project + final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project); + + doPushFilterToScan(call, filter, project, scan, groupScan, condition); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(2); + if (scan.getGroupScan() instanceof MapRDBGroupScan) { + return super.matches(call); + } + return false; + } + }; + + protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final MapRDBGroupScan groupScan, final RexNode condition) { + + final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); + final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp); + final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree(); if (newScanSpec == null) { return; //no filter pushdown ==> No transformation. } - final MapRDBGroupScan newGroupsScan = new MapRDBGroupScan(groupScan.getUserName(), - groupScan.getStoragePlugin(), groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns()); + final MapRDBGroupScan newGroupsScan = new MapRDBGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), + groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns()); newGroupsScan.setFilterPushedDown(true); final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); - if (hbaseFilterBuilder.isAllExpressionsConverted()) { - /* - * Since we could convert the entire filter condition expression into an HBase filter, - * we can eliminate the filter operator altogether. - */ - call.transformTo(newScanPrel); - } else { - call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of((RelNode)newScanPrel))); - } - } - @Override - public boolean matches(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - if (scan.getGroupScan() instanceof MapRDBGroupScan) { - return super.matches(call); + // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. + final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; + + if (maprdbFilterBuilder.isAllExpressionsConverted()) { + /* + * Since we could convert the entire filter condition expression into an HBase filter, + * we can eliminate the filter operator altogether. + */ + call.transformTo(childRel); + } else { + call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); } - return false; } }