Hi,

I am using Spark v2.3.2 with my own implementation of a DataSourceV2 (DSV2)
source. Here is what is happening:

1) Obtained a dataframe using MyDataSource

> scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
> MyDataSource.MyDataSource
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader:
> Instantiated....com.shubham.reader.MyDataSourceReader@2b85edc7
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> df1: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]
>

2) show() on df1

> scala> df1.show
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pruneColumns:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> =======MyDataSourceReader.createBatchDataReaderFactories=======
> prunedSchema = StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> pushedFilters = []
> =======MyDataSourceReader.createBatchDataReaderFactories=======
> +---+---+---+
> | c1| c2| c3|
> +---+---+---+
> +---+---+---+
>

3) Derived df2 with val df2 = df1.filter($"c3" > 1), then show() on df2

>
> scala> df2.show
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pruneColumns:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> =======MyDataSourceReader.createBatchDataReaderFactories=======
> prunedSchema = StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]
> =======MyDataSourceReader.createBatchDataReaderFactories=======
> +---+---+---+
> | c1| c2| c3|
> +---+---+---+
> +---+---+---+


4) df1.show() again <=== Since df2 is derived from df1 (and both share the
same instance of MyDataSourceReader), step 3 has changed pushedFilters for
df1 as well

> scala> df1.show
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pruneColumns:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> =======MyDataSourceReader.createBatchDataReaderFactories=======
> prunedSchema = StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> *pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]*
> =======MyDataSourceReader.createBatchDataReaderFactories=======
> +---+---+---+
> | c1| c2| c3|
> +---+---+---+
> +---+---+---+
>

*pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]* is not correct in step
4), as no filters were specified for df1.

This happens because I keep a pushedFilters field in MyDataSourceReader, and
that field is overwritten (via pushFilters()) when df2.show runs.
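
The same effect can be shown without Spark. The following is only a minimal
sketch, not my actual reader: SharedReaderDemo, StatefulReader and runAction
are hypothetical names, plain strings stand in for Filter objects, and
runAction only imitates the call pattern seen in the logs above (pushFilters
is invoked only when the plan contains a filter).

```java
import java.util.Arrays;

public class SharedReaderDemo {

    // Hypothetical stand-in for a reader that stores pushed filters in a field.
    static class StatefulReader {
        private String[] pushedFilters = new String[0];

        void pushFilters(String[] filters) {
            this.pushedFilters = filters;
        }

        String[] pushedFilters() {
            return pushedFilters;
        }
    }

    // Imitates one action: pushFilters is called only if the plan has filters,
    // so a plan without filters sees whatever the previous action left behind.
    static String runAction(StatefulReader reader, String[] planFilters) {
        if (planFilters.length > 0) {
            reader.pushFilters(planFilters);
        }
        return Arrays.toString(reader.pushedFilters());
    }

    public static void main(String[] args) {
        StatefulReader shared = new StatefulReader(); // one instance for df1 and df2
        String[] noFilters = new String[0];
        String[] c3Filters = {"IsNotNull(c3)", "GreaterThan(c3,1)"};

        System.out.println("df1.show: " + runAction(shared, noFilters));
        System.out.println("df2.show: " + runAction(shared, c3Filters));
        System.out.println("df1.show: " + runAction(shared, noFilters)); // stale filters
    }
}
```

The third call reports the filters left behind by the second one, which is
the same thing I see in step 4).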

Questions:
Q1: How should this state be maintained in DataSourceReader implementations?
Q2: Shouldn't Spark call pushFilters() every time an action is invoked
(regardless of whether .filter() is present), in the same way that it calls
pruneColumns()?

I understand that pushFilters() is only invoked when the dataframe has a
.filter(), but as the scenario above shows, this leaves the state of
MyDataSourceReader inconsistent, hence Q2.

Minimal Code:

public class MyDataSource implements DataSourceRegister, DataSourceV2,
ReadSupport, WriteSupport {
>
>   public MyDataSource() {
>     System.out.println("MyDataSource.MyDataSource");
>   }
>
>   @Override
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   @Override
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
>       SaveMode mode, DataSourceOptions options) {
>     System.out.println("MyDataSource.createWriter: Going to create a new MyDataSourceWriter");
>     return Optional.of(new MyDataSourceWriter(schema));
>   }
>
>   @Override
>   public String shortName() {
>     return "com.shubham.MyDataSource";
>   }
> }
>
>
public class MyDataSourceReader implements DataSourceReader,
SupportsPushDownRequiredColumns, SupportsScanColumnarBatch,
SupportsPushDownFilters {
>
>   private Map<String, String> options;
>   private StructType baseSchema;
>   private StructType prunedSchema;
>   private Filter[] pushedFilters = new Filter[0];
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public StructType readSchema() {
>     this.baseSchema = (new StructType())
>         .add("c1", "int")
>         .add("c2", "int")
>         .add("c3", "int");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " baseSchema: " + this.baseSchema);
>     return this.baseSchema;
>   }
>
>
>   @Override
>   public Filter[] pushFilters(Filter[] filters) {
>     System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
>     // filters that can be pushed down.
>     // for this example, let's assume all the filters can be pushed down.
>     this.pushedFilters = filters;
>
>     // filters that can't be pushed down.
>     return new Filter[0];
>   }
>
>   @Override
>   public Filter[] pushedFilters() {
>     //System.out.println("MyDataSourceReader.pushedFilters: " + Arrays.toString(pushedFilters));
>     return this.pushedFilters;
>   }
>
>   @Override
>   public void pruneColumns(StructType requiredSchema) {
>     System.out.println("MyDataSourceReader.pruneColumns: " + requiredSchema);
>     this.prunedSchema = requiredSchema;
>   }
>
>   @Override
>   public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
>     System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
>     // do the actual operation with baseSchema, prunedSchema, pushedFilters
>
>     System.out.println("prunedSchema = " + prunedSchema);
>     System.out.println("pushedFilters = " + Arrays.toString(pushedFilters));
>
>     System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
>
>     return new ArrayList<>();
>   }
>
> }
>
>
Thanks,
Shubham
