Repository: parquet-mr
Updated Branches:
  refs/heads/master c44f982e8 -> fb46b941f
PARQUET-397: Implement Pig predicate pushdown

This is based on #296 from @danielcweeks and implements a few remaining review items.

Closes #296.

Author: Daniel Weeks <[email protected]>
Author: Ryan Blue <[email protected]>

Closes #331 from rdblue/PARQUET-397-pig-predicate-pushdown and squashes the following commits:

c7a9b02 [Ryan Blue] PARQUET-397: Address review comments.
54e23a6 [Ryan Blue] PARQUET-397: Update Pig PPD to throw for bad expressions.
388099b [Daniel Weeks] Cleaning up imports
6b405b4 [Daniel Weeks] Merge remote-tracking branch 'rdblue/pig-predicate-pushdown' into pig-predicate-pushdown
f1ef73e [Daniel Weeks] Fixed binary type and storing filter predicate
a39fdff [Ryan Blue] WIP: Handle a few error cases in Pig predicate pushdown.
2666849 [Daniel Weeks] Fixed test to check the actual number of materialized rows from the reader
7b019a6 [Daniel Weeks] update tests and logging
f8ca447 [Daniel Weeks] Add predicate pushdown using filter2 api

Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/fb46b941
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/fb46b941
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/fb46b941

Branch: refs/heads/master
Commit: fb46b941f7763314d667c437c06b1675e61c3d38
Parents: c44f982
Author: Daniel Weeks <[email protected]>
Authored: Fri Feb 26 10:28:07 2016 -0800
Committer: Ryan Blue <[email protected]>
Committed: Fri Feb 26 10:28:07 2016 -0800

----------------------------------------------------------------------
 .../org/apache/parquet/pig/ParquetLoader.java | 186 ++++++++++++++++++-
 .../apache/parquet/pig/TestParquetLoader.java | 57 ++++--
 pom.xml | 4 +-
 3 files changed, 232 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/fb46b941/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
