Re: WholeStageCodeGen + DSv2

2021-05-19 Thread Shubham Chaurasia
Hi, I remember creating one for a similar scenario in the past - https://issues.apache.org/jira/browse/SPARK-29372. Thanks, Shubham

On Wed, May 19, 2021 at 5:34 PM Takeshi Yamamuro wrote:
> hi, Andrew,
>
> Welcome any improvement proposal for that.
> Could you file an issue in jira first to

Pre query execution hook for custom datasources

2020-09-18 Thread Shubham Chaurasia
Hi, In our custom datasource implementation, we want to inject some query-level information. For example:

scala> val df = spark.sql("some query") // uses custom datasource under the hood through Session Extensions
scala> df.count // here we want some kind of pre-execution hook just before
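Since the snippet cuts off here, one way such a pre-execution hook can be built is with the same Session Extensions mechanism the source already uses: a rule injected into the optimizer runs while the query is planned, before any job is launched (a QueryExecutionListener, by contrast, fires only after execution). A minimal sketch; the class name and the registration conf value are illustrative:

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Registered via spark.sql.extensions=com.example.MyExtensions (illustrative)
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectOptimizerRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = {
          // Runs while the query is being optimized, i.e. before execution;
          // a place to capture query-level info for the datasource.
          plan // observe only, return the plan unchanged
        }
      }
    }
  }
}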

Incorrect results in left_outer join in DSv2 implementation with filter pushdown - spark 2.3.2

2019-09-19 Thread Shubham Chaurasia
Hi, Consider the following statements:

1)
scala> val df = spark.read.format("com.shubham.MyDataSource").load
scala> df.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
|  3| -3|
|  4| -4|
+---+---+

2)
scala> val df1 = df.filter("i < 3")
scala> df1.show
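A detail worth checking when a DSv2 source on 2.3.x returns wrong join results: in the SupportsPushDownFilters contract, whatever pushFilters() does not return is assumed by Spark to be fully applied by the source, so under-reporting residual filters silently drops predicates. A sketch of the 2.3.x contract (class and helper names are illustrative; factory creation is elided):

import java.util.{Collections, List => JList}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownFilters}
import org.apache.spark.sql.types.StructType

class MyDataSourceReader(schema: StructType) extends DataSourceReader with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def readSchema(): StructType = schema

  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
    Collections.emptyList() // real factories elided; the sketch is about filters

  // Spark hands over the candidate filters at planning time; whatever we
  // return is what Spark re-applies on top of the scan output.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(canEvaluateInSource)
    pushed = supported // overwrite, don't accumulate across calls
    unsupported        // hand back everything the source will NOT apply
  }

  override def pushedFilters(): Array[Filter] = pushed

  // Hypothetical capability check; a conservative source can return false
  // for everything and let Spark evaluate all filters itself.
  private def canEvaluateInSource(f: Filter): Boolean = false
}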

DataSourceV2: pushFilters() is not invoked for each read call - spark 2.3.2

2019-09-06 Thread Shubham Chaurasia
Hi, I am using spark v2.3.2. I have an implementation of DSv2. Here is what is happening:

1) Obtained a dataframe using MyDataSource

scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
MyDataSource.MyDataSource
MyDataSource.createReader: Going to create a new
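The lifecycle the thread describes can be observed from the shell. A sketch, assuming MyDataSource logs from each callback as in the snippet above:

val df = spark.read.format("com.shubham.MyDataSource").load // createReader runs here, as the log shows
val df1 = df.filter("i < 3")
df1.show // pushFilters runs while this query is optimized
df.show  // reuses the reader created at load(); pushFilters may not run again

Because in 2.3.x the reader instance can be shared across queries built from the same DataFrame, any pushed-filter state it caches should be overwritten, not appended to, on every pushFilters() call.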

Re: Clean up method for DataSourceReader

2019-06-12 Thread Shubham Chaurasia
Explored SparkListener#onJobEnd, but then how to propagate some state from DataSourceReader to SparkListener?

On Wed, Jun 12, 2019 at 2:22 PM Shubham Chaurasia wrote:
> Hi All,
>
> Is there any way to receive some event that a DataSourceReader is finished?
> I want to do some clean up
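One workaround for the state-propagation question, sketched under the assumption that the reader and the listener live in the same driver JVM (ReaderCleanups and CleanupListener are illustrative names, not Spark API):

import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// The reader registers a cleanup callback in a driver-side singleton;
// a SparkListener drains the registry when a job finishes.
object ReaderCleanups {
  private val pending = TrieMap.empty[String, () => Unit]
  def register(readerId: String, f: () => Unit): Unit = pending.put(readerId, f)
  def drainAll(): Unit = pending.keys.foreach(id => pending.remove(id).foreach(f => f()))
}

class CleanupListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // Caveat: onJobEnd fires for every job on the context, not just the scan
    // job, so callbacks must be safe to invoke at this coarse granularity.
    ReaderCleanups.drainAll()
  }
}

// spark.sparkContext.addSparkListener(new CleanupListener)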

Clean up method for DataSourceReader

2019-06-12 Thread Shubham Chaurasia
Hi All, Is there any way to receive some event that a DataSourceReader is finished? I want to do some clean up after all the DataReaders are finished reading, and hence need some kind of cleanUp() mechanism at the DataSourceReader (driver) level. How to achieve this? For instance, in DataSourceWriter

Re: Static partitioning in partitionBy()

2019-05-08 Thread Shubham Chaurasia
> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
> *To:* Shubham Chaurasia
> *Cc:* dev; u...@spark.apache.org
> *Subject:* Re: Static partitioning in partitionBy()
>
> It depends on the data source. Delta Lake (https://delta.io) allows you
> to do it with the .option("replaceWhere",
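For reference, the replaceWhere pattern the reply points to looks like this with Delta Lake (path and predicate are illustrative); it overwrites only the rows matching the predicate instead of the whole table:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "c = 'c1'")
  .save("/delta/events")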

Static partitioning in partitionBy()

2019-05-07 Thread Shubham Chaurasia
Hi All, Is there a way I can provide static partitions in partitionBy()? Like:

df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

The above code gives the following error, as it tries to find column `c=c1` in df:

org.apache.spark.sql.AnalysisException: Partition column `c=c1`
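partitionBy() takes column names, so "c=c1" is parsed as a (nonexistent) column rather than a static partition spec. A common workaround, assuming the source supports plain partitioned writes: add the static value as a literal column and partition on the bare name, so every row lands in the c=c1 directory:

import org.apache.spark.sql.functions.lit

df.withColumn("c", lit("c1"))
  .write
  .mode("overwrite")
  .format("MyDataSource")
  .partitionBy("c")
  .save()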

DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Hi All, Consider the following (spark v2.4.0): Basically I change values of `spark.sql.session.timeZone` and perform an orc write. Here are 3 samples:

1)
scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
scala> val df = sc.parallelize(Seq("2019-04-23
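A self-contained variant of the repro (the path is illustrative; import spark.implicits._ is already in scope in spark-shell): write one timestamp, then read it back under two session time zones and compare what show() displays:

import spark.implicits._
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
val df = Seq("2019-04-23 09:15:04.0").toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
df.write.mode("overwrite").orc("/tmp/tz_orc")
spark.read.orc("/tmp/tz_orc").show(false)

spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.read.orc("/tmp/tz_orc").show(false) // rendering shifts with the session zone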

Re: DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-06 Thread Shubham Chaurasia
> ..." indicates that Spark doesn't need to convert the values.
>
> Spark's internal representation for a date is the ordinal from the unix
> epoch date, 1970-01-01 = 0.
>
> rb
>
> On Tue, Feb 5, 2019 at 4:46 AM Shubham Chaurasia <shubh.chaura...@gmail.com>
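Given the representation the reply describes, a DataWriter[InternalRow] can decode a DateType column like this (a sketch; `ordinal` is the hypothetical column position):

import org.apache.spark.sql.catalyst.InternalRow

def decodeDate(record: InternalRow, ordinal: Int): java.time.LocalDate = {
  val days = record.getInt(ordinal)           // days since 1970-01-01
  java.time.LocalDate.ofEpochDay(days.toLong) // e.g. 17932 -> 2019-02-05
}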

DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-05 Thread Shubham Chaurasia
Hi All, I am using custom DataSourceV2 implementation (*Spark version 2.3.2*). Here is how I am trying to pass in *date type* from spark shell.

scala> val df = sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype", col("datetype").cast("date"))
scala>

Re: Behavior of checkpointLocation from options vs setting conf spark.sql.streaming.checkpointLocation

2018-12-12 Thread Shubham Chaurasia
> before restart.
>
> BR,
> G
>
> On Tue, Dec 11, 2018 at 6:45 AM Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:
>> Hi,
>>
>> I would like to confirm checkpointing behavior, I have observed following scenarios:

Behavior of checkpointLocation from options vs setting conf spark.sql.streaming.checkpointLocation

2018-12-10 Thread Shubham Chaurasia
Hi, I would like to confirm checkpointing behavior, I have observed the following scenarios:

*1)* When I set checkpointLocation from the streaming query like:

val query = rateDF.writeStream.format("console").outputMode("append").trigger(Trigger.ProcessingTime("1 seconds")).option("checkpointLocation",
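For comparison, the two ways of pointing a query at a checkpoint, per the Structured Streaming docs (paths are illustrative): the per-query option names the exact directory, while the session conf is a base directory under which each query gets its own subdirectory (its query name, or a generated id):

import org.apache.spark.sql.streaming.Trigger

val rateDF = spark.readStream.format("rate").load()

// 1) per-query option: this exact directory holds the query's checkpoint
val q1 = rateDF.writeStream.format("console").outputMode("append")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .option("checkpointLocation", "/tmp/ckpt/q1")
  .start()

// 2) session conf: a base directory; this query checkpoints under
//    /tmp/ckpt-base/q2, not in /tmp/ckpt-base itself
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/ckpt-base")
val q2 = rateDF.writeStream.format("console").outputMode("append")
  .queryName("q2")
  .start()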

Behavior of SaveMode.Append when table is not present

2018-11-08 Thread Shubham Chaurasia
Hi, For SaveMode.Append, the doc https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes says: *When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.* However it does not specify the behavior when the table is not present.
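In code terms, the question is what the line below should do when the target does not yet exist; the built-in file sources create it, but for a custom source the choice (create vs. fail) is left to the implementation:

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Append).format("com.shubham.MyDataSource").save()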