Pre query execution hook for custom datasources

2020-09-18 Thread Shubham Chaurasia
Hi,

In our custom datasource implementation, we want to inject some query level
information.

For example -

scala> val df = spark.sql("some query")   // uses custom datasource under
the hood through Session Extensions.

scala> df.count // here we want some kind of pre-execution hook just
before the query starts its execution

Is there a hook or some kind of callback that we can implement to achieve
this?

Or, similar to org.apache.spark.sql.util.QueryExecutionListener, which
provides onSuccess and onFailure callbacks when a query finishes, we want
something like *beforeStart()*.

Any ideas on how to implement this?
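
One direction I have been sketching, since the query already goes through
Session Extensions (only a sketch, not verified end to end; the class names
are mine): inject a no-op optimizer rule whose apply() runs during planning,
i.e. before execution starts. The caveat is that optimizer rules can be
invoked more than once per query, so the side effect has to be idempotent.

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Runs while the plan is being optimized, i.e. before physical execution starts.
case class PreExecutionTap(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // place "beforeStart"-style side effects here (inject query-level info, logging, ...)
    logInfo(s"About to execute a plan rooted at ${plan.getClass.getSimpleName}")
    plan
  }
}

class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(e: SparkSessionExtensions): Unit =
    e.injectOptimizerRule(session => PreExecutionTap(session))
}

// registered with: --conf spark.sql.extensions=<fully qualified name of MyExtensions>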

Thanks,
Shubham


Incorrect results in left_outer join in DSv2 implementation with filter pushdown - spark 2.3.2

2019-09-19 Thread Shubham Chaurasia
Hi,

Consider the following statements:

1)
> scala> val df = spark.read.format("com.shubham.MyDataSource").load
> scala> df.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> |  3| -3|
> |  4| -4|
> +---+---+
> 2)
> scala> val df1 = df.filter("i < 3")
> scala> df1.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> +---+---+
> 3)
> scala> df.join(df1, Seq("i"), "left_outer").show
> +---+---+---+
> |  i|  j|  j|
> +---+---+---+
> |  1| -1| -1|
> |  2| -2| -2|
> |  0|  0|  0|
> +---+---+---+


3) is not producing the right results for left_outer join.

Here is the minimal code.

---

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader,
    SupportsPushDownFilters {

  private Filter[] pushedFilters = new Filter[0];
  private boolean hasFilters = false;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
  }

  @Override
  public StructType readSchema() {
    return (new StructType())
        .add("i", "int")
        .add("j", "int");
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
    hasFilters = true;
    pushedFilters = filters;
    // filters that can't be pushed down.
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    System.out.println("===MyDataSourceReader.createBatchDataReaderFactories===");
    int ltFilter = Integer.MAX_VALUE;
    if (hasFilters) {
      ltFilter = getLTFilter("i");
    }
    hasFilters = false;
    return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
  }

  private int getLTFilter(String attributeName) {
    int filterValue = Integer.MAX_VALUE;
    for (Filter pushedFilter : pushedFilters) {
      if (pushedFilter instanceof LessThan) {
        LessThan lt = (LessThan) pushedFilter;
        if (lt.attribute().equals(attributeName)) {
          filterValue = (int) lt.value();
        }
      }
    }
    return filterValue;
  }

}



import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {

  private final int start;
  private final int end;
  private final int iLTFilter;

  public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
    this.start = start;
    this.end = end;
    this.iLTFilter = iLTFilter;
  }

  @Override
  public DataReader<Row> createDataReader() {
    return new SimpleDataReader(start, end, iLTFilter);
  }

  public static class SimpleDataReader implements DataReader<Row> {
    private final int start;
    private final int end;
    private int current;
    private int iLTFilter;

    public SimpleDataReader(int start, int end, int iLTFilter) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
      this.iLTFilter = iLTFilter;
    }

    @Override
    public boolean next() {
      current++;
      return current < end && current < iLTFilter;
    }

    @Override
    public Row get() {
      return new GenericRow(new Object[]{current, -current});
    }

    @Override
    public void close() {
    }
  }
}



It seems that somehow Spark is applying the filter (i < 3) after the
left_outer join too, which is why we see incorrect results in 3).
However, I don't see any Filter node after the join in the plan.

== Physical Plan ==
> *(5) Project [i#136, j#137, j#228]
> +- SortMergeJoin [i#136], [i#227], LeftOuter
>:- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(i#136, 200)
>: +- *(1) DataSourceV2Scan [i#136, j#137],
> com.shubham.reader.MyDataSourceReader@714bd7ad
>+- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
>   +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136,
> 200)


Any ideas what might be going wrong?
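
One check I am planning, since the right side of the join is a ReusedExchange
of the left-side scan (just a debugging sketch; spark.sql.exchange.reuse is an
internal conf, not a fix):

scala> spark.conf.set("spark.sql.exchange.reuse", "false")
scala> df.join(df1, Seq("i"), "left_outer").explain(true)
scala> df.join(df1, Seq("i"), "left_outer").show  // does the left side still lose rows?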

Thanks,
Shubham


DataSourceV2: pushFilters() is not invoked for each read call - spark 2.3.2

2019-09-06 Thread Shubham Chaurasia
Field(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> ===MyDataSourceReader.createBatchDataReaderFactories===
> prunedSchema = StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> *pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]*
> ===MyDataSourceReader.createBatchDataReaderFactories===
> +---+---+---+
> | c1| c2| c3|
> +---+---+---+
> +---+---+---+
>

*pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]* is not correct in step
4), as there were no filters specified for df1.

This is because I am maintaining the pushedFilters variable in
MyDataSourceReader, and it gets modified by df2.filter().show.

Questions:
Q1: How to maintain this state in DataSourceReader implementations?
Q2: Shouldn't Spark call the pushFilters() method every time we invoke an
action (regardless of whether .filter() is present), in the same manner as it
calls pruneColumns()?

I understand that pushFilters() is only invoked when .filter() is present in
the dataframe, but as we saw in the above scenario, that leaves the state of
MyDataSourceReader inconsistent, hence question Q2.

Minimal Code:

public class MyDataSource implements DataSourceRegister, DataSourceV2,
ReadSupport, WriteSupport {
>
>   public MyDataSource() {
>     System.out.println("MyDataSource.MyDataSource");
>   }
>
>   @Override
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   @Override
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
>       SaveMode mode, DataSourceOptions options) {
>     System.out.println("MyDataSource.createWriter: Going to create a new MyDataSourceWriter");
>     return Optional.of(new MyDataSourceWriter(schema));
>   }
>
>   @Override
>   public String shortName() {
>     return "com.shubham.MyDataSource";
>   }
> }
>
>
public class MyDataSourceReader implements DataSourceReader,
SupportsPushDownRequiredColumns, SupportsScanColumnarBatch,
SupportsPushDownFilters {
>
>   private Map<String, String> options;
>   private StructType baseSchema;
>   private StructType prunedSchema;
>   private Filter[] pushedFilters = new Filter[0];
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
>     this.options = options;
>   }
>
>   @Override
>   public StructType readSchema() {
>     this.baseSchema = (new StructType())
>         .add("c1", "int")
>         .add("c2", "int")
>         .add("c3", "int");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " baseSchema: " + this.baseSchema);
>     return this.baseSchema;
>   }
>
>   @Override
>   public Filter[] pushFilters(Filter[] filters) {
>     System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
>     // filters that can be pushed down.
>     // for this example, let's assume all the filters can be pushed down.
>     this.pushedFilters = filters;
>
>     // filters that can't be pushed down.
>     return new Filter[0];
>   }
>
>   @Override
>   public Filter[] pushedFilters() {
>     // System.out.println("MyDataSourceReader.pushedFilters: " + Arrays.toString(pushedFilters));
>     return this.pushedFilters;
>   }
>
>   @Override
>   public void pruneColumns(StructType requiredSchema) {
>     System.out.println("MyDataSourceReader.pruneColumns: " + requiredSchema);
>     this.prunedSchema = requiredSchema;
>   }
>
>   @Override
>   public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
>     System.out.println("===MyDataSourceReader.createBatchDataReaderFactories===");
>     // do the actual operation with baseSchema, prunedSchema, pushedFilters
>
>     System.out.println("prunedSchema = " + prunedSchema);
>     System.out.println("pushedFilters = " + Arrays.toString(pushedFilters));
>
>     System.out.println("===MyDataSourceReader.createBatchDataReaderFactories===");
>
>     return new ArrayList<>();
>   }
>
> }
>
>
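
Coming back to Q1, the only workaround I can think of so far is to treat the
filter state as per-scan rather than per-reader. A minimal sketch against the
2.3.x API (the class and field names are mine, and resetting the field inside
createBatchDataReaderFactories() is an assumption, not a documented contract):

import java.util.{ArrayList, List => JList}

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownFilters, SupportsScanColumnarBatch}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Snapshot the pushed filters when the scan is planned, then reset the
// reader-level field so a later scan without .filter() starts from a clean state.
class ScanScopedReader extends DataSourceReader
    with SupportsPushDownFilters with SupportsScanColumnarBatch {

  private var filters: Array[Filter] = Array.empty

  override def readSchema(): StructType =
    new StructType().add("c1", "int").add("c2", "int").add("c3", "int")

  override def pushFilters(toPush: Array[Filter]): Array[Filter] = {
    filters = toPush   // assume everything can be pushed down
    Array.empty        // nothing left for Spark to evaluate
  }

  override def pushedFilters(): Array[Filter] = filters

  override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
    val filtersForThisScan = filters   // hand this snapshot to the factories being built
    filters = Array.empty              // reset the reader-level state
    println(s"planning scan with filters: ${filtersForThisScan.mkString(", ")}")
    // build the real factories from filtersForThisScan here ...
    new ArrayList[DataReaderFactory[ColumnarBatch]]()
  }
}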
Thanks,
Shubham


Re: Clean up method for DataSourceReader

2019-06-12 Thread Shubham Chaurasia
FYI, I am already using QueryExecutionListener which satisfies the
requirements.

But that only works for DataFrame APIs. If someone does
df.rdd().someAction(), QueryExecutionListener is never invoked. I want
something that works like QueryExecutionListener for the
df.rdd().someAction() case too.
I explored SparkListener#onJobEnd, but then how to propagate some state
from DataSourceReader to SparkListener?
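
The best sketch I have so far (my own pattern, not a Spark API; it fires on
every job end, so the cleanup has to be idempotent): the reader registers a
cleanup callback in a driver-side registry, and a SparkListener drains the
registry in onJobEnd, which also covers df.rdd().someAction().

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Driver-side registry the DataSourceReader can drop cleanup callbacks into.
object ReaderCleanupRegistry {
  private val callbacks = new java.util.concurrent.ConcurrentLinkedQueue[() => Unit]()
  def register(cb: () => Unit): Unit = callbacks.add(cb)
  def drain(): Unit = {
    var cb = callbacks.poll()
    while (cb != null) { cb(); cb = callbacks.poll() }
  }
}

// Drains the registry whenever any job finishes.
class ReaderCleanupListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = ReaderCleanupRegistry.drain()
}

// spark.sparkContext.addSparkListener(new ReaderCleanupListener)
// inside the DataSourceReader: ReaderCleanupRegistry.register(() => { /* reader-specific cleanup */ })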

On Wed, Jun 12, 2019 at 2:22 PM Shubham Chaurasia 
wrote:

> Hi All,
>
> Is there any way to receive some event that a DataSourceReader is
> finished?
> I want to do some clean up after all the DataReaders are finished reading
> and hence need some kind of cleanUp() mechanism at DataSourceReader(Driver)
> level.
>
> How to achieve this?
>
> For instance, in DataSourceWriter we can rely on commit() and abort()
> methods to know that all the DataWriters are finished.
>
> Thanks,
> Shubham
>


Clean up method for DataSourceReader

2019-06-12 Thread Shubham Chaurasia
Hi All,

Is there any way to receive an event when a DataSourceReader is finished?
I want to do some cleanup after all the DataReaders are finished reading,
and hence need some kind of cleanUp() mechanism at the DataSourceReader
(driver) level.

How to achieve this?

For instance, in DataSourceWriter we can rely on commit() and abort()
methods to know that all the DataWriters are finished.

Thanks,
Shubham


Re: Static partitioning in partitionBy()

2019-05-08 Thread Shubham Chaurasia
Thanks

On Wed, May 8, 2019 at 10:36 AM Felix Cheung 
wrote:

> You could
>
> df.filter(col("c") === "c1").write.partitionBy("c").save
>
> It could get some data skew problem but might work for you
>
>
>
> --
> *From:* Burak Yavuz 
> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
> *To:* Shubham Chaurasia
> *Cc:* dev; user@spark.apache.org
> *Subject:* Re: Static partitioning in partitionBy()
>
> It depends on the data source. Delta Lake (https://delta.io) allows you
> to do it with the .option("replaceWhere", "c = c1"). With other file
> formats, you can write directly into the partition directory
> (tablePath/c=c1), but you lose atomicity.
>
> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
> wrote:
>
>> Hi All,
>>
>> Is there a way I can provide static partitions in partitionBy()?
>>
>> Like:
>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>
>> Above code gives following error as it tries to find column `c=c1` in df.
>>
>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
>> in schema struct;
>>
>> Thanks,
>> Shubham
>>
>


Static partitioning in partitionBy()

2019-05-07 Thread Shubham Chaurasia
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

The above code gives the following error, as it tries to find column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
in schema struct;

Thanks,
Shubham


Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Writing:
scala> df.write.orc("")

For looking into contents, I used orc-tools-X.Y.Z-uber.jar (
https://orc.apache.org/docs/java-tools.html)

On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan  wrote:

> How did you read/write the timestamp value from/to ORC file?
>
> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia <
> shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> Consider the following(spark v2.4.0):
>>
>> Basically I change values of `spark.sql.session.timeZone` and perform an
>> orc write. Here are 3 samples:-
>>
>> 1)
>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>>
>> scala> val df = sc.parallelize(Seq("2019-04-23
>> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
>> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> 2)
>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> 3)
>> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> It can be seen that in all the three cases it stores {"ts":"2019-04-23
>> 09:15:04.0"} in orc file. I understand that orc file also contains writer
>> timezone with respect to which spark is able to convert back to actual time
>> when it reads orc.(and that is equal to df.show())
>>
>> But it's problematic in the sense that it is not adjusting(plus/minus)
>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
>> 09:15:04.0"} in ORC file. I mean loading data to any system other than
>> spark would be a problem.
>>
>> Any ideas/suggestions on that?
>>
>> PS: For csv files, it stores exactly what we see as the output of
>> df.show()
>>
>> Thanks,
>> Shubham
>>
>>


DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Hi All,

Consider the following(spark v2.4.0):

Basically I change values of `spark.sql.session.timeZone` and perform an
orc write. Here are 3 samples:-

1)
scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

scala> val df = sc.parallelize(Seq("2019-04-23
09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

df.show() Output  ORC File Contents
-
2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}

2)
scala> spark.conf.set("spark.sql.session.timeZone", "UTC")

df.show() Output  ORC File Contents
-
2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}

3)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df.show() Output  ORC File Contents
-
2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}

It can be seen that in all three cases it stores {"ts":"2019-04-23
09:15:04.0"} in the ORC file. I understand that the ORC file also contains the
writer timezone, using which Spark is able to convert back to the actual time
when it reads the ORC (and that matches df.show()).

But it's problematic in the sense that it does not adjust (plus/minus) the
timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
09:15:04.0"} in the ORC file. I mean loading the data into any system other
than Spark would be a problem.

Any ideas/suggestions on that?

PS: For csv files, it stores exactly what we see as the output of df.show()
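
The only workaround I have found so far is to materialize the session-zone
rendering explicitly before the write, the same way the csv output behaves
(a sketch; the extra column name and the output path are just examples, and as
far as I can tell date_format renders using spark.sql.session.timeZone):

scala> import org.apache.spark.sql.functions._
scala> val adjusted = df.withColumn("ts_str", date_format(col("ts"), "yyyy-MM-dd HH:mm:ss.S"))
scala> adjusted.write.orc("/tmp/ts_orc_out")

Downstream readers of the string column then see the same value that df.show()
displays, at the cost of carrying an extra column.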

Thanks,
Shubham


Re: DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-06 Thread Shubham Chaurasia
Thanks Ryan
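
For the record, converting the ordinal back on the writer side then looks like
this (a sketch; dateFromInternal is my own helper name):

import java.time.LocalDate
import org.apache.spark.sql.catalyst.InternalRow

// DateType values arrive as the day count since 1970-01-01.
def dateFromInternal(record: InternalRow, ordinal: Int): LocalDate =
  LocalDate.ofEpochDay(record.getInt(ordinal).toLong)

// Spark's own (internal) helper does the same conversion:
// org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(record.getInt(ordinal))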

On Tue, Feb 5, 2019 at 10:28 PM Ryan Blue  wrote:

> Shubham,
>
> DataSourceV2 passes Spark's internal representation to your source and
> expects Spark's internal representation back from the source. That's why
> you consume and produce InternalRow: "internal" indicates that Spark
> doesn't need to convert the values.
>
> Spark's internal representation for a date is the ordinal from the unix
> epoch date, 1970-01-01 = 0.
>
> rb
>
> On Tue, Feb 5, 2019 at 4:46 AM Shubham Chaurasia <
> shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using custom DataSourceV2 implementation (*Spark version 2.3.2*)
>>
>> Here is how I am trying to pass in *date type *from spark shell.
>>
>> scala> val df =
>>> sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype",
>>> col("datetype").cast("date"))
>>> scala> df.write.format("com.shubham.MyDataSource").save
>>
>>
>> Below is the minimal write() method of my DataWriter implementation.
>>
>> @Override
>> public void write(InternalRow record) throws IOException {
>>   ByteArrayOutputStream format = streamingRecordFormatter.format(record);
>>   System.out.println("MyDataWriter.write: " + record.get(0, 
>> DataTypes.DateType));
>>
>> }
>>
>> It prints an integer as output:
>>
>> MyDataWriter.write: 17039
>>
>>
>> Is this a bug?  or I am doing something wrong?
>>
>> Thanks,
>> Shubham
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-05 Thread Shubham Chaurasia
Hi All,

I am using custom DataSourceV2 implementation (*Spark version 2.3.2*)

Here is how I am trying to pass in a *date type* from spark shell.

scala> val df =
> sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype",
> col("datetype").cast("date"))
> scala> df.write.format("com.shubham.MyDataSource").save


Below is the minimal write() method of my DataWriter implementation.

@Override
public void write(InternalRow record) throws IOException {
  ByteArrayOutputStream format = streamingRecordFormatter.format(record);
  System.out.println("MyDataWriter.write: " + record.get(0,
DataTypes.DateType));

}

It prints an integer as output:

MyDataWriter.write: 17039


Is this a bug, or am I doing something wrong?

Thanks,
Shubham


Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Alright, so it is a big project which uses a SQL store underneath.
I extracted out the minimal code and made a smaller project out of it and
still it is creating multiple instances.

Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │   └── shubham
│   │   │   ├── MyDataSource.java
│   │   │   └── reader
│   │   │   └── MyDataSourceReader.java


MyDataSource.java
-

package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  public Optional<DataSourceWriter> createWriter(String writeUUID,
      StructType schema, SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}


MyDataSourceReader.java
-

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}



spark-shell output

scala> spark.read.format("com.shubham.MyDataSource").option("query",
"select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader:
Instantiatedcom.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema:
com.shubham.reader.MyDataSourceReader@69fa5536 schema:
StructType(StructField(col1,IntegerType,true),
StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader:
Instantiatedcom.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions:
com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+++
|col1|col2|
+++
+++


Here 2 instances of reader, MyDataSourceReader@69fa5536 and
MyDataSourceReader@3095c449 are being created. Consequently schema is null
in MyDataSourceReader@3095c449.

Am I not doing it the correct way?

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf 
wrote:

> I am using v2.4.0-RC2
>
>
>
> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
>
>
> When I do:
>
> Val df = spark.read.format(mypackage).load().show()
>
> I am getting a single creation, how are you creating the reader?
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 2:02 PM
> *To:* Mendelson, Assaf; user@spark.apache.org
> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>
>
>
> Full Code:
>
>
>
> MyDataSource is the entry point w

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Thanks Assaf, you tried with *tags/v2.4.0-rc2?*

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport,
    ReadSupport, SessionConfigSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a dataSourcewriter here..
    return Optional.of(dataSourcewriter);
  }

  @Override
  public String keyPrefix() {
    return "myprefix";
  }

}

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf 
wrote:

> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Hi All,
>
>
>
> --Spark built with *tags/v2.4.0-rc2*
>
>
>
> Consider following DataSourceReader implementation:
>
>
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     // more logic..
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
> class variable schema.
>
> 2) Now when planBatchInputPartitions() is called, it is being called on a 
> different instance of MyDataSourceReader and hence I am not getting the value 
> of schema in method planBatchInputPartitions().
>
>
>
> How can I get value of schema which was set in readSchema() method, in 
> planBatchInputPartitions() method?
>
>
>
> Console Logs:
>
>
>
> scala> mysource.executeQuery("select * from movie").show
>
>
>
> MyDataSourceReader.MyDataSourceReader: 
> InstantiatedMyDataSourceReader@59ea8f1b
>
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
> StructType(StructField(col1,IntegerType,true), 
> StructField(col2,StringType,true))
>
> MyDataSourceReader.MyDataSourceReader: 
> InstantiatedMyDataSourceReader@a3cd3ff
>
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff 
> schema: null
>
>
>
> Thanks,
>
> Shubham
>
>
>
>


DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Hi All,

--Spark built with *tags/v2.4.0-rc2*

Consider following DataSourceReader implementation:

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1, which
sets the class variable schema.
2) Now when planBatchInputPartitions() is called, it is being called
on a different instance of MyDataSourceReader, and hence I am not
getting the value of schema in planBatchInputPartitions().

How can I get the value of schema, which was set in the readSchema()
method, in the planBatchInputPartitions() method?

Console Logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader:
InstantiatedMyDataSourceReader@59ea8f1b
MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema:
StructType(StructField(col1,IntegerType,true),
StructField(col2,StringType,true))
MyDataSourceReader.MyDataSourceReader:
InstantiatedMyDataSourceReader@a3cd3ff
MyDataSourceReader.planBatchInputPartitions:
MyDataSourceReader@a3cd3ff schema: null
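
The workaround I am leaning towards (a sketch against the 2.4.0-rc2 API; it
sidesteps the question rather than answering it): make schema discovery a
lazy, instance-local computation, so it does not matter which reader instance
planBatchInputPartitions() lands on.

import java.util.{ArrayList, List => JList}

import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsScanColumnarBatch}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

class MyDataSourceReader(options: java.util.Map[String, String])
    extends DataSourceReader with SupportsScanColumnarBatch {

  // Computed at most once per instance, and recomputed cheaply on a fresh instance.
  private lazy val discoveredSchema: StructType =
    new StructType().add("col1", "int").add("col2", "string")   // schema discovery goes here

  override def readSchema(): StructType = discoveredSchema

  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
    // available even if readSchema() was never called on this particular instance
    println(s"planBatchInputPartitions: $this schema: $discoveredSchema")
    new ArrayList[InputPartition[ColumnarBatch]]()
  }
}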


Thanks,
Shubham


Target java version not set when building spark with tags/v2.4.0-rc2

2018-10-07 Thread Shubham Chaurasia
Hi All,

I built spark with *tags/v2.4.0-rc2* using
./build/mvn -DskipTests -Phadoop-2.7  -Dhadoop.version=3.1.0 clean install

Now from spark-shell, whenever I call any static method residing in an
interface, it shows me an error like:
<console>:28: error: Static methods in interface require -target:jvm-1.8

However, spark-shell is picking up the correct Java version; I verified it like this:

scala> java.lang.System.getProperty("java.version")
res1: String = 1.8.0_181

This was not the case when I was building with tags/v2.3.2.

Have I missed something?

Thanks,
Shubham


Error in java_gateway.py

2018-08-08 Thread shubham
Following the code snippets on this thread, I got a working version of
XGBoost on pyspark. But one issue I am still facing is the following:

  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/dummy_package/xgboost/xgboost.py", line 92, in __init__
    self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid)
  File "/Users/ultrauser/Downloads/spark/python/pyspark/ml/wrapper.py", line 61, in _new_java_obj
    java_obj = getattr(java_obj, name)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 1598, in __getattr__
    raise Py4JError("{0} does not exist in the JVM".format(new_fqn))
py4j.protocol.Py4JError: ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator does not exist in the JVM

Exception ignored in:
Traceback (most recent call last):
  File "/Users/ultrauser/Downloads/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 2000, in detach
    java_object._detach()
AttributeError: 'NoneType' object has no attribute '_detach'

From what I read on StackOverflow and elsewhere, this looks like an issue of
jar locations. I have two jar files that are needed for this code to work:

xgboost4j-0.72.jar
xgboost4j-spark-0.72.jar

But I am not sure how to proceed. This is what I have tried so far:

1. Place the xgboost jar files in /Library/Java/Extensions
2. Set the environment variables:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /Users/ultrauser/Downloads/xgboost4j-0.72.jar,/Users/ultrauser/Downloads/xgboost4j-spark-0.72.jar pyspark-shell'

But the error still persists. Is there something I am missing here?




Understanding the results from Spark's KMeans clustering object

2018-05-18 Thread shubham
Hello Everyone,

I am performing clustering on a dataset using PySpark. To find the number of
clusters I performed clustering over a range of values (2,20) and found the
wsse (within-cluster sum of squares) value for each value of k. This is where
I found something unusual. According to my understanding, when you increase
the number of clusters, the wsse decreases monotonically. But the results I
got say otherwise. I'm displaying the wsse for the first few clusters only.

Results from spark

For k = 002 WSSE is 255318.793358
For k = 003 WSSE is 209788.479560
For k = 004 WSSE is 208498.351074
For k = 005 WSSE is 142573.272672
For k = 006 WSSE is 154419.027612
For k = 007 WSSE is 115092.404604
For k = 008 WSSE is 104753.205635
For k = 009 WSSE is 98000.985547
For k = 010 WSSE is 95134.137071
If you look at the wsse values for k=5 and k=6, you'll see the wsse has
increased. I turned to sklearn to see if I get similar results. The code I
used for spark and sklearn is in the appendix section towards the end of
the post. I have tried to use the same values for the parameters in the spark
and sklearn KMeans models. The following are the results from sklearn, and
they are as I expected them to be - monotonically decreasing.

Results from sklearn

For k = 002 WSSE is 245090.224247
For k = 003 WSSE is 201329.888159
For k = 004 WSSE is 166889.044195
For k = 005 WSSE is 142576.895154
For k = 006 WSSE is 123882.070776
For k = 007 WSSE is 112496.692455
For k = 008 WSSE is 102806.001664
For k = 009 WSSE is 95279.837212
For k = 010 WSSE is 89303.574467
I am not sure why the wsse values increase in Spark. I tried using
different datasets and found similar behavior there as well. Is there
someplace I am going wrong? Any clues would be great.

APPENDIX
The dataset is located here.

Read the data and set declare variables

# get data
import pandas as pd
url = "https://raw.githubusercontent.com/vectosaurus/bb_lite/master/3.0%20data/adult_comp_cont.csv"

df_pandas = pd.read_csv(url)
df_spark = sqlContext.createDataFrame(df_pandas)
target_col = 'high_income'
numeric_cols = [i for i in df_pandas.columns if i !=target_col]

k_min = 2  # 2 is inclusive
k_max = 21  # 21 is exclusive; will fit till 20

max_iter = 1000
seed = 42
This is the code I am using for getting the sklearn results:

from sklearn.cluster import KMeans as KMeans_SKL
from sklearn.preprocessing import StandardScaler as StandardScaler_SKL

ss = StandardScaler_SKL(with_std=True, with_mean=True)
ss.fit(df_pandas.loc[:, numeric_cols])
df_pandas_scaled = pd.DataFrame(ss.transform(df_pandas.loc[:,
numeric_cols]))

wsse_collect = []

for i in range(k_min, k_max):
    km = KMeans_SKL(random_state=seed, max_iter=max_iter, n_clusters=i)
    _ = km.fit(df_pandas_scaled)
    wsse = km.inertia_
    print('For k = {i:03d} WSSE is {wsse:10f}'.format(i=i, wsse=wsse))
    wsse_collect.append(wsse)
This is the code I am using for getting the spark results

from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import KMeans

standard_scaler_inpt_features = 'ss_features'
kmeans_input_features = 'features'
kmeans_prediction_features = 'prediction'


assembler = VectorAssembler(inputCols=numeric_cols,
outputCol=standard_scaler_inpt_features)
assembled_df = assembler.transform(df_spark)

scaler = StandardScaler(inputCol=standard_scaler_inpt_features,
outputCol=kmeans_input_features, withStd=True, withMean=True)
scaler_model = scaler.fit(assembled_df)
scaled_data = scaler_model.transform(assembled_df)

wsse_collect_spark = []

for i in range(k_min, k_max):
    km = KMeans(featuresCol=kmeans_input_features,
                predictionCol=kmeans_prediction_features,
                k=i, maxIter=max_iter, seed=seed)
    km_fit = km.fit(scaled_data)
    wsse_spark = km_fit.computeCost(scaled_data)
    wsse_collect_spark.append(wsse_spark)
    print('For k = {i:03d} WSSE is {wsse:10f}'.format(i=i, wsse=wsse_spark))







how to flatten the dataframe

2016-03-06 Thread shubham@celebal
root 
 |-- adultbasefare: long (nullable = true) 
 |-- adultcommission: long (nullable = true) 
 |-- adultservicetax: long (nullable = true) 
 |-- adultsurcharge: long (nullable = true) 
 |-- airline: string (nullable = true) 
 |-- arrdate: string (nullable = true) 
 |-- arrtime: string (nullable = true) 
 |-- cafecommission: long (nullable = true) 
 |-- carrierid: string (nullable = true) 
 |-- class: string (nullable = true) 
 |-- depdate: string (nullable = true) 
 |-- deptime: string (nullable = true) 
 |-- destination: string (nullable = true) 
 |-- discount: long (nullable = true) 
 |-- duration: string (nullable = true) 
 |-- fare: struct (nullable = true) 
 |    |-- A: long (nullable = true) 
 |    |-- C: long (nullable = true) 
 |    |-- I: long (nullable = true) 
 |    |-- adultairlinetxncharge: long (nullable = true) 
 |    |-- adultairporttax: long (nullable = true) 
 |    |-- adultbasefare: long (nullable = true) 
 |    |-- adultcommission: double (nullable = true) 
 |    |-- adultsurcharge: long (nullable = true) 
 |    |-- adulttotalfare: long (nullable = true) 
 |    |-- childairlinetxncharge: long (nullable = true) 
 |    |-- childairporttax: long (nullable = true) 
 |    |-- childbasefare: long (nullable = true) 
 |    |-- childcommission: double (nullable = true) 
 |    |-- childsurcharge: long (nullable = true) 
 |    |-- childtotalfare: long (nullable = true) 
 |    |-- discount: long (nullable = true) 
 |    |-- infantairlinetxncharge: long (nullable = true) 
 |    |-- infantairporttax: long (nullable = true) 
 |    |-- infantbasefare: long (nullable = true) 
 |    |-- infantcommission: long (nullable = true) 
 |    |-- infantsurcharge: long (nullable = true) 
 |    |-- infanttotalfare: long (nullable = true) 
 |    |-- servicetax: long (nullable = true) 
 |    |-- totalbasefare: long (nullable = true) 
 |    |-- totalcommission: double (nullable = true) 
 |    |-- totalfare: long (nullable = true) 
 |    |-- totalsurcharge: long (nullable = true) 
 |    |-- transactionfee: long (nullable = true) 
 |-- farebasis: string (nullable = true) 
 |-- farerule: string (nullable = true) 
 |-- flightcode: string (nullable = true) 
 |-- flightno: string (nullable = true) 
 |-- k: string (nullable = true) 
 |-- onwardflights: array (nullable = true) 
 |    |-- element: string (containsNull = true) 
 |-- origin: string (nullable = true) 
 |-- promocode: string (nullable = true) 
 |-- promodiscount: long (nullable = true) 
 |-- promotionText: string (nullable = true) 
 |-- stops: string (nullable = true) 
 |-- tickettype: string (nullable = true) 
 |-- totalbasefare: long (nullable = true) 
 |-- totalcommission: long (nullable = true) 
 |-- totalfare: long (nullable = true) 
 |-- totalpriceamount: long (nullable = true) 
 |-- totalsurcharge: long (nullable = true) 
 |-- transactionfee: long (nullable = true) 
 |-- viacharges: long (nullable = true) 
 |-- warnings: string (nullable = true) 



Now I want to flatten it so that the fare field is removed and everything
inside it is flattened to top-level columns.

For this I used explode, but I am getting an error:

org.apache.spark.sql.AnalysisException: cannot resolve 'explode(fare)' due
to data type mismatch: input to function explode should be array or map
type, not StructType(StructField(A,LongType,true),
StructField(C,LongType,true), StructField(I,LongType,true),
StructField(adultairlinetxncharge,LongType,true),
StructField(adultairporttax,LongType,true),
StructField(adultbasefare,LongType,true),
StructField(adultcommission,DoubleType,true),
StructField(adultsurcharge,LongType,true),
StructField(adulttotalfare,LongType,true),
StructField(childairlinetxncharge,LongType,true),
StructField(childairporttax,LongType,true),
StructField(childbasefare,LongType,true),
StructField(childcommission,DoubleType,true),
StructField(childsurcharge,LongType,true),
StructField(childtotalfare,LongType,true),
StructField(discount,LongType,true),
StructField(infantairlinetxncharge,LongType,true),
StructField(infantairporttax,LongType,true),
StructField(infantbasefare,LongType,true),
StructField(infantcommission,LongType,true),
StructField(infantsurcharge,LongType,true),
StructField(infanttotalfare,LongType,true),
StructField(servicetax,LongType,true),
StructField(totalbasefare,LongType,true),
StructField(totalcommission,DoubleType,true),
StructField(totalfare,LongType,true),
StructField(totalsurcharge,LongType,true),
StructField(transactionfee,LongType,true)); 

If not explode, how can I flatten it? Your help will be appreciated. Thanks
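
The closest I have got so far is to select the nested fields directly instead
of using explode (explode is only for arrays and maps). A sketch, where df is
the dataframe with the schema above; note that several fields inside fare
(adultbasefare, discount, totalfare, ...) share names with top-level columns,
so they would need renaming to avoid duplicate column names:

import org.apache.spark.sql.functions.col

// keep every top-level column except the struct, then promote fare's nested fields
val flattened = df.select(
  (df.columns.filter(_ != "fare").map(col) :+ col("fare.*")): _*
)
flattened.printSchema()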


