HAWQ-1115. Implement filter push-down for the IN operator on the PXF service side.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3988fa9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3988fa9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3988fa9d Branch: refs/heads/HAWQ-1114_ Commit: 3988fa9d650bfce07c9951abfe52f093f148b3ee Parents: f536900 Author: Oleksandr Diachenko <odiache...@pivotal.io> Authored: Tue Nov 22 15:37:15 2016 -0800 Committer: Oleksandr Diachenko <odiache...@pivotal.io> Committed: Tue Nov 22 15:37:15 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hawq/pxf/api/FilterParser.java | 56 ++++++++++++++++++-- .../org/apache/hawq/pxf/api/io/DataType.java | 19 +++++++ .../apache/hawq/pxf/api/FilterParserTest.java | 22 ++++++++ .../hawq/pxf/plugins/hive/HiveORCAccessor.java | 9 ++++ .../pxf/plugins/hive/HiveORCAccessorTest.java | 15 ++++++ .../plugins/hive/HiveORCSearchArgumentTest.java | 42 ++++++++++++--- 6 files changed, 151 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java index e362eed..ff0d972 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java @@ -25,6 +25,8 @@ import org.apache.hawq.pxf.api.io.DataType; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; import java.util.Stack; /** @@ -65,11 +67,13 @@ public class FilterParser { private Stack<Object> operandsStack; private 
FilterBuilder filterBuilder; public static final char COL_OP = 'a'; - public static final char CONST_OP = 'c'; + public static final char SCALAR_CONST_OP = 'c'; + public static final char LIST_CONST_OP = 'm'; public static final char CONST_LEN = 's'; public static final char CONST_DATA = 'd'; public static final char COMP_OP = 'o'; public static final char LOG_OP = 'l'; + public static final String DEFAULT_CHARSET = "UTF-8"; /** Supported operations by the parser. */ @@ -83,7 +87,8 @@ public class FilterParser { HDOP_NE, HDOP_LIKE, HDOP_IS_NULL, - HDOP_IS_NOT_NULL + HDOP_IS_NOT_NULL, + HDOP_IN } /** @@ -213,8 +218,11 @@ public class FilterParser { case COL_OP: operandsStack.push(new ColumnIndex(safeToInt(parseNumber()))); break; - case CONST_OP: - operandsStack.push(new Constant(parseParameter())); + case SCALAR_CONST_OP: + operandsStack.push(new Constant(parseScalarParameter())); + break; + case LIST_CONST_OP: + operandsStack.push(new Constant(parseListParameter())); break; case COMP_OP: opNumber = safeToInt(parseNumber()); @@ -354,6 +362,10 @@ public class FilterParser { } private Object convertDataType(byte[] byteData, int start, int end, DataType dataType) throws Exception { + + if (byteData.length < end) + throw new FilterStringSyntaxException("filter string is shorter than expected"); + String data = new String(byteData, start, end-start, DEFAULT_CHARSET); try { switch (dataType) { @@ -391,7 +403,7 @@ public class FilterParser { /** * Parses either a number or a string. 
*/ - private Object parseParameter() throws Exception { + private Object parseScalarParameter() throws Exception { if (index == filterByteArr.length) { throw new FilterStringSyntaxException("argument should follow at " + index); } @@ -418,6 +430,40 @@ public class FilterParser { return data; } + private Object parseListParameter() throws Exception { + if (index == filterByteArr.length) { + throw new FilterStringSyntaxException("argument should follow at " + index); + } + + DataType dataType = DataType.get(parseConstDataType()); + List<Object> data = new ArrayList<Object>(); + if (dataType == DataType.UNSUPPORTED_TYPE) { + throw new FilterStringSyntaxException("invalid DataType OID at " + (index - 1)); + } + + if (dataType.getTypeElem() == null) { + throw new FilterStringSyntaxException("expected non-scalar datatype, but got datatype with oid = " + dataType.getOID()); + } + + while (((char) filterByteArr[index]) == CONST_LEN) { + int dataLength = parseDataLength(); + + if (index + dataLength > filterByteArr.length) { + throw new FilterStringSyntaxException("data size larger than filter string starting at " + index); + } + + if (((char) filterByteArr[index]) != CONST_DATA) { + throw new FilterStringSyntaxException("data delimiter 'd' expected at " + index); + } + + index++; + data.add(convertDataType(filterByteArr, index, index+dataLength, dataType.getTypeElem())); + index += dataLength; + } + + return data; + } + private Long parseNumber() throws Exception { if (index == filterByteArr.length) { throw new FilterStringSyntaxException("numeric argument expected at " + index); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java index cac700c..d3db038 100644 --- 
a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java @@ -49,17 +49,32 @@ public enum DataType { TIME(1083), TIMESTAMP(1114), NUMERIC(1700), + + INT2ARRAY(1005), + INT4ARRAY(1007), + INT8ARRAY(1016), + BOOLARRAY(1000), + TEXTARRAY(1009), + UNSUPPORTED_TYPE(-1); private static final Map<Integer, DataType> lookup = new HashMap<>(); static { + + INT2ARRAY.typeElem = SMALLINT; + INT4ARRAY.typeElem = INTEGER; + INT8ARRAY.typeElem = BIGINT; + BOOLARRAY.typeElem = BOOLEAN; + TEXTARRAY.typeElem = TEXT; + for (DataType dt : EnumSet.allOf(DataType.class)) { lookup.put(dt.getOID(), dt); } } private final int OID; + private DataType typeElem; DataType(int OID) { this.OID = OID; @@ -81,4 +96,8 @@ public enum DataType { public int getOID() { return OID; } + + public DataType getTypeElem() { + return typeElem; + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java index 46f60f1..b754253 100644 --- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java +++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java @@ -143,6 +143,24 @@ public class FilterParserTest { index = 6; exception = "failed to parse number data type starting at " + index; runParseNegative("const operand with an invalid value", filter, exception); + + filter = "m1122"; + index = 4; + exception = "invalid DataType OID at " + index; + runParseNegative("const operand with an invalid datatype", filter, exception); + + filter = "m20"; + exception = "expected non-scalar datatype, but got datatype with oid = 20"; + runParseNegative("const operand with an scalar datatype instead of 
list", filter, exception); + + filter = "m1007s1d1s1d2s2d3"; + exception = "filter string is shorter than expected"; + runParseNegative("const operand with list datatype, and \"d\" tag has less data than indicated in \"s\" tag", filter, exception); + + filter = "m1007s1d1s1d2s2d123"; + index = 18; + exception = "unknown opcode 3(51) at " + index; + runParseNegative("const operand with list datatype, and \"d\" tag has more data than indicated in \"s\" tag", filter, exception); } @Test @@ -272,6 +290,10 @@ public class FilterParserTest { op = Operation.HDOP_IS_NOT_NULL; runParseOneUnaryOperation("this filter was build from HDOP_IS_NULL", filter, op); + filter = "a1m1005s1d1s1d2s1d3o10"; + op = Operation.HDOP_IN; + runParseOneOperation("this filter was built from HDOP_IN", filter, op); + } @Test http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java index 9d79f97..dc195f4 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java @@ -195,6 +195,15 @@ public class HiveORCAccessor extends HiveAccessor { case HDOP_IS_NOT_NULL: builder.startNot().isNull(filterColumnName).end(); break; + case HDOP_IN: + if (filterValue instanceof List) { + @SuppressWarnings("unchecked") + List<Object> l = (List<Object>)filterValue; + builder.in(filterColumnName, l.toArray()); + } else { + throw new IllegalArgumentException("filterValue should be instace of List for HDOP_IN operation"); + } + break; default: { LOG.debug("Filter push-down is not supported for " + filter.getOperation() + "operation."); return false; 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java index c5700b6..7bbe811 100644 --- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java +++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java @@ -103,4 +103,19 @@ public class HiveORCAccessorTest { assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN)); } + @Test + public void parseFilterWithIn() throws Exception { + + when(inputData.hasFilter()).thenReturn(true); + when(inputData.getFilterString()).thenReturn("a1m1007s1d1s1d2s1d3o10"); + when(columnDesc.columnName()).thenReturn("FOO"); + when(inputData.getColumn(1)).thenReturn(columnDesc); + + accessor.openForRead(); + + SearchArgument sarg = SearchArgumentFactory.newBuilder().startAnd().in("FOO", 1, 2, 3).end().build(); + + assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN)); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java index 382f065..74d87e7 100644 --- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java +++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java @@ -40,14 +40,25 @@ public class HiveORCSearchArgumentTest { HiveFilterBuilder eval = new 
HiveFilterBuilder(null); Object filter = eval.getFilterObject(filterStr); - Object current = filter; SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder(); buildExpression(filterBuilder, Arrays.asList(filter)); SearchArgument sarg = filterBuilder.build(); Assert.assertEquals("and(or(lt(col1, 5), not(lteq(col1, 1))), or(lt(col1, 5), lteq(col1, 3)))", sarg.toFilterPredicate().toString()); } - private void buildExpression(SearchArgument.Builder builder, List<Object> filterList) { + @Test + public void buildIn() throws Exception { + String filterStr = "a0m1009s4drow1s4drow2o10a1m1009s3ds_6s3ds_7o10l0"; + HiveFilterBuilder eval = new HiveFilterBuilder(null); + Object filter = eval.getFilterObject(filterStr); + + SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder(); + buildExpression(filterBuilder, Arrays.asList(filter)); + SearchArgument sarg = filterBuilder.build(); + Assert.assertEquals("and(or(eq(col1, Binary{\"row1\"}), eq(col1, Binary{\"row2\"})), or(eq(col1, Binary{\"s_6\"}), eq(col1, Binary{\"s_7\"})))", sarg.toFilterPredicate().toString()); + } + + private boolean buildExpression(SearchArgument.Builder builder, List<Object> filterList) { for (Object f : filterList) { if (f instanceof LogicalFilter) { switch(((LogicalFilter) f).getOperator()) { @@ -61,15 +72,21 @@ public class HiveORCSearchArgumentTest { builder.startNot(); break; } - buildExpression(builder, ((LogicalFilter) f).getFilterList()); - builder.end(); + if (buildExpression(builder, ((LogicalFilter) f).getFilterList())) { + builder.end(); + } else { + return false; + } } else { - buildArgument(builder, f); + if (!buildArgument(builder, f)) { + return false; + } } } + return true; } - private void buildArgument(SearchArgument.Builder builder, Object filterObj) { + private boolean buildArgument(SearchArgument.Builder builder, Object filterObj) { /* The below functions will not be compatible and requires update with Hive 2.0 APIs */ BasicFilter filter = 
(BasicFilter) filterObj; int filterColumnIndex = filter.getColumn().index(); @@ -97,7 +114,18 @@ public class HiveORCSearchArgumentTest { case HDOP_NE: builder.startNot().equals(filterColumnName, filterValue).end(); break; + case HDOP_IN: + if (filterValue instanceof List) { + @SuppressWarnings("unchecked") + List<Object> l = (List<Object>)filterValue; + builder.in(filterColumnName, l.toArray()); + } else { + throw new IllegalArgumentException("filterValue should be instace of List for HDOP_IN operation"); + } + break; + default: + return false; } - return; + return true; } }