Thanks, Assaf. Did you try it with *tags/v2.4.0-rc2*? Full code:
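MyDataSource is the entry point; it simply creates the reader and the writer:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.ReadSupport;
    import org.apache.spark.sql.sources.v2.SessionConfigSupport;
    import org.apache.spark.sql.sources.v2.WriteSupport;
    import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
    import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    // (each public class goes in its own .java file)
    public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {

        @Override
        public DataSourceReader createReader(DataSourceOptions options) {
            return new MyDataSourceReader(options.asMap());
        }

        @Override
        public Optional<DataSourceWriter> createWriter(String jobId, StructType schema, SaveMode mode, DataSourceOptions options) {
            // creates a dataSourcewriter here..
            return Optional.of(dataSourcewriter);
        }

        @Override
        public String keyPrefix() {
            return "myprefix";
        }
    }

    public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

        StructType schema = null;
        Map<String, String> options;

        public MyDataSourceReader(Map<String, String> options) {
            System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
            this.options = options;
        }

        @Override
        public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
            //variable this.schema is null here since readSchema() was called on a different instance
            System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
            //more logic......
            return null;
        }

        @Override
        public StructType readSchema() {
            //some logic to discover schema
            this.schema = (new StructType())
                    .add("col1", "int")
                    .add("col2", "string");
            System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
            return this.schema;
        }
    }

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

> Could you add a fuller code example?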
> I tried to reproduce it in my environment and I am getting just one instance of the reader…
>
> Thanks,
> Assaf
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>
> Hi All,
>
> --Spark built with *tags/v2.4.0-rc2*
>
> Consider the following DataSourceReader implementation:
>
>     public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>         StructType schema = null;
>         Map<String, String> options;
>
>         public MyDataSourceReader(Map<String, String> options) {
>             System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>             this.options = options;
>         }
>
>         @Override
>         public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>             //variable this.schema is null here since readSchema() was called on a different instance
>             System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>             //more logic......
>             return null;
>         }
>
>         @Override
>         public StructType readSchema() {
>             //some logic to discover schema
>             this.schema = (new StructType())
>                     .add("col1", "int")
>                     .add("col2", "string");
>             System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>             return this.schema;
>         }
>     }
>
> 1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the instance field schema.
>
> 2) When planBatchInputPartitions() is called later, it is called on a different instance of MyDataSourceReader, so the schema field is still null inside planBatchInputPartitions().
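Points 1) and 2) can be isolated from Spark with a small plain-Java model. It assumes, based only on the two different hash codes in the console logs of this message, that the host creates a fresh reader for schema resolution and another one for partition planning; `StatefulReader`, `runQuery`, and the `"table"` option are hypothetical names, not Spark API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java model of points 1) and 2); no Spark classes are involved.
public class FreshReaderPerPhase {

    // Stand-in for MyDataSourceReader: readSchema() caches the schema in an
    // instance field, just as the original code does.
    static class StatefulReader {
        private String schema = null;
        private final Map<String, String> options;

        StatefulReader(Map<String, String> options) {
            this.options = options;
        }

        String readSchema() {
            this.schema = "col1:int,col2:string"; // "some logic to discover schema"
            return this.schema;
        }

        String planPartitions() {
            // Sees only what readSchema() stored on THIS object.
            return this.schema;
        }
    }

    // Hypothetical host (assumption): creates a new reader for each phase
    // instead of reusing the first one, the pattern the logs appear to show.
    public static List<String> runQuery(Map<String, String> options) {
        List<String> log = new ArrayList<>();
        StatefulReader forSchema = new StatefulReader(options);
        log.add("readSchema -> " + forSchema.readSchema());
        StatefulReader forPlanning = new StatefulReader(options);
        log.add("planPartitions -> " + forPlanning.planPartitions());
        return log;
    }

    public static void main(String[] args) {
        runQuery(Collections.singletonMap("table", "movie")).forEach(System.out::println);
    }
}
```

Running `main` prints `readSchema -> col1:int,col2:string` followed by `planPartitions -> null`, mirroring the `schema: null` line in the logs: any state set in one phase is invisible to a reader created for a later phase.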
>
> How can I get the value of schema, which was set in readSchema(), inside the planBatchInputPartitions() method?
>
> Console logs:
>
>     scala> mysource.executeQuery("select * from movie").show
>
>     MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>     MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>     MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>     MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>
> Thanks,
> Shubham
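One possible workaround, sketched under assumptions rather than taken from this thread, is to make every reader instance self-sufficient so that planBatchInputPartitions() never depends on readSchema() having run on the same object. The model below is plain Java: `schema()` is a hypothetical private helper that both methods call, and `SCHEMA_CACHE` is a hypothetical driver-side static cache keyed by the options map, so the discovery logic runs once even if the host creates several readers. In the real MyDataSourceReader the helper would return a `StructType` and be called from both `readSchema()` and `planBatchInputPartitions()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of a reader that does not rely on state from another instance.
public class SelfSufficientReader {

    // Counts how many times the (possibly expensive) discovery logic ran.
    static final AtomicInteger DISCOVERIES = new AtomicInteger();

    // Hypothetical cache shared by all instances in the same JVM, keyed by the
    // options map; the key must identify the data being read.
    static final Map<Map<String, String>, String> SCHEMA_CACHE = new ConcurrentHashMap<>();

    private final Map<String, String> options;
    private String schema; // per-instance memo, filled on first use

    SelfSufficientReader(Map<String, String> options) {
        this.options = options;
    }

    // Stand-in for "some logic to discover schema".
    private static String discoverSchema(Map<String, String> options) {
        DISCOVERIES.incrementAndGet();
        return "col1:int,col2:string";
    }

    // Both public methods go through this helper instead of reading the field
    // directly, so it works on whichever instance the host happens to call.
    private String schema() {
        if (schema == null) {
            schema = SCHEMA_CACHE.computeIfAbsent(options, SelfSufficientReader::discoverSchema);
        }
        return schema;
    }

    public String readSchema() {
        return schema();
    }

    public String planPartitions() {
        // Real code would build its input partitions here using schema().
        return schema();
    }

    // Same hypothetical host as before: a new reader per phase.
    public static List<String> runQuery(Map<String, String> options) {
        List<String> log = new ArrayList<>();
        log.add("readSchema -> " + new SelfSufficientReader(options).readSchema());
        log.add("planPartitions -> " + new SelfSufficientReader(options).planPartitions());
        return log;
    }

    public static void main(String[] args) {
        runQuery(Collections.singletonMap("table", "movie")).forEach(System.out::println);
        System.out.println("discoveries: " + DISCOVERIES.get());
    }
}
```

Running `main` prints the schema on both lines and `discoveries: 1`. The static cache lives as long as the JVM and is never invalidated here, so it only fits if the schema cannot change between queries with the same options; otherwise keep just the per-instance `schema()` helper and accept repeated discovery.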