Generally, please avoid System.out.println and use a logger instead, even in examples. People may take these examples from here and put them in their production code.
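For illustration, a minimal sketch of the reader's constructor from the thread below, rewritten with SLF4J (an assumption: slf4j-api plus a binding on the classpath; the DataSourceV2 interfaces are omitted here to keep the snippet self-contained):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class MyDataSourceReader {

      // One static logger per class, instead of System.out.println.
      private static final Logger LOG = LoggerFactory.getLogger(MyDataSourceReader.class);

      private final Map<String, String> options;

      public MyDataSourceReader(Map<String, String> options) {
        // Parameterized message: the log string is only built if the level is enabled,
        // unlike eager string concatenation.
        LOG.info("Instantiated reader {}", this);
        this.options = options;
      }
    }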
> On 09.10.2018 at 15:39, Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:
>
> Alright, so it is a big project which uses a SQL store underneath.
> I extracted the minimal code into a smaller project, and it still creates
> multiple instances.
>
> Here is my project:
>
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │       └── shubham
> │   │   │           ├── MyDataSource.java
> │   │   │           └── reader
> │   │   │               └── MyDataSourceReader.java
>
> MyDataSource.java
> -------------------------------------------------
> package com.shubham;
>
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
>
> import java.util.Optional;
>
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
>
>   @Override
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   @Override
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>     return Optional.empty();
>   }
> }
>
> MyDataSourceReader.java
> -------------------------------------------------
> package com.shubham.reader;
>
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   private Map<String, String> options;
>   private StructType schema;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public StructType readSchema() {
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     return new ArrayList<>();
>   }
> }
>
> ----------------------------------------
> spark-shell output
> ----------------------------------------
> scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show
>
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +----+----+
> |col1|col2|
> +----+----+
> +----+----+
>
> Here two instances of the reader, MyDataSourceReader@69fa5536 and
> MyDataSourceReader@3095c449, are created. Consequently, schema is null in
> MyDataSourceReader@3095c449.
>
> Am I not doing it the correct way?
>
> Thanks,
> Shubham
>
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>>
>> I am using v2.4.0-RC2.
>>
>> The code as is wouldn't run (e.g. planBatchInputPartitions returns null).
>> How are you calling it?
>>
>> When I do:
>>
>> val df = spark.read.format(mypackage).load().show()
>>
>> I am getting a single creation. How are you creating the reader?
>>
>> Thanks,
>> Assaf
>>
>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>> Sent: Tuesday, October 9, 2018 2:02 PM
>> To: Mendelson, Assaf; user@spark.apache.org
>> Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>>
>> Thanks Assaf, did you try with tags/v2.4.0-rc2?
>>
>> Full code:
>>
>> MyDataSource is the entry point which simply creates the Reader and Writer:
>>
>> public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {
>>
>>   @Override
>>   public DataSourceReader createReader(DataSourceOptions options) {
>>     return new MyDataSourceReader(options.asMap());
>>   }
>>
>>   @Override
>>   public Optional<DataSourceWriter> createWriter(String jobId, StructType schema, SaveMode mode, DataSourceOptions options) {
>>     // creates a dataSourceWriter here..
>>     return Optional.of(dataSourceWriter);
>>   }
>>
>>   @Override
>>   public String keyPrefix() {
>>     return "myprefix";
>>   }
>> }
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>
>>   StructType schema = null;
>>   Map<String, String> options;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     // variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     // more logic......
>>     return null;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     // some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>>
>> Thanks,
>> Shubham
>>
>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>>
>> Could you add a fuller code example? I tried to reproduce it in my
>> environment and I am getting just one instance of the reader…
>>
>> Thanks,
>> Assaf
>>
>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>> Sent: Tuesday, October 9, 2018 9:31 AM
>> To: user@spark.apache.org
>> Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>>
>> Hi All,
>>
>> -- Spark built with tags/v2.4.0-rc2
>>
>> Consider the following DataSourceReader implementation:
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>
>>   StructType schema = null;
>>   Map<String, String> options;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     // variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     // more logic......
>>     return null;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     // some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>>
>> 1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the instance variable schema.
>> 2) Then planBatchInputPartitions() is called on a different instance of MyDataSourceReader, so the value of schema set in step 1 is not visible there.
>>
>> How can I get the value of schema that was set in readSchema(), in planBatchInputPartitions()?
>>
>> Console Logs:
>>
>> scala> mysource.executeQuery("select * from movie").show
>>
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>
>> Thanks,
>> Shubham
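As the spark-shell output above shows, createReader can be invoked more than once for a single query, so state written by readSchema() on one reader instance is not visible to planBatchInputPartitions() on another. One way to sidestep this is to make schema discovery lazy and idempotent, so whichever instance Spark happens to call can compute the schema for itself. A minimal sketch of that idea, not from the thread itself: discoverSchema() is a hypothetical stand-in for the "some logic to discover schema" above, assumed cheap enough to run once per instance:

    package com.shubham.reader;

    import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

      private final Map<String, String> options;
      private StructType schema;  // cached per instance, never shared across instances

      public MyDataSourceReader(Map<String, String> options) {
        this.options = options;
      }

      // Lazy, idempotent accessor: each reader instance discovers the schema
      // for itself on first use, so no cross-instance state is needed.
      private StructType schema() {
        if (schema == null) {
          schema = discoverSchema();
        }
        return schema;
      }

      // Hypothetical helper standing in for the thread's "some logic to discover schema".
      private StructType discoverSchema() {
        return new StructType()
            .add("col1", "int")
            .add("col2", "string");
      }

      @Override
      public StructType readSchema() {
        return schema();
      }

      @Override
      public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        // Recompute or fetch the schema here instead of relying on a readSchema()
        // call that may have happened on a different reader instance.
        StructType s = schema();
        // ... plan partitions against s ...
        return new ArrayList<>();
      }
    }

If schema discovery is expensive (for example, a round trip to the underlying SQL store), caching the result outside the reader keyed by the options map is another option, but the per-instance recomputation above is the simplest fix for the null schema seen in the logs.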