Re: Can spark handle this scenario?

2018-02-26 Thread Lian Jiang
Thanks Vijay. After changing the programming model (creating a context class
for the workers), it finally worked for me. Cheers.



Re: Can spark handle this scenario?

2018-02-23 Thread vijay.bvp
When an HTTP connection is opened, you are opening a connection between one
specific machine (with its own IP address and NIC) and another specific
machine, so it can't be serialized and reused on a different machine.

This isn't a Spark limitation.

I made a simple diagram in case it helps. Objects created on the driver and
passed to the workers need to be serialized. Objects created on the workers
do not.

As in the diagram, you have to create the HTTPConnection on the executors,
independently of the driver.
The HTTPConnection created on Executor-1 can be used for partitions P1-P3 of
the RDD available on that executor.

Spark is tolerant and does allow passing objects from the driver to the
workers, but if it reports "Task not serializable", that indicates some object
being passed cannot be serialized. Mark the class as Serializable if you think
its objects can be serialized. As I said in the beginning, not everything can
be serialized, particularly HTTP connections, JDBC connections and the like.
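A minimal sketch of that distinction, without Spark: it checks objects against plain Java serialization, which is what Spark uses for task closures by default. FakeConnection and ConnectionSettings are hypothetical stand-ins for illustration, not classes from this thread.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical stand-in for a live connection: deliberately NOT Serializable.
  class FakeConnection(val host: String)

  // Plain settings that can safely travel from the driver to the executors
  // (Scala case classes are Serializable).
  case class ConnectionSettings(host: String, port: Int)

  // Returns true if Java serialization accepts the object, false if it
  // throws NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

The usual consequence is to ship only the settings from the driver and build the connection from them on the executor.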

[Diagram: Picture1.png]

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can spark handle this scenario?

2018-02-22 Thread Lian Jiang
Hi Vijay,

Should HTTPConnection() (or any other object created per partition) be
serializable for your code to work? If so, the usage seems limited.

Sometimes the error caused by a non-serializable object can be very
misleading (e.g. "Return statements aren't allowed in Spark closures"
instead of "Task not serializable").

The post shared by Anastasios helps but does not completely resolve the
"need serialization" problem. For example, if I need to create a per-partition
object that relies on other objects which may not be serializable, then
wrapping the object creation in an object (making it a static function) does
not help, not to mention that the programming model becomes unintuitive.

I have been playing with this scenario for some time and am still frustrated.
Thanks








Re: Can spark handle this scenario?

2018-02-20 Thread Lian Jiang
Thanks Vijay! This is very clear.



Re: Can spark handle this scenario?

2018-02-20 Thread vijay.bvp
I am assuming the pullSymbolFromYahoo function opens a connection to the Yahoo
API with some token passed. In the code provided so far, if you have 2000
symbols, it will make 2000 new connections and 2000 API calls.
Connection objects can't and shouldn't be serialized and sent to executors;
they should instead be created on the executors.

The philosophy behind the code below is nicely documented in the Spark
Streaming guide; look at "Design Patterns for using foreachRDD":
https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams


case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)
// assume symbolRdd is an RDD of Symbol; a Dataset/DataFrame can be converted
// to an RDD
symbolRdd.foreachPartition(partition => {
  // this code runs on the executor
  // open a connection here, once per partition
  val connectionToYahoo = new HTTPConnection()

  partition.foreach(k => {
    pullSymbolFromYahoo(k.symbol, k.sector, connectionToYahoo)
  })
})

With the above code, if the dataset has 10 partitions (2000 symbols), only 10
connections will be opened, though it still makes 2000 API calls.
You should also look at sending and receiving results in bulk for a large
number of symbols: because of the amount of parallelism Spark provides, you
might run into rate limits on the API. If you are sending symbols in bulk, the
above pattern is also very useful.
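The connection arithmetic can be checked locally with a small simulation that does not need Spark. It is only a sketch: grouped() stands in for Spark's partitioning, and CountingConnection is a hypothetical client that counts calls instead of contacting any API.

```scala
object PartitionConnections {
  final case class Symbol(symbol: String, sector: String)

  // Hypothetical stand-in for an HTTP client; it only counts how often it is used.
  final class CountingConnection {
    var calls = 0
    def fetch(s: Symbol): String = {
      calls += 1
      s"tick for ${s.symbol}"
    }
  }

  // Simulates foreachPartition locally: one connection per partition.
  // Returns (connections opened, API calls made).
  def run(symbols: Seq[Symbol], numPartitions: Int): (Int, Int) = {
    val partitionSize =
      math.max(1, math.ceil(symbols.size.toDouble / numPartitions).toInt)
    var opened = 0
    var calls = 0
    symbols.grouped(partitionSize).foreach { partition =>
      val conn = new CountingConnection // created where the work runs
      opened += 1
      partition.foreach(s => conn.fetch(s))
      calls += conn.calls
    }
    (opened, calls)
  }
}
```

With 2000 symbols and 10 partitions this opens 10 connections for 2000 calls; setting numPartitions to 2000 reproduces the one-connection-per-symbol behaviour of the original code.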

thanks
Vijay










Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
Thanks Anastasios. This link is helpful!

Re: Can spark handle this scenario?

2018-02-17 Thread Anastasios Zouzias
Hi Lian,

> The remaining problem is:
>
> Spark needs all classes used in fn() to be serializable for t.rdd.map{ k=>
> fn(k) } to work. This could be hard since some classes in third-party
> libraries are not serializable. This restricts the power of using Spark to
> parallelize an operation on multiple machines. Hope this is clear.

This is not entirely true. You can bypass the serialisation issue in most
cases; see the link below for an example.

https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/

In a nutshell, the non-serialisable code is available to all executors, so
there is no need for Spark to serialise from the driver to the executors.
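One common way to get that effect is sketched below, checked with plain Java serialization rather than a cluster. It is not necessarily the approach taken in the linked post, and ThirdPartyClient and ClientHolder are hypothetical names. Only a String travels with the wrapper; the non-serializable client is rebuilt lazily on whichever JVM first uses it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyHolderSketch {
  // Hypothetical third-party client that is NOT Serializable.
  class ThirdPartyClient(val endpoint: String) {
    def get(symbol: String): String = s"$endpoint/$symbol"
  }

  // Serializable wrapper: the endpoint string is shipped, the client is not.
  // @transient skips the field during serialization; lazy rebuilds it on first use.
  class ClientHolder(endpoint: String) extends Serializable {
    @transient lazy val client: ThirdPartyClient = new ThirdPartyClient(endpoint)
  }

  // Round-trips an object through Java serialization, roughly what happens
  // when a closure is sent from the driver to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A closure that captures a ClientHolder can then call holder.client inside map or foreachPartition without the client itself ever being serialized.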

Best regards,
Anastasios





Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
Agreed. Thanks.



Re: Can spark handle this scenario?

2018-02-17 Thread Jörn Franke
You may want to think about separating the import step from the processing
step. It is not very economical to download all the data again every time you
want to calculate something. So download it first and store it on a
distributed file system. Schedule a download of the newest information every
day, hour, etc. You can store it in a query-optimized format such as ORC or
Parquet. Then you can run queries over it.
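A minimal sketch of that split, assuming the downloaded ticks are already available as a Dataset[Tick] and that Spark is on the classpath. The object name, the path argument, and the "gainers" query are illustrative assumptions only.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object ImportThenQuery {
  case class Tick(symbol: String, sector: String, open: Double, close: Double)

  // Import step: persist what was downloaded, once, in a columnar format.
  // A scheduled job could use mode("append") instead of "overwrite".
  def store(ticks: Dataset[Tick], path: String): Unit =
    ticks.write.mode("overwrite").parquet(path)

  // Processing step: later jobs read the stored copy instead of calling the
  // API again. Here: symbols whose close is above their open.
  def gainers(spark: SparkSession, path: String): Array[String] = {
    import spark.implicits._
    spark.read.parquet(path)
      .filter($"close" > $"open")
      .select($"symbol").as[String]
      .collect()
  }
}
```

The import job and the query jobs then share only the storage path, so queries can be rerun without touching the API.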

> On 17. Feb 2018, at 01:10, Lian Jiang  wrote:
> 
> Hi,
> 
> I have a use case:
> 
> I want to download S stock data from the Yahoo API in parallel using Spark.
> I have all the stock symbols as a Dataset. I then used the code below to
> call the Yahoo API for each symbol:
> 
>
> case class Symbol(symbol: String, sector: String)
> case class Tick(symbol: String, sector: String, open: Double, close: Double)
> 
> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
> 
> symbolDs.map { k =>
>   pullSymbolFromYahoo(k.symbol, k.sector)
> }
> 
> This statement cannot compile:
> 
> Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
> String, etc) and Product types (case classes) are supported by importing 
> spark.implicits._  Support for serializing other types will be added in 
> future releases.
> 
> 
> My questions are:
> 
> 1. As you can see, this scenario is not traditional dataset handling such as
> count, SQL query... Instead, it is more like a UDF which applies an arbitrary
> operation to each record. Is Spark good at handling such a scenario?
> 
> 2. Regarding the compilation error, any fix? I did not find a satisfactory 
> solution online.
> 
> Thanks for help!
> 
> 
> 


Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
*Snehasish,*

I got this in spark-shell 2.11.8:

case class My(name:String, age:Int)

import spark.implicits._

val t = List(new My("lian", 20), new My("sh", 3)).toDS

t.map{ k=> print(My) }(org.apache.spark.sql.Encoders.kryo[My.getClass])


:31: error: type getClass is not a member of object My

       t.map{ k=> print(My) }(org.apache.spark.sql.Encoders.kryo[My.getClass])



Using an RDD can work around this issue, as mentioned in previous emails:


 t.rdd.map{ k=> print(k) }
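For reference, a hedged sketch of how the Dataset version can compile, assuming Spark is on the classpath. Encoders.kryo takes the type itself as its type parameter (Tick, not Tick.getClass). The mapped function also returns a plain Tick rather than a Dataset[Tick], since as far as I can tell no encoder exists for a Dataset nested inside a Dataset. pullTick is a hypothetical stand-in for the real API call and returns placeholder values only.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

object EncoderSketch {
  // Case classes live at object level so Spark can derive encoders for them.
  case class Symbol(symbol: String, sector: String)
  case class Tick(symbol: String, sector: String, open: Double, close: Double)

  // Hypothetical stand-in for the API call: returns a plain Tick with
  // placeholder prices, not a Dataset[Tick].
  def pullTick(symbol: String, sector: String): Tick =
    Tick(symbol, sector, open = 0.0, close = 0.0)

  // Variant 1: the implicit product encoder from spark.implicits._ is enough
  // for a case class result.
  def withImplicitEncoder(spark: SparkSession, symbols: Seq[Symbol]): Dataset[Tick] = {
    import spark.implicits._
    symbols.toDS().map(k => pullTick(k.symbol, k.sector))
  }

  // Variant 2: an explicit Kryo encoder, passed in the second parameter list.
  def withKryoEncoder(spark: SparkSession, symbols: Seq[Symbol]): Dataset[Tick] = {
    import spark.implicits._
    symbols.toDS().map(k => pullTick(k.symbol, k.sector))(Encoders.kryo[Tick])
  }
}
```

Variant 1 keeps the result as ordinary columns; variant 2 stores each Tick as a single binary column, so it is mainly useful for types that have no built-in encoder.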


*Holden,*


The remaining problem is:


Spark needs all classes used in fn() to be serializable for t.rdd.map{ k=>
fn(k) } to work. This could be hard since some classes in third-party
libraries are not serializable. This restricts the power of using Spark to
parallelize an operation on multiple machines. Hope this is clear.



Re: Can spark handle this scenario?

2018-02-17 Thread SNEHASISH DUTTA
Hi  Lian,

This could be the solution


case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]

symbolDs.map { k =>
  pullSymbolFromYahoo(k.symbol, k.sector)
}(org.apache.spark.sql.Encoders.kryo[Tick.getClass])

Regards,
Snehasish



Re: Can spark handle this scenario?

2018-02-16 Thread Holden Karau
I'm not sure what you mean by "it could be hard to serialize complex
operations".

Regardless, I think the question is: do you want to parallelize this on
multiple machines or just one?



Re: Can spark handle this scenario?

2018-02-16 Thread Lian Jiang
Thanks Ayan. RDD may support map better than Dataset/DataFrame. However, it
could be hard to serialize complex operations for Spark to execute in
parallel. IMHO, Spark does not fit this scenario. Hope this makes sense.

On Fri, Feb 16, 2018 at 8:58 PM, ayan guha  wrote:

> ** You do NOT need dataframes, I mean.
>


Re: Can spark handle this scenario?

2018-02-16 Thread ayan guha
Hi

Couple of suggestions:

1. Do not use Dataset, use DataFrame in this scenario. There is no benefit
of Dataset features here. Using a DataFrame, you can write an arbitrary UDF
that does what you want.
2. In fact you do need dataframes here. You would be better off with an RDD
here. Just create an RDD of symbols and use map to do the processing.
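
A minimal sketch of the RDD approach in point 2, assuming a hypothetical `fetchTicks` function that returns plain `Tick` records (placeholder values here, no real Yahoo call) and made-up symbols:

```scala
import org.apache.spark.sql.SparkSession

case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

object RddSketch {
  // Hypothetical stand-in for the real download: returns plain records, not a Dataset.
  def fetchTicks(symbol: String, sector: String): Seq[Tick] =
    Seq(Tick(symbol, sector, 0.0, 0.0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rdd-sketch")
      .master("local[*]")
      .getOrCreate()

    // Made-up symbols for illustration.
    val symbols = Seq(Symbol("AAA", "tech"), Symbol("BBB", "energy"))

    // RDDs need no encoders; each symbol is processed in parallel on the executors.
    val ticks = spark.sparkContext
      .parallelize(symbols)
      .flatMap(s => fetchTicks(s.symbol, s.sector))

    ticks.collect().foreach(println)
    spark.stop()
  }
}
```

`flatMap` is used instead of `map` on the assumption that one symbol yields several ticks; with exactly one result per symbol, `map` would do.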


-- 
Best Regards,
Ayan Guha


Re: Can spark handle this scenario?

2018-02-16 Thread ayan guha
** You do NOT need dataframes, I mean.

On Sat, Feb 17, 2018 at 3:58 PM, ayan guha  wrote:

> Hi
>
> Couple of suggestions:
>
> 1. Do not use Dataset, use Dataframe in this scenario. There is no benefit
> of dataset features here. Using Dataframe, you can write an arbitrary UDF
> which can do what you want to do.
> 2. In fact you do need dataframes here. You would be better off with RDD
> here. just create a RDD of symbols and use map to do the processing.
>



-- 
Best Regards,
Ayan Guha


Re: Can spark handle this scenario?

2018-02-16 Thread Irving Duran
Do you only want to use Scala? Because otherwise, I think with pyspark
and pandas read table you should be able to accomplish what you want to
accomplish.

Thank you,

Irving Duran






Can spark handle this scenario?

2018-02-16 Thread Lian Jiang
Hi,

I have a use case:

I want to download S stock data from the Yahoo API in parallel using
Spark. I have all the stock symbols as a Dataset. I then used the code below to
call the Yahoo API for each symbol:



case class Symbol(symbol: String, sector: String)

case class Tick(symbol: String, sector: String, open: Double, close: Double)


// symbolDs is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]

symbolDs.map { k =>
  pullSymbolFromYahoo(k.symbol, k.sector)
}


This statement does not compile:


Unable to find encoder for type stored in a Dataset.  Primitive types (Int,
String, etc) and Product types (case classes) are supported by importing
spark.implicits._  Support for serializing other types will be added in
future releases.


My questions are:


1. As you can see, this scenario is not traditional dataset handling such
as count or SQL queries. Instead, it is more like a UDF that applies an
arbitrary operation to each record. Is Spark good at handling such a scenario?


2. Regarding the compilation error, any fix? I did not find a satisfactory
solution online.


Thanks for help!
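
On question 2, a likely cause (an inference from the comment in the code, not something confirmed in this thread) is that the function passed to `map` returns a `Dataset[Tick]`, and Spark has no encoder for a Dataset nested inside another Dataset. A minimal sketch of one way around it, assuming a hypothetical `pullTicks` that returns a plain `Seq[Tick]` with placeholder values instead of calling Yahoo:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Case classes are defined at the top level so Spark can derive encoders for them.
case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

object DatasetSketch {
  // Hypothetical replacement for pullSymbolFromYahoo: returns Seq[Tick], not Dataset[Tick].
  def pullTicks(symbol: String, sector: String): Seq[Tick] =
    Seq(Tick(symbol, sector, 0.0, 0.0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dataset-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._ // brings the encoders for Symbol and Tick into scope

    // Made-up symbols for illustration.
    val symbolDs: Dataset[Symbol] =
      Seq(Symbol("AAA", "tech"), Symbol("BBB", "energy")).toDS()

    // flatMap flattens each Seq[Tick] into rows of a single Dataset[Tick].
    val ticks: Dataset[Tick] = symbolDs.flatMap(k => pullTicks(k.symbol, k.sector))

    ticks.show()
    spark.stop()
  }
}
```

The per-record function here returns ordinary Scala collections, so it does not need a SparkSession on the executors.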