Hi everyone, I am trying to improve the performance of data loading from disk. For that I have implemented my own RDD, and now I am trying to increase performance with predicate pushdown. I have used many sources, including the documentation and https://www.slideshare.net/databricks/yin-huai-20150325meetupwithdemos.
What I hope to achieve is to receive the filters in

```java
public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters)
```

and use them to filter the data loaded into the DataFrame. I have not implemented any `Filter` classes myself; from my understanding, all the basic filters (`EqualTo`, `GreaterThan`, etc.) are built in. The interesting part of my code:

```java
public class MyProvider implements RelationProvider {
    @Override
    public BaseRelation createRelation(SQLContext sqlContext,
            scala.collection.immutable.Map<String, String> parameters) {
        System.out.println("createRelation");
        return new MyRelation(sqlContext, JavaConversions.mapAsJavaMap(parameters));
    }
}

public class MyRelation extends BaseRelation
        implements TableScan, PrunedScan, PrunedFilteredScan {

    @Override
    public StructType schema() {
        return ExpressionEncoder.javaBean(EventBean.class).schema();
    }

    @Override
    public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
        System.out.println(Arrays.toString(requiredColumns));
        System.out.println(Arrays.toString(filters));
        // ... build and return the RDD<Row> (omitted for brevity)
    }
}
```

Of course I also implemented the other two versions of `buildScan`. I use it in the following manner:

```java
SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.MyProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView("events");
Dataset<Row> sqlResult = sparkSession.sql(query);
System.out.println(sqlResult.queryExecution().executedPlan().toString());
```

where the query looks like this:

```sql
SELECT field1, field2, field3 FROM events WHERE field1 > 3
```

The original bean has 26 fields, and the `requiredColumns` array contains only the relevant fields, which is good. However, the `filters` array is empty no matter what I do.
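For context, this is the kind of filtering I want to do inside `buildScan` once the filters arrive. Below is a minimal, runnable sketch of the idea, with a row represented as a `Map` from column name to value; `RowFilter`, `greaterThan`, and `equalTo` are hypothetical stand-ins for Spark's `org.apache.spark.sql.sources.Filter` classes so the example compiles without a Spark dependency:

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch (no Spark dependency): RowFilter stands in for
// org.apache.spark.sql.sources.Filter. Inside buildScan, each record read
// from disk would be kept only if it satisfies every pushed-down filter.
public class FilterSketch {

    interface RowFilter {
        boolean accept(Map<String, Integer> row);
    }

    // Analogue of Spark's built-in GreaterThan(attribute, value) filter.
    static RowFilter greaterThan(String attribute, int value) {
        return row -> row.containsKey(attribute) && row.get(attribute) > value;
    }

    // Analogue of Spark's built-in EqualTo(attribute, value) filter.
    static RowFilter equalTo(String attribute, int value) {
        return row -> row.containsKey(attribute) && row.get(attribute) == value;
    }

    // A row survives the scan only if every pushed-down filter accepts it.
    static boolean passesAll(Map<String, Integer> row, RowFilter[] filters) {
        for (RowFilter f : filters) {
            if (!f.accept(row)) {
                return false;
            }
        }
        return true;
    }

    // Convenience constructor for a one-column row.
    static Map<String, Integer> rowOf(String column, int value) {
        Map<String, Integer> row = new HashMap<>();
        row.put(column, value);
        return row;
    }

    public static void main(String[] args) {
        RowFilter[] filters = { greaterThan("field1", 3) };
        System.out.println(passesAll(rowOf("field1", 5), filters)); // prints "true"
        System.out.println(passesAll(rowOf("field1", 2), filters)); // prints "false"
    }
}
```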
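One detail worth noting about the query: `field1` is declared as a smallint (Java `short`) in the schema, while the literal `3` in the `WHERE` clause is an int, so the comparison happens in int space. Plain Java shows the same widening that Catalyst expresses as a `cast` around the column:

```java
public class CastDemo {
    // Mirrors the predicate "field1 > 3" from the SQL query: the smallint
    // (Java short) column is implicitly widened to int before the comparison,
    // the same way Catalyst wraps the column in cast(field1 as int).
    static boolean passes(short field1, int threshold) {
        return field1 > threshold; // implicit widening: short -> int
    }

    public static void main(String[] args) {
        System.out.println(passes((short) 5, 3)); // prints "true"
        System.out.println(passes((short) 2, 3)); // prints "false"
    }
}
```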
I also tried:

```java
SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.myProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView("events");
Dataset<Row> sqlResult = eventDataFrame.select("field1", "field2").filter("field1 > 4");
```

Another thing that bothers me: in the plan output I don't see any Catalyst optimization being printed, so maybe the query is not being optimized? The plan looks like this:

```
*Filter (cast(field1#27 as int) > 4)
+- *Scan MyRelation @3a709cc7 [field1#27,field2#26] ReadSchema: struct<field1:smallint,field2:smallint>
```

Thank you for your time and support.

Anton P.