Pre query execution hook for custom datasources
Hi,

In our custom datasource implementation, we want to inject some query-level information. For example:

scala> val df = spark.sql("some query") // uses the custom datasource under the hood through Session Extensions
scala> df.count // here we want some kind of pre-execution hook just before the query starts its execution

Is there a hook or callback that we can implement to achieve this? Similar to org.apache.spark.sql.util.QueryExecutionListener, which provides onSuccess and onFailure callbacks when a query finishes, we want something like "beforeStart()".

Any ideas on how to implement this?

Thanks,
Shubham
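To make the requested contract concrete, here is a Spark-free sketch in plain Java. QueryLifecycleListener and HookedActions are hypothetical names, not Spark APIs: the listener mirrors the onSuccess/onFailure shape of QueryExecutionListener and adds the beforeStart() being asked for, and the hook only fires because the caller routes the action through HookedActions.run().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical callback interface (not a Spark API): the onSuccess/onFailure
// shape of QueryExecutionListener plus the requested beforeStart().
interface QueryLifecycleListener {
    void beforeStart(String funcName);

    void onSuccess(String funcName, long durationNs);

    void onFailure(String funcName, Exception exception);
}

// Hypothetical wrapper: runs an action (for example a count) and notifies
// every registered listener before it starts and after it finishes.
final class HookedActions {
    private final List<QueryLifecycleListener> listeners = new ArrayList<>();

    void register(QueryLifecycleListener listener) {
        listeners.add(listener);
    }

    <T> T run(String funcName, Supplier<T> action) {
        for (QueryLifecycleListener l : listeners) {
            l.beforeStart(funcName); // fires just before the action executes
        }
        long start = System.nanoTime();
        try {
            T result = action.get();
            long elapsed = System.nanoTime() - start;
            for (QueryLifecycleListener l : listeners) {
                l.onSuccess(funcName, elapsed);
            }
            return result;
        } catch (RuntimeException e) {
            for (QueryLifecycleListener l : listeners) {
                l.onFailure(funcName, e);
            }
            throw e; // propagate the original failure to the caller
        }
    }
}
```

The design choice here is interception: the wrapper can only observe actions that pass through it, so anything triggered another way (for example through df.rdd()) would not fire beforeStart().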
Incorrect results in left_outer join in DSv2 implementation with filter pushdown - spark 2.3.2
Hi,

Consider the following statements:

1)
scala> val df = spark.read.format("com.shubham.MyDataSource").load
scala> df.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
|  3| -3|
|  4| -4|
+---+---+

2)
scala> val df1 = df.filter("i < 3")
scala> df1.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
+---+---+

3)
scala> df.join(df1, Seq("i"), "left_outer").show
+---+---+---+
|  i|  j|  j|
+---+---+---+
|  1| -1| -1|
|  2| -2| -2|
|  0|  0|  0|
+---+---+---+

Statement 3) does not produce the correct result for a left_outer join. Here is the minimal code:

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownFilters {
    private Filter[] pushedFilters = new Filter[0];
    private boolean hasFilters = false;

    public MyDataSourceReader(Map<String, String> options) {
        System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    }

    @Override
    public StructType readSchema() {
        return (new StructType())
            .add("i", "int")
            .add("j", "int");
    }

    @Override
    public Filter[] pushFilters(Filter[] filters) {
        System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
        hasFilters = true;
        pushedFilters = filters;
        // Filters that can't be pushed down: none.
        return new Filter[0];
    }

    @Override
    public Filter[] pushedFilters() {
        return pushedFilters;
    }

    @Override
    public List<DataReaderFactory<Row>> createDataReaderFactories() {
        System.out.println("===MyDataSourceReader.createBatchDataReaderFactories===");
        int ltFilter = Integer.MAX_VALUE;
        if (hasFilters) {
            ltFilter = getLTFilter("i");
        }
        hasFilters = false;
        return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
    }

    private int getLTFilter(String attributeName) {
        int filterValue = Integer.MAX_VALUE;
        for (Filter pushedFilter : pushedFilters) {
            if (pushedFilter instanceof LessThan) {
                LessThan lt = (LessThan) pushedFilter;
                if (lt.attribute().equals(attributeName)) {
                    filterValue = (int) lt.value();
                }
            }
        }
        return filterValue;
    }
}

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {
    private final int start;
    private final int end;
    private int current;
    private final int iLTFilter;

    public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
        this.start = start;
        this.end = end;
        this.iLTFilter = iLTFilter;
    }

    @Override
    public DataReader<Row> createDataReader() {
        return new SimpleDataReader(start, end, iLTFilter);
    }

    public static class SimpleDataReader implements DataReader<Row> {
        private final int start;
        private final int end;
        private int current;
        private int iLTFilter;

        public SimpleDataReader(int start, int end, int iLTFilter) {
            this.start = start;
            this.end = end;
            this.current = start - 1;
            this.iLTFilter = iLTFilter;
        }

        @Override
        public boolean next() {
            current++;
            return current < end && current < iLTFilter;
        }

        @Override
        public Row get() {
            return new GenericRow(new Object[]{current, -current});
        }

        @Override
        public void close() {
        }
    }
}

It seems that Spark is somehow applying the filter (i < 3) after the left_outer join as well, which is why 3) returns incorrect results. However, I don't see any Filter node after the join in the plan.
== Physical Plan ==
*(5) Project [i#136, j#137, j#228]
+- SortMergeJoin [i#136], [i#227], LeftOuter
   :- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(i#136, 200)
   :     +- *(1) DataSourceV2Scan [i#136, j#137], com.shubham.reader.MyDataSourceReader@714bd7ad
   +- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
      +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136, 200)

Any ideas what might be going wrong?

Thanks,
Shubham
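For comparison, the result that 3) should produce can be worked out without Spark. The plain-Java sketch below (class and method names are hypothetical) rebuilds both inputs the way SimpleDataReader does, i.e. rows (k, -k) for k in [0, 5) that stop at the pushed-down bound, and left-outer-joins them on i:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LeftOuterJoinCheck {
    // Mirrors SimpleDataReader(0, 5, ltFilter): rows (i, -i) while i < 5 and i < ltFilter.
    static Map<Integer, Integer> scan(int ltFilter) {
        Map<Integer, Integer> rows = new LinkedHashMap<>();
        for (int i = 0; i < 5 && i < ltFilter; i++) {
            rows.put(i, -i);
        }
        return rows;
    }

    // Left outer join on the unique key i: every left row is kept, and the
    // right-hand j is null when the right side has no row with that i.
    static List<Integer[]> leftOuterJoin(Map<Integer, Integer> left, Map<Integer, Integer> right) {
        List<Integer[]> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : left.entrySet()) {
            out.add(new Integer[] {e.getKey(), e.getValue(), right.get(e.getKey())});
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> df = scan(Integer.MAX_VALUE); // no filter pushed
        Map<Integer, Integer> df1 = scan(3);                // LessThan(i, 3) pushed
        for (Integer[] row : leftOuterJoin(df, df1)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

This prints five rows, with null as the right-hand j for i = 3 and i = 4. The three rows actually returned are what you would get if both sides read the filtered scan, which appears consistent with the ReusedExchange on the right side of the plan reusing the left side's exchange, although the plan alone does not prove that.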
DataSourceV2: pushFilters() is not invoked for each read call - spark 2.3.2
Hi,

I am using Spark v2.3.2 and have an implementation of DSV2. Here is what is happening:

1) Obtained a DataFrame using MyDataSource:

scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
MyDataSource.MyDataSource
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiatedcom.shubham.reader.MyDataSourceReader@2b85edc7
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
df1: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]

2) show() on df1:

scala> df1.show
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
===MyDataSourceReader.createBatchDataReaderFactories===
prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
pushedFilters = []
===MyDataSourceReader.createBatchDataReaderFactories===
+---+---+---+
| c1| c2| c3|
+---+---+---+
+---+---+---+

3) val df2 = df1.filter($"c3" > 1)

scala> df2.show
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
===MyDataSourceReader.createBatchDataReaderFactories===
prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]
===MyDataSourceReader.createBatchDataReaderFactories===
+---+---+---+
| c1| c2| c3|
+---+---+---+
+---+---+---+

4) df1.show() again. Because df2 is derived from df1 and shares the same MyDataSourceReader instance, the filters pushed for df2 now show up in pushedFilters for df1 as well:

scala> df1.show
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushed
Re: Clean up method for DataSourceReader
FYI, I am already using QueryExecutionListener, which satisfies the requirements, but it only works for DataFrame APIs. If someone calls df.rdd().someAction(), QueryExecutionListener is never invoked. I want something that works like QueryExecutionListener in the df.rdd().someAction() case too.

I explored SparkListener#onJobEnd, but then how do I propagate some state from the DataSourceReader to the SparkListener?

On Wed, Jun 12, 2019 at 2:22 PM Shubham Chaurasia wrote:
> Hi All,
>
> Is there any way to receive some event that a DataSourceReader is
> finished?
> I want to do some clean up after all the DataReaders are finished reading
> and hence need some kind of cleanUp() mechanism at DataSourceReader(Driver)
> level.
>
> How to achieve this?
>
> For instance, in DataSourceWriter we can rely on commit() and abort()
> methods to know that all the DataWriters are finished.
>
> Thanks,
> Shubham
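One way to carry state from the reader to a listener is a driver-side registry that both can reach. The plain-Java sketch below is hypothetical throughout (CleanupRegistry and the query key are not Spark APIs): the reader would register a cleanup action under some query key, and whichever callback learns that the work is finished (for example, a SparkListener's job-end handler) would call complete() with the same key. How the listener learns the key is the open part and is not shown.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical driver-side registry shared by the reader and a listener.
// It lives in one JVM, so it only helps for driver-level cleanup.
final class CleanupRegistry {
    private static final ConcurrentMap<String, Runnable> PENDING = new ConcurrentHashMap<>();

    private CleanupRegistry() {}

    // Called by the reader (on the driver) once it knows what must be cleaned up.
    static void register(String queryKey, Runnable cleanup) {
        PENDING.put(queryKey, cleanup);
    }

    // Called by the completion callback; runs the cleanup at most once.
    // Returns true if a cleanup action was registered for the key.
    static boolean complete(String queryKey) {
        Runnable cleanup = PENDING.remove(queryKey);
        if (cleanup == null) {
            return false;
        }
        cleanup.run();
        return true;
    }

    static int pending() {
        return PENDING.size();
    }
}
```

Removing the entry before running it makes the cleanup idempotent if the completion signal arrives twice. A static map also leaks any entry whose completion never arrives, so a real version would need some expiry.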
Clean up method for DataSourceReader
Hi All,

Is there any way to receive an event when a DataSourceReader is finished? I want to do some cleanup after all the DataReaders have finished reading, and hence need some kind of cleanUp() mechanism at the DataSourceReader (driver) level.

How can I achieve this?

For instance, in DataSourceWriter we can rely on the commit() and abort() methods to know that all the DataWriters are finished.

Thanks,
Shubham
Re: Static partitioning in partitionBy()
Thanks

On Wed, May 8, 2019 at 10:36 AM Felix Cheung wrote:
> You could
>
> df.filter(col("c") === "c1").write.partitionBy("c").save
>
> It could get some data skew problem but might work for you
>
> --
> From: Burak Yavuz
> Sent: Tuesday, May 7, 2019 9:35:10 AM
> To: Shubham Chaurasia
> Cc: dev; user@spark.apache.org
> Subject: Re: Static partitioning in partitionBy()
>
> It depends on the data source. Delta Lake (https://delta.io) allows you
> to do it with the .option("replaceWhere", "c = c1"). With other file
> formats, you can write directly into the partition directory
> (tablePath/c=c1), but you lose atomicity.
>
> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia wrote:
>
>> Hi All,
>>
>> Is there a way I can provide static partitions in partitionBy()?
>>
>> Like:
>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>
>> Above code gives following error as it tries to find column `c=c1` in df.
>>
>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
>> in schema struct;
>>
>> Thanks,
>> Shubham
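Burak's second option, writing straight into the partition directory, only needs the Hive-style `column=value` path. A minimal sketch of building it, assuming plain string values that need no escaping; the helper name and the example table path are hypothetical:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

final class StaticPartitionPath {
    private StaticPartitionPath() {}

    // Appends one "column=value" directory per static partition, in map order,
    // e.g. tablePath/c=c1. Values are used verbatim; no escaping is applied.
    static Path partitionDir(String tablePath, Map<String, String> staticPartitions) {
        Path dir = Paths.get(tablePath);
        for (Map.Entry<String, String> e : staticPartitions.entrySet()) {
            dir = dir.resolve(e.getKey() + "=" + e.getValue());
        }
        return dir;
    }
}
```

The resulting path would then be the target of an ordinary, non-partitioned write; as Burak points out, this gives up the atomicity of a single overwrite of the table.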
Static partitioning in partitionBy()
Hi All,

Is there a way to provide static partitions in partitionBy()? For example:

df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

The code above gives the following error, because it tries to find a column named `c=c1` in df:

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found in schema struct;

Thanks,
Shubham
Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files
Writing:

scala> df.write.orc("")

For looking into the contents, I used orc-tools-X.Y.Z-uber.jar (https://orc.apache.org/docs/java-tools.html).

On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan wrote:
> How did you read/write the timestamp value from/to ORC file?
>
> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> Consider the following(spark v2.4.0):
>>
>> Basically I change values of `spark.sql.session.timeZone` and perform an
>> orc write. Here are 3 samples:-
>>
>> 1)
>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>>
>> scala> val df = sc.parallelize(Seq("2019-04-23 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
>> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>>
>> df.show() Output       ORC File Contents
>> -----------------------------------------------------
>> 2019-04-23 09:15:04    {"ts":"2019-04-23 09:15:04.0"}
>>
>> 2)
>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>
>> df.show() Output       ORC File Contents
>> -----------------------------------------------------
>> 2019-04-23 03:45:04    {"ts":"2019-04-23 09:15:04.0"}
>>
>> 3)
>> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>
>> df.show() Output       ORC File Contents
>> -----------------------------------------------------
>> 2019-04-22 20:45:04    {"ts":"2019-04-23 09:15:04.0"}
>>
>> It can be seen that in all the three cases it stores {"ts":"2019-04-23
>> 09:15:04.0"} in orc file. I understand that orc file also contains writer
>> timezone with respect to which spark is able to convert back to actual time
>> when it reads orc.(and that is equal to df.show())
>>
>> But it's problematic in the sense that it is not adjusting(plus/minus)
>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
>> 09:15:04.0"} in ORC file. I mean loading data to any system other than
>> spark would be a problem.
>>
>> Any ideas/suggestions on that?
>>
>> PS: For csv files, it stores exactly what we see as the output of
>> df.show()
>>
>> Thanks,
>> Shubham
DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files
Hi All,

Consider the following (Spark v2.4.0). I change the value of `spark.sql.session.timeZone` and perform an ORC write. Here are 3 samples:

1)
scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

scala> val df = sc.parallelize(Seq("2019-04-23 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

df.show() Output       ORC File Contents
-----------------------------------------------------
2019-04-23 09:15:04    {"ts":"2019-04-23 09:15:04.0"}

2)
scala> spark.conf.set("spark.sql.session.timeZone", "UTC")

df.show() Output       ORC File Contents
-----------------------------------------------------
2019-04-23 03:45:04    {"ts":"2019-04-23 09:15:04.0"}

3)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df.show() Output       ORC File Contents
-----------------------------------------------------
2019-04-22 20:45:04    {"ts":"2019-04-23 09:15:04.0"}

In all three cases, the ORC file stores {"ts":"2019-04-23 09:15:04.0"}. I understand that the ORC file also records the writer's time zone, which lets Spark convert the value back to the actual time when it reads the file (and that matches df.show()).

But it is problematic that the writer does not adjust (plus/minus) the spark.sql.session.timeZone offset for {"ts":"2019-04-23 09:15:04.0"} in the ORC file. Loading the data into any system other than Spark would therefore be a problem.

Any ideas/suggestions on that?

PS: For CSV files, Spark stores exactly what we see in the output of df.show().

Thanks,
Shubham
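The three df.show() values are consistent with one fixed instant rendered in three different session time zones. The plain-Java sketch below uses only java.time (the class name is hypothetical): it parses the wall-clock value in Asia/Kolkata, as in sample 1), and renders that instant in each zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class SessionZoneRendering {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private SessionZoneRendering() {}

    // Interprets a wall-clock string in the given zone and returns the underlying instant.
    static Instant parse(String wallClock, String zone) {
        return LocalDateTime.parse(wallClock, FMT).atZone(ZoneId.of(zone)).toInstant();
    }

    // Renders the same instant as wall-clock time in another zone.
    static String render(Instant instant, String zone) {
        return FMT.format(instant.atZone(ZoneId.of(zone)));
    }

    public static void main(String[] args) {
        Instant ts = parse("2019-04-23 09:15:04", "Asia/Kolkata");
        for (String zone : new String[] {"Asia/Kolkata", "UTC", "America/Los_Angeles"}) {
            System.out.println(zone + " -> " + render(ts, zone));
        }
    }
}
```

Only the rendering changes with the zone; the instant does not. The sketch says nothing about how Spark's ORC writer encodes that instant, which is the open question here.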
Re: DataSourceV2 producing wrong date value in Custom Data Writer
Thanks Ryan

On Tue, Feb 5, 2019 at 10:28 PM Ryan Blue wrote:
> Shubham,
>
> DataSourceV2 passes Spark's internal representation to your source and
> expects Spark's internal representation back from the source. That's why
> you consume and produce InternalRow: "internal" indicates that Spark
> doesn't need to convert the values.
>
> Spark's internal representation for a date is the ordinal from the unix
> epoch date, 1970-01-01 = 0.
>
> rb
>
> On Tue, Feb 5, 2019 at 4:46 AM Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using custom DataSourceV2 implementation (Spark version 2.3.2)
>>
>> Here is how I am trying to pass in date type from spark shell.
>>
>> scala> val df = sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype", col("datetype").cast("date"))
>> scala> df.write.format("com.shubham.MyDataSource").save
>>
>> Below is the minimal write() method of my DataWriter implementation.
>>
>> @Override
>> public void write(InternalRow record) throws IOException {
>>     ByteArrayOutputStream format = streamingRecordFormatter.format(record);
>>     System.out.println("MyDataWriter.write: " + record.get(0, DataTypes.DateType));
>> }
>>
>> It prints an integer as output:
>>
>> MyDataWriter.write: 17039
>>
>> Is this a bug? or I am doing something wrong?
>>
>> Thanks,
>> Shubham
>
> --
> Ryan Blue
> Software Engineer
> Netflix
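Given Ryan's description (days since 1970-01-01), the integer from the row can be turned back into a calendar date with java.time. A minimal sketch; the class name is hypothetical, and inside write() the int would come from something like record.getInt(0):

```java
import java.time.LocalDate;

final class DateOrdinal {
    private DateOrdinal() {}

    // Spark's internal DateType value, as described in the reply: days since 1970-01-01.
    static LocalDate fromDays(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // The inverse, e.g. when producing date values for an InternalRow.
    static int toDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        System.out.println(toDays(LocalDate.of(2019, 2, 5))); // 17932
    }
}
```

Under this encoding 1970-01-01 is 0 and 2019-02-05 is 17932.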
DataSourceV2 producing wrong date value in Custom Data Writer
Hi All,

I am using a custom DataSourceV2 implementation (Spark version 2.3.2).

Here is how I am trying to pass in a date type from the Spark shell:

scala> val df = sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype", col("datetype").cast("date"))
scala> df.write.format("com.shubham.MyDataSource").save

Below is the minimal write() method of my DataWriter implementation:

@Override
public void write(InternalRow record) throws IOException {
    ByteArrayOutputStream format = streamingRecordFormatter.format(record);
    System.out.println("MyDataWriter.write: " + record.get(0, DataTypes.DateType));
}

It prints an integer as output:

MyDataWriter.write: 17039

Is this a bug, or am I doing something wrong?

Thanks,
Shubham
Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
Alright, it is a big project that uses a SQL store underneath. I extracted the minimal code into a smaller project, and it still creates multiple instances. Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── shubham
│   │   │           ├── MyDataSource.java
│   │   │           └── reader
│   │   │               └── MyDataSourceReader.java

MyDataSource.java:

package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

    public DataSourceReader createReader(DataSourceOptions options) {
        System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
        return new MyDataSourceReader(options.asMap());
    }

    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
        return Optional.empty();
    }
}

MyDataSourceReader.java:

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

    private Map<String, String> options;
    private StructType schema;

    public MyDataSourceReader(Map<String, String> options) {
        System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
        this.options = options;
    }

    @Override
    public StructType readSchema() {
        this.schema = (new StructType())
            .add("col1", "int")
            .add("col2", "string");
        System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
        return this.schema;
    }

    @Override
    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
        return new ArrayList<>();
    }
}

spark-shell output:

scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiatedcom.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiatedcom.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+----+----+
|col1|col2|
+----+----+
+----+----+

Here, two reader instances, MyDataSourceReader@69fa5536 and MyDataSourceReader@3095c449, are created. Consequently, schema is null in MyDataSourceReader@3095c449.

Am I not doing it the correct way?

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf wrote:
> I am using v2.4.0-RC2
>
> The code as is wouldn't run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
> When I do:
> Val df = spark.read.format(mypackage).load().show()
> I am getting a single creation, how are you creating the reader?
>
> Thanks,
> Assaf
>
> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> Sent: Tuesday, October 9, 2018 2:02 PM
> To: Mendelson, Assaf; user@spark.apache.org
> Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>
> Thanks Assaf, you tried with tags/v2.4.0-rc2?
>
> Full Code:
>
> MyDataSource is the entry point w
Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
Thanks Assaf, did you try with tags/v2.4.0-rc2?

Full code:

MyDataSource is the entry point, which simply creates the Reader and Writer:

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {

    @Override
    public DataSourceReader createReader(DataSourceOptions options) {
        return new MyDataSourceReader(options.asMap());
    }

    @Override
    public Optional<DataSourceWriter> createWriter(String jobId, StructType schema, SaveMode mode, DataSourceOptions options) {
        // creates a dataSourcewriter here..
        return Optional.of(dataSourcewriter);
    }

    @Override
    public String keyPrefix() {
        return "myprefix";
    }
}

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

    StructType schema = null;
    Map<String, String> options;

    public MyDataSourceReader(Map<String, String> options) {
        System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
        this.options = options;
    }

    @Override
    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        // this.schema is null here, since readSchema() was called on a different instance
        System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
        // more logic..
        return null;
    }

    @Override
    public StructType readSchema() {
        // some logic to discover schema
        this.schema = (new StructType())
            .add("col1", "int")
            .add("col2", "string");
        System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
        return this.schema;
    }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf wrote:
> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
> Thanks,
> Assaf
>
> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> Sent: Tuesday, October 9, 2018 9:31 AM
> To: user@spark.apache.org
> Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>
> Hi All,
>
> --Spark built with tags/v2.4.0-rc2
>
> Consider following DataSourceReader implementation:
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>     StructType schema = null;
>     Map<String, String> options;
>
>     public MyDataSourceReader(Map<String, String> options) {
>         System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
>         this.options = options;
>     }
>
>     @Override
>     public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>         //variable this.schema is null here since readSchema() was called on a different instance
>         System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>         //more logic..
>         return null;
>     }
>
>     @Override
>     public StructType readSchema() {
>         //some logic to discover schema
>         this.schema = (new StructType())
>             .add("col1", "int")
>             .add("col2", "string");
>         System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>         return this.schema;
>     }
> }
>
> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets
> class variable schema.
>
> 2) Now when planBatchInputPartitions() is called, it is being called on a
> different instance of MyDataSourceReader and hence I am not getting the value
> of schema in method planBatchInputPartitions().
>
> How can I get value of schema which was set in readSchema() method, in
> planBatchInputPartitions() method?
>
> Console Logs:
>
> scala> mysource.executeQuery("select * from movie").show
>
> MyDataSourceReader.MyDataSourceReader: InstantiatedMyDataSourceReader@59ea8f1b
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
> MyDataSourceReader.MyDataSourceReader: InstantiatedMyDataSourceReader@a3cd3ff
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>
> Thanks,
> Shubham
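Because readSchema() and planBatchInputPartitions() may be called on different reader instances, one workaround is to make the schema something any instance can recompute from its own options, instead of remembering it from an earlier call. A Spark-free sketch of that pattern; the class, the string-list schema, and discoverSchema() are hypothetical stand-ins:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free model of a reader that derives its schema from its own
// options on first use instead of relying on readSchema() having run on this instance.
final class LazySchemaReader {
    private final Map<String, String> options;
    private List<String> schema; // per-instance cache; null until first needed

    LazySchemaReader(Map<String, String> options) {
        this.options = options;
    }

    // Stand-in for "some logic to discover schema"; a real reader would use the
    // options (for example a "query" option). Here it returns the two columns above.
    private static List<String> discoverSchema(Map<String, String> options) {
        return Arrays.asList("col1:int", "col2:string");
    }

    private List<String> schema() {
        if (schema == null) {
            schema = discoverSchema(options);
        }
        return schema;
    }

    // Stand-in for readSchema().
    List<String> readSchema() {
        return schema();
    }

    // Stand-in for planBatchInputPartitions(): works even on a fresh instance.
    String planPartitions() {
        return "planning with schema " + schema();
    }
}
```

The cost is that discovery may run once per instance; if it is expensive (for example, a round trip to the SQL store), a cache shared across instances is a natural complement.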
DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
Hi All,

Spark built with tags/v2.4.0-rc2.

Consider the following DataSourceReader implementation:

import java.util.List;
import java.util.Map;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

    StructType schema = null;
    Map<String, String> options;

    public MyDataSourceReader(Map<String, String> options) {
        System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
        this.options = options;
    }

    @Override
    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        // this.schema is null here, since readSchema() was called on a different instance
        System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
        // more logic..
        return null;
    }

    @Override
    public StructType readSchema() {
        // some logic to discover schema
        this.schema = (new StructType())
            .add("col1", "int")
            .add("col2", "string");
        System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
        return this.schema;
    }
}

1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the class variable schema.
2) Then planBatchInputPartitions() is called on a different instance of MyDataSourceReader, so the value of schema is not available in planBatchInputPartitions().

How can I get the value of schema that was set in readSchema() from within planBatchInputPartitions()?

Console logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader: InstantiatedMyDataSourceReader@59ea8f1b
MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSourceReader.MyDataSourceReader: InstantiatedMyDataSourceReader@a3cd3ff
MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null

Thanks,
Shubham
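If schema discovery is expensive, another option is a JVM-wide cache keyed by the reader options, so every instance created with equal options gets the same result. A hypothetical plain-Java sketch (SchemaCache is not a Spark class, and the discovery function is supplied by the caller):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: discovery runs once per distinct options map, and every
// reader instance created with equal options reuses the stored result.
final class SchemaCache<S> {
    private final ConcurrentHashMap<Map<String, String>, S> cache = new ConcurrentHashMap<>();
    private final Function<Map<String, String>, S> discover;

    SchemaCache(Function<Map<String, String>, S> discover) {
        this.discover = discover;
    }

    S get(Map<String, String> options) {
        // Copy the options so later mutation of the caller's map cannot change the key.
        Map<String, String> key = Collections.unmodifiableMap(new HashMap<>(options));
        return cache.computeIfAbsent(key, discover);
    }
}
```

A reader could keep one such cache in a static field and consult it from both readSchema() and planBatchInputPartitions(). Entries are never evicted in this sketch, so a schema change behind the same options would not be picked up.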
Target java version not set when building spark with tags/v2.4.0-rc2
Hi All,

I built Spark from tags/v2.4.0-rc2 using:

./build/mvn -DskipTests -Phadoop-2.7 -Dhadoop.version=3.1.0 clean install

Now, from spark-shell, whenever I call any static method residing in an interface, it shows an error like:

<console>:28: error: Static methods in interface require -target:jvm-1.8

However, spark-shell is picking up the correct Java version; I verified it like this:

scala> java.lang.System.getProperty("java.version")
res1: String = 1.8.0_181

This was not the case when I was building with tags/v2.3.2.

Have I missed something?

Thanks,
Shubham