---------------------------------------------------------------------- diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java index 0575dce..be54aa8 100644 --- a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java +++ b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java @@ -30,9 +30,13 @@ import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_REQUIRED_FIELD import static org.apache.parquet.pig.TupleReadSupport.PARQUET_COLUMN_INDEX_ACCESS; import static org.apache.parquet.pig.TupleReadSupport.getPigSchemaFromMultipleFiles; +import static org.apache.parquet.filter2.predicate.FilterApi.*; + import java.io.IOException; import java.lang.ref.Reference; import java.lang.ref.SoftReference; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.WeakHashMap; @@ -42,9 +46,17 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.LogicalInverseRewriter; +import org.apache.parquet.filter2.predicate.Operators; +import org.apache.parquet.io.api.Binary; import org.apache.pig.Expression; +import org.apache.pig.Expression.BetweenExpression; +import org.apache.pig.Expression.InExpression; +import org.apache.pig.Expression.UnaryExpression; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; +import org.apache.pig.LoadPredicatePushdown; import org.apache.pig.LoadPushDown; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; @@ -57,6 +69,11 @@ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.parser.ParserException; +import 
static org.apache.pig.Expression.BinaryExpression; +import static org.apache.pig.Expression.Column; +import static org.apache.pig.Expression.Const; +import static org.apache.pig.Expression.OpType; + import org.apache.parquet.Log; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.metadata.GlobalMetaData; @@ -70,9 +87,12 @@ import org.apache.parquet.io.ParquetDecodingException; * @author Julien Le Dem * */ -public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown { +public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown, LoadPredicatePushdown { private static final Log LOG = Log.getLog(ParquetLoader.class); + public static final String ENABLE_PREDICATE_FILTER_PUSHDOWN = "parquet.pig.predicate.pushdown.enable"; + private static final boolean DEFAULT_PREDICATE_PUSHDOWN_ENABLED = false; + // Using a weak hash map will ensure that the cache will be gc'ed when there is memory pressure static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache = new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>(); @@ -172,6 +192,11 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema)); getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList)); getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess)); + + FilterPredicate filterPredicate = (FilterPredicate) getFromUDFContext(ParquetInputFormat.FILTER_PREDICATE); + if(filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(getConfiguration(job), filterPredicate); + } } @Override @@ -392,4 +417,163 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow return s; } + @Override + public List<String> getPredicateFields(String s, Job job) throws IOException { + 
if(!job.getConfiguration().getBoolean(ENABLE_PREDICATE_FILTER_PUSHDOWN, DEFAULT_PREDICATE_PUSHDOWN_ENABLED)) { + return null; + } + + List<String> fields = new ArrayList<String>(); + + for(FieldSchema field : schema.getFields()) { + switch(field.type) { + case DataType.BOOLEAN: + case DataType.INTEGER: + case DataType.LONG: + case DataType.FLOAT: + case DataType.DOUBLE: + case DataType.CHARARRAY: + fields.add(field.alias); + break; + default: + // Skip BYTEARRAY, TUPLE, MAP, BAG, DATETIME, BIGINTEGER, BIGDECIMAL + break; + } + } + + return fields; + } + + @Override + public List<Expression.OpType> getSupportedExpressionTypes() { + OpType supportedTypes [] = { + OpType.OP_EQ, + OpType.OP_NE, + OpType.OP_GT, + OpType.OP_GE, + OpType.OP_LT, + OpType.OP_LE, + OpType.OP_AND, + OpType.OP_OR, + //OpType.OP_BETWEEN, // not implemented in Pig yet + //OpType.OP_IN, // not implemented in Pig yet + OpType.OP_NOT + }; + + return Arrays.asList(supportedTypes); + } + + @Override + public void setPushdownPredicate(Expression e) throws IOException { + LOG.info("Pig pushdown expression: " + e); + + FilterPredicate pred = buildFilter(e); + LOG.info("Parquet filter predicate expression: " + pred); + + storeInUDFContext(ParquetInputFormat.FILTER_PREDICATE, pred); + } + + private FilterPredicate buildFilter(Expression e) { + OpType op = e.getOpType(); + + if (e instanceof BinaryExpression) { + Expression lhs = ((BinaryExpression) e).getLhs(); + Expression rhs = ((BinaryExpression) e).getRhs(); + + switch (op) { + case OP_AND: + return and(buildFilter(lhs), buildFilter(rhs)); + case OP_OR: + return or(buildFilter(lhs), buildFilter(rhs)); + case OP_BETWEEN: + BetweenExpression between = (BetweenExpression) rhs; + return and( + buildFilter(OpType.OP_GE, (Column) lhs, (Const) between.getLower()), + buildFilter(OpType.OP_LE, (Column) lhs, (Const) between.getUpper())); + case OP_IN: + FilterPredicate current = null; + for (Object value : ((InExpression) rhs).getValues()) { + FilterPredicate 
next = buildFilter(OpType.OP_EQ, (Column) lhs, (Const) value); + if (current != null) { + current = or(current, next); + } else { + current = next; + } + } + return current; + } + + if (lhs instanceof Column && rhs instanceof Const) { + return buildFilter(op, (Column) lhs, (Const) rhs); + } else if (lhs instanceof Const && rhs instanceof Column) { + return buildFilter(op, (Column) rhs, (Const) lhs); + } + } else if (e instanceof UnaryExpression && op == OpType.OP_NOT) { + return LogicalInverseRewriter.rewrite( + not(buildFilter(((UnaryExpression) e).getExpression()))); + } + + throw new RuntimeException("Could not build filter for expression: " + e); + } + + private FilterPredicate buildFilter(OpType op, Column col, Const value) { + String name = col.getName(); + try { + FieldSchema f = schema.getField(name); + switch (f.type) { + case DataType.BOOLEAN: + Operators.BooleanColumn boolCol = booleanColumn(name); + switch(op) { + case OP_EQ: return eq(boolCol, getValue(value, boolCol.getColumnType())); + case OP_NE: return notEq(boolCol, getValue(value, boolCol.getColumnType())); + default: throw new RuntimeException( + "Operation " + op + " not supported for boolean column: " + name); + } + case DataType.INTEGER: + Operators.IntColumn intCol = intColumn(name); + return op(op, intCol, value); + case DataType.LONG: + Operators.LongColumn longCol = longColumn(name); + return op(op, longCol, value); + case DataType.FLOAT: + Operators.FloatColumn floatCol = floatColumn(name); + return op(op, floatCol, value); + case DataType.DOUBLE: + Operators.DoubleColumn doubleCol = doubleColumn(name); + return op(op, doubleCol, value); + case DataType.CHARARRAY: + Operators.BinaryColumn binaryCol = binaryColumn(name); + return op(op, binaryCol, value); + default: + throw new RuntimeException("Unsupported type " + f.type + " for field: " + name); + } + } catch (FrontendException e) { + throw new RuntimeException("Error processing pushdown for column:" + col, e); + } + } + + private 
static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt> + FilterPredicate op(Expression.OpType op, COL col, Const valueExpr) { + C value = getValue(valueExpr, col.getColumnType()); + switch (op) { + case OP_EQ: return eq(col, value); + case OP_NE: return notEq(col, value); + case OP_GT: return gt(col, value); + case OP_GE: return gtEq(col, value); + case OP_LT: return lt(col, value); + case OP_LE: return ltEq(col, value); + } + return null; + } + + private static <C extends Comparable<C>> C getValue(Const valueExpr, Class<C> type) { + Object value = valueExpr.getValue(); + + if (value instanceof String) { + value = Binary.fromString((String) value); + } + + return type.cast(value); + } + } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/fb46b941/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java ---------------------------------------------------------------------- diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java index 6f11538..8e57424 100644 --- a/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java +++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java @@ -22,17 +22,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.ExecType; import org.apache.pig.LoadPushDown.RequiredField; import org.apache.pig.LoadPushDown.RequiredFieldList; import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataType; import static org.apache.pig.data.DataType.*; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; 
+import org.apache.pig.tools.pigstats.JobStats; import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals; @@ -175,11 +179,11 @@ public class TestParquetLoader { for (int i = 0; i < rows; i++) { list.add(Storage.tuple(i, "a"+i, i*2)); } - data.set("in", "i:int, a:chararray, b:int", list ); + data.set("in", "i:int, a:chararray, b:int", list); pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();"); pigServer.deleteFile(out); - pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();"); + pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();"); pigServer.executeBatch(); //Test Null Padding at the end @@ -212,7 +216,7 @@ public class TestParquetLoader { for (int i = 0; i < rows; i++) { list.add(Storage.tuple(i, (long)i, (float)i, (double)i, Integer.toString(i), Boolean.TRUE)); } - data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list ); + data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list); pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();"); pigServer.deleteFile(out); @@ -268,11 +272,11 @@ public class TestParquetLoader { pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();"); pigServer.deleteFile(out); - pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();"); + pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();"); pigServer.executeBatch(); //Test Null Padding at the end - pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int, n2:double, n3:long, n4:chararray', 'true');"); + pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName() + "('n1:int, n2:double, n3:long, n4:chararray', 'true');"); pigServer.registerQuery("STORE B 
into 'out' using mock.Storage();"); pigServer.executeBatch(); @@ -285,7 +289,7 @@ public class TestParquetLoader { assertEquals(4, t.size()); assertEquals(i, t.get(0)); - assertEquals(i*1.0, t.get(1)); + assertEquals(i * 1.0, t.get(1)); assertEquals(i*2L, t.get(2)); assertEquals("v"+i, t.get(3)); } @@ -306,10 +310,10 @@ public class TestParquetLoader { pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();"); pigServer.deleteFile(out); - pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();"); + pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();"); pigServer.executeBatch(); - pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int, n2:double, n3:long, n4:chararray', 'true');"); + pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName() + "('n1:int, n2:double, n3:long, n4:chararray', 'true');"); pigServer.registerQuery("C = foreach B generate n1, n3;"); pigServer.registerQuery("STORE C into 'out' using mock.Storage();"); pigServer.executeBatch(); @@ -325,10 +329,39 @@ public class TestParquetLoader { assertEquals(i, t.get(0)); assertEquals(i*2L, t.get(1)); } - } - + } + @Test - public void testRead() { - + public void testPredicatePushdown() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(ParquetLoader.ENABLE_PREDICATE_FILTER_PUSHDOWN, true); + + PigServer pigServer = new PigServer(ExecType.LOCAL, conf); + pigServer.setValidateEachStatement(true); + + String out = "target/out"; + String out2 = "target/out2"; + int rows = 10; + Data data = Storage.resetData(pigServer); + List<Tuple> list = new ArrayList<Tuple>(); + for (int i = 0; i < rows; i++) { + list.add(Storage.tuple(i, i*1.0, i*2L, "v"+i)); + } + data.set("in", "c1:int, c2:double, c3:long, c4:chararray", list); + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD 'in' USING 
mock.Storage();"); + pigServer.deleteFile(out); + pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();"); + pigServer.executeBatch(); + + pigServer.deleteFile(out2); + pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName() + "('c1:int, c2:double, c3:long, c4:chararray');"); + pigServer.registerQuery("C = FILTER B by c1 == 1 or c1 == 5;"); + pigServer.registerQuery("STORE C into '" + out2 +"' using mock.Storage();"); + List<ExecJob> jobs = pigServer.executeBatch(); + + long recordsRead = jobs.get(0).getStatistics().getInputStats().get(0).getNumberRecords(); + + assertEquals(2, recordsRead); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/fb46b941/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 98fd862..f606faa 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ <!-- scala.binary.version is used for projects that fetch dependencies that are in scala --> <scala.binary.version>2.10</scala.binary.version> <scala.maven.test.skip>false</scala.maven.test.skip> - <pig.version>0.11.1</pig.version> + <pig.version>0.14.0</pig.version> <pig.classifier/> <thrift.version>0.7.0</thrift.version> <fastutil.version>6.5.7</fastutil.version> @@ -524,7 +524,7 @@ <!-- test hadoop-1 with the same jars that were produced for default profile --> <maven.main.skip>true</maven.main.skip> <hadoop.version>2.3.0</hadoop.version> - <pig.version>0.13.0</pig.version> + <pig.version>0.14.0</pig.version> <pig.classifier>h2</pig.classifier> </properties> </profile>
