Check isEmpty efficiently

2019-06-26 Thread SNEHASISH DUTTA
Hi,
Which is more efficient?

This is already defined in Spark since 2.4.0:


def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0) == 0
}

or

df.head(1).isEmpty


I am checking if a DataFrame is empty and it is taking forever.
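
For reference, a minimal sketch of the two calls side by side, assuming df is an arbitrary DataFrame. Both only need to materialise at most one row, so the cost is usually dominated by how expensive it is to produce that first row (wide joins or aggregations upstream), rather than by the choice between the two calls.

// Spark 2.4.0+: limit(1), empty groupBy, count under the hood
val emptyViaIsEmpty: Boolean = df.isEmpty

// Fetch at most one row and check whether the returned array is empty
val emptyViaHead: Boolean = df.head(1).isEmpty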

Regards,
Snehasish


Generic Dataset[T] Query

2019-05-08 Thread SNEHASISH DUTTA
Hi ,

I am trying to write a generic method which will return Datasets of custom types as well as of spark.sql.Row.

def read[T](params: Map[String, Any])(implicit encoder: Encoder[T]): Dataset[T]

This is my method signature. It works fine for custom types, but when I try to obtain a Dataset[Row] it fails with the following message:

" Unable to find encoder for type org.apache.spark.sql.Row. An implicit
Encoder[org.apache.spark.sql.Row] is needed to store
org.apache.spark.sql.Row instances in a Dataset. Primitive types (Int,
String, etc) and Product types (case classes) are supported by importing
spark.implicits._ "

Is it possible to make some changes so that it can process both custom types and the Row type?
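
One possible approach, sketched below under the assumption that the read produces a DataFrame and that the caller knows the schema whenever a Dataset[Row] is wanted: keep the implicit Encoder[T] and pass an explicit Row encoder for the Row case. The "format"/"path" keys in params are illustrative assumptions, not part of the original method.

import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def read[T](params: Map[String, Any])
           (implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] =
  spark.read
    .format(params("format").toString)   // assumed params keys
    .load(params("path").toString)
    .as[T]                               // uses the implicit (or explicitly supplied) encoder

// Custom type: the encoder comes from spark.implicits._
// val people = read[Person](Map("format" -> "parquet", "path" -> "/data/people"))

// Row type: pass an explicit Row encoder built from the known schema
// val rows = read[Row](params)(spark, RowEncoder(schema))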

Regards,
Snehasish


Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-30 Thread SNEHASISH DUTTA
Hi

The na functions replace nulls with some default value, and since not all my columns are of type string, for the other data types (long/int etc.) I would have to provide a default value for each of them.

But ideally those values should remain null. This null-column drop is actually happening in this step:


df.selectExpr("to_json(struct(*)) AS value")

Is it possible to retain the columns where all the values are null in this step, without using the na functions?
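
For what it's worth, in later Spark versions (3.0+) to_json accepts an ignoreNullFields option, which is exactly what drops the null fields here; it is not available in 2.x, where null fields are always omitted from the generated JSON. A minimal sketch assuming Spark 3.0+:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Keep null fields in the generated JSON (Spark 3.0+ only).
val out = df.select(
  to_json(struct(df.columns.map(col): _*),
          Map("ignoreNullFields" -> "false")).as("value")
)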

Regards,
Snehasish

On Tue, Apr 30, 2019, 4:58 AM Jason Nerothin 
wrote:

> See also here:
> https://stackoverflow.com/questions/44671597/how-to-replace-null-values-with-a-specific-value-in-dataframe-using-spark-in-jav
>
> On Mon, Apr 29, 2019 at 5:27 PM Jason Nerothin 
> wrote:
>
>> Spark SQL has had an na.fill function on it since at least 2.1. Would
>> that work for you?
>>
>>
>> https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameNaFunctions.html
>>
>> On Mon, Apr 29, 2019 at 4:57 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Snehasish,
>>>
>>> Do you have a reproducer for this issue?
>>>
>>> Best Regards,
>>> Ryan
>>>
>>>
>>> On Wed, Apr 24, 2019 at 7:24 AM SNEHASISH DUTTA <
>>> info.snehas...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> While writing to kafka using spark structured streaming , if all the
>>>> values in certain column are Null it gets dropped
>>>> Is there any way to override this , other than using na.fill functions
>>>>
>>>> Regards,
>>>> Snehasish
>>>>
>>>
>>
>> --
>> Thanks,
>> Jason
>>
>
>
> --
> Thanks,
> Jason
>


Handle Null Columns in Spark Structured Streaming Kafka

2019-04-24 Thread SNEHASISH DUTTA
Hi,

While writing to Kafka using Spark Structured Streaming, if all the values in a certain column are null, that column gets dropped from the output.
Is there any way to override this, other than using the na.fill functions?
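
For context, a minimal sketch of the write path in question (broker address, topic and checkpoint location below are placeholder assumptions). By default to_json omits fields whose value is null, which is why all-null columns disappear from the Kafka payload.

val query = df
  .selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // assumed broker
  .option("topic", "output-topic")                      // assumed topic
  .option("checkpointLocation", "/tmp/checkpoints")     // required by the Kafka sink
  .start()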

Regards,
Snehasish


Shuffling Data After Union and Write

2018-04-13 Thread SNEHASISH DUTTA
Hi,

I am currently facing an issue while performing a union on three data frames, say df1, df2 and df3: once the operation is performed and I try to save the data, the rows get shuffled, so the ordering of the data from df1, df2 and df3 is not maintained.

When I save the data as a text/csv file, the contents get shuffled within the output.
There is no obvious way to order the result, as these three dataframes don't share any common field/constraint to sort on.

Let me know if there is a workaround to maintain the ordering of the dataframes after the union and write.
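
One common workaround, sketched below under the assumption that the goal is simply "all of df1's rows, then df2's, then df3's": tag each frame with a source index (plus a row id that roughly preserves each frame's own read order), sort on those columns, and drop them before writing.

import org.apache.spark.sql.functions.{lit, monotonically_increasing_id}

// Helper column names (_src, _rowId) are assumptions.
val tagged = Seq(df1, df2, df3).zipWithIndex.map { case (df, i) =>
  df.withColumn("_src", lit(i))
    .withColumn("_rowId", monotonically_increasing_id())
}
val ordered = tagged.reduce(_ union _)
  .orderBy("_src", "_rowId")
  .drop("_src", "_rowId")

// With multiple output files the order is only preserved within each file;
// coalesce(1) keeps it in a single file at the cost of parallelism.
ordered.coalesce(1).write.mode("overwrite").csv("output")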

Regards,
Snehasish


Access Table with Spark Dataframe

2018-03-20 Thread SNEHASISH DUTTA
Hi,

I am using Spark 2.2. A table fetched from a database contains a dot (.) in one of the column names.
Whenever I try to select that particular column, I get an analysis exception.


I have also tried creating a temporary view using createOrReplaceTempView() and fetching the column's data that way, but the outcome was the same.

How can the dot ('.') be escaped while querying?
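
The usual fix is to wrap the whole name in backticks so Spark treats it as a single identifier rather than a struct-field access; a minimal sketch, assuming the fetched DataFrame df has a column literally named a.b:

import org.apache.spark.sql.functions.col

// Backticks keep the dot inside one identifier.
df.select(col("`a.b`")).show()

df.createOrReplaceTempView("t")
spark.sql("SELECT `a.b` FROM t").show()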


Thanks and Regards,
Snehasish


CSV use case

2018-02-21 Thread SNEHASISH DUTTA
Hi,

I am using the Spark 2.2 CSV reader.

I have data in following format

123|123|"abc"||""|"xyz"

Where || is null and "" is one blank character, as per the requirement.

I was using the sep option set to pipe and the quote option set to "".
Parsing the data that way, plus some regex post-processing, I was able to fulfil all the mentioned conditions.
It started failing when column values like "|" and """ appeared, i.e. the separator itself became a column value and the quote became a value inside a column, so Spark split on those values and produced extra columns.

After this I used the escape option on "|", but the results were similar.

I then tried a Dataset with split on "\\|", which had a similar outcome.

Is there any way to resolve this?
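
For reference, a sketch of the reader options under discussion, assuming embedded quotes are escaped by doubling them (the escape setting and file name below are assumptions). If the producer does not escape embedded separators or quotes at all, no CSV parser can split the fields unambiguously, and the file format itself would need to change.

val df = spark.read
  .option("sep", "|")
  .option("quote", "\"")
  .option("escape", "\"")   // assumes embedded quotes are written doubled ("")
  .csv("input.csv")         // assumed file name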

Thanks and Regards,
Snehasish


Re: Serialize a DataFrame with Vector values into text/csv file

2018-02-20 Thread SNEHASISH DUTTA
Hi Mina,

Even text won't work; you may try this:
df.coalesce(1).write.option("header","true").mode("overwrite").save("output", format="text")
Otherwise, convert to an RDD and use saveAsTextFile.
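
The underlying issue is that the CSV and text sources cannot store a struct/vector column, so the usual workaround is to turn the vector into a plain string (or into separate numeric columns) before writing. A minimal sketch in Scala, assuming the vector column is named "features"; the PySpark equivalent is a straightforward translation.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Convert the ml Vector to a comma-separated string so CSV/text can store it.
val vecToString = udf((v: Vector) => v.toArray.mkString(","))

df.withColumn("features", vecToString(col("features")))
  .coalesce(1)
  .write.option("header", "true").mode("overwrite")
  .csv("output")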

Regards,
Snehasish

On Wed, Feb 21, 2018 at 3:38 AM, SNEHASISH DUTTA 
wrote:

> Hi Mina,
> This might work then
>
> df.coalesce(1).write.option("header","true").mode("overwrite
> ").text("output")
>
> Regards,
> Snehasish
>
> On Wed, Feb 21, 2018 at 3:21 AM, Mina Aslani  wrote:
>
>> Hi Snehasish,
>>
>> Using df.coalesce(1).write.option("header","true").mode("overwrite
>> ").csv("output") throws
>>
>> java.lang.UnsupportedOperationException: CSV data source does not
>> support struct<...> data type.
>>
>>
>> Regards,
>> Mina
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 4:36 PM, SNEHASISH DUTTA <
>> info.snehas...@gmail.com> wrote:
>>
>>> Hi Mina,
>>> This might help
>>> df.coalesce(1).write.option("header","true").mode("overwrite
>>> ").csv("output")
>>>
>>> Regards,
>>> Snehasish
>>>
>>> On Wed, Feb 21, 2018 at 1:53 AM, Mina Aslani 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to serialize a dataframe with vector values into a
>>>> text/csv in pyspark.
>>>>
>>>> Using below line, I can write the dataframe(e.g. df) as parquet,
>>>> however I cannot open it in excel/as text.
>>>> df.coalesce(1).write.option("header","true").mode("overwrite
>>>> ").save("output")
>>>>
>>>> Best regards,
>>>> Mina
>>>>
>>>>
>>>
>>
>


Re: Serialize a DataFrame with Vector values into text/csv file

2018-02-20 Thread SNEHASISH DUTTA
Hi Mina,
This might work then:

df.coalesce(1).write.option("header","true").mode("overwrite").text("output")

Regards,
Snehasish

On Wed, Feb 21, 2018 at 3:21 AM, Mina Aslani  wrote:

> Hi Snehasish,
>
> Using df.coalesce(1).write.option("header","true").mode("overwrite
> ").csv("output") throws
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct<...> data type.
>
>
> Regards,
> Mina
>
>
>
>
> On Tue, Feb 20, 2018 at 4:36 PM, SNEHASISH DUTTA  > wrote:
>
>> Hi Mina,
>> This might help
>> df.coalesce(1).write.option("header","true").mode("overwrite
>> ").csv("output")
>>
>> Regards,
>> Snehasish
>>
>> On Wed, Feb 21, 2018 at 1:53 AM, Mina Aslani 
>> wrote:
>>
>>> Hi,
>>>
>>> I would like to serialize a dataframe with vector values into a text/csv
>>> in pyspark.
>>>
>>> Using below line, I can write the dataframe(e.g. df) as parquet, however
>>> I cannot open it in excel/as text.
>>> df.coalesce(1).write.option("header","true").mode("overwrite
>>> ").save("output")
>>>
>>> Best regards,
>>> Mina
>>>
>>>
>>
>


Re: Serialize a DataFrame with Vector values into text/csv file

2018-02-20 Thread SNEHASISH DUTTA
Hi Mina,
This might help
df.coalesce(1).write.option("header","true").mode("overwrite").csv("output")

Regards,
Snehasish

On Wed, Feb 21, 2018 at 1:53 AM, Mina Aslani  wrote:

> Hi,
>
> I would like to serialize a dataframe with vector values into a text/csv
> in pyspark.
>
> Using below line, I can write the dataframe(e.g. df) as parquet, however I
> cannot open it in excel/as text.
> df.coalesce(1).write.option("header","true").mode("overwrite
> ").save("output")
>
> Best regards,
> Mina
>
>


Re: Can spark handle this scenario?

2018-02-17 Thread SNEHASISH DUTTA
Hi  Lian,

This could be the solution


case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// symbolDs is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
symbolDs.map { k =>
  pullSymbolFromYahoo(k.symbol, k.sector)
}(org.apache.spark.sql.Encoders.kryo[Tick])
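
As an alternative that avoids kryo and stays with the built-in case-class encoders, here is a minimal sketch assuming pullSymbolFromYahoo can be changed to return a plain Seq[Tick] (an assumption; as written in the thread it returns a Dataset[Tick], which cannot be the element type of another Dataset):

import org.apache.spark.sql.{Dataset, SparkSession}

def fetchTicks(symbolDs: Dataset[Symbol])
              (pull: (String, String) => Seq[Tick])
              (implicit spark: SparkSession): Dataset[Tick] = {
  import spark.implicits._   // supplies Encoder[Tick] for the case class
  symbolDs.flatMap(s => pull(s.symbol, s.sector))
}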


Regards,
Snehasish

On Sat, Feb 17, 2018 at 1:05 PM, Holden Karau 
wrote:

> I'm not sure what you mean by it could be hard to serialize complex
> operations?
>
> Regardless I think the question is do you want to parallelize this on
> multiple machines or just one?
>
> On Feb 17, 2018 4:20 PM, "Lian Jiang"  wrote:
>
>> Thanks Ayan. RDD may support map better than Dataset/DataFrame. However,
>> it could be hard to serialize complex operation for Spark to execute in
>> parallel. IMHO, spark does not fit this scenario. Hope this makes sense.
>>
>> On Fri, Feb 16, 2018 at 8:58 PM, ayan guha  wrote:
>>
>>> ** You do NOT need dataframes, I mean.
>>>
>>> On Sat, Feb 17, 2018 at 3:58 PM, ayan guha  wrote:
>>>
 Hi

 Couple of suggestions:

 1. Do not use Dataset, use Dataframe in this scenario. There is no
 benefit of dataset features here. Using Dataframe, you can write an
 arbitrary UDF which can do what you want to do.
 2. In fact you do need dataframes here. You would be better off with
 RDD here. just create a RDD of symbols and use map to do the processing.

 On Sat, Feb 17, 2018 at 12:40 PM, Irving Duran 
 wrote:

> Do you only want to use Scala? Because otherwise, I think with pyspark
> and pandas read table you should be able to accomplish what you want to
> accomplish.
>
> Thank you,
>
> Irving Duran
>
> On 02/16/2018 06:10 PM, Lian Jiang wrote:
>
> Hi,
>
> I have a user case:
>
> I want to download S&P500 stock data from Yahoo API in parallel using
> Spark. I have got all stock symbols as a Dataset. Then I used below code 
> to
> call Yahoo API for each symbol:
>
>
>
> case class Symbol(symbol: String, sector: String)
>
> case class Tick(symbol: String, sector: String, open: Double, close:
> Double)
>
>
> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns
> Dataset[Tick]
>
>
> symbolDs.map { k =>
>
>   pullSymbolFromYahoo(k.symbol, k.sector)
>
> }
>
>
> This statement cannot compile:
>
>
> Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will
> be added in future releases.
>
>
> My questions are:
>
>
> 1. As you can see, this scenario is not traditional dataset handling
> such as count, sql query... Instead, it is more like a UDF which apply
> random operation on each record. Is Spark good at handling such scenario?
>
>
> 2. Regarding the compilation error, any fix? I did not find a
> satisfactory solution online.
>
>
> Thanks for help!
>
>
>
>
>
>


 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>