
Re: type-safe join in the new DataSet API?

2016-11-26 Thread Koert Kuipers
Although this is correct, KeyValueGroupedDataset.cogroup requires you to
implement your own join logic with Iterator functions. It's fun to do, and I
appreciate the flexibility it gives, but I would not consider it a good
solution for someone who just wants a typed join.
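
For reference, a rough sketch of the groupByKey + cogroup approach; the case
classes and data are made up purely for illustration:

import org.apache.spark.sql.SparkSession

// Hypothetical case classes; the join key is the shared `id` field.
case class Click(id: Long, url: String)
case class Purchase(id: Long, amount: Double)

object TypedCogroupJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("typed-cogroup-join").getOrCreate()
    import spark.implicits._

    val clicks    = Seq(Click(1L, "/home"), Click(2L, "/cart")).toDS()
    val purchases = Seq(Purchase(1L, 9.99)).toDS()

    // groupByKey + cogroup: the key type and both element types are checked at
    // compile time, but the inner-join semantics are written by hand.
    val joined = clicks
      .groupByKey(_.id)
      .cogroup(purchases.groupByKey(_.id)) { (id, cs, ps) =>
        val rhs = ps.toSeq // the right-hand iterator can only be traversed once
        for (c <- cs; p <- rhs) yield (id, c.url, p.amount)
      }

    joined.show()
    spark.stop()
  }
}

The key type is checked at compile time here, which is exactly what the
Column-based joinWith condition does not give you, but the inner-join
semantics have to be written by hand against the two iterators.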

On Thu, Nov 10, 2016 at 2:18 PM, Michael Armbrust 
wrote:

> You can groupByKey and then cogroup.
>
> On Thu, Nov 10, 2016 at 10:44 AM, Yang  wrote:
>
>> the new DataSet API is supposed to provide type safety and type checks at
>> compile time https://spark.apache.org/docs/latest/structured-streami
>> ng-programming-guide.html#join-operations
>>
>> It does this indeed for a lot of places, but I found it still doesn't
>> have a type safe join:
>>
>> val ds1 = hc.sql("select col1, col2 from mytable")
>>
>> val ds2 = hc.sql("select col3 , col4 from mytable2")
>>
>> val ds3 = ds1.joinWith(ds2, ds1.col("col1") === ds2.col("col2"))
>>
>> here spark has no way to make sure (at compile time) that the two columns
>> being joined together
>> , "col1" and "col2" are of matching types. This is contrast to rdd join,
>> where it would be detected at compile time.
>>
>> am I missing something?
>>
>> thanks
>>
>>
>


Re: Third party library

2016-11-26 Thread kant kodali
I would say that instead of LD_LIBRARY_PATH you might want to use
java.library.path, in the following way:

java -Djava.library.path=/path/to/my/library

or pass java.library.path along with spark-submit.
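
A minimal sketch of wiring that through Spark's own settings; the path is a
placeholder, and in practice these keys are usually passed with
spark-submit --conf so they take effect before the JVMs start:

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder: the directory on each node that holds the native .so files.
val nativeLibDir = "/path/to/native/libs"

val conf = new SparkConf()
  .setAppName("jni-example")
  // Point java.library.path at the native libraries on driver and executors.
  .set("spark.driver.extraJavaOptions", s"-Djava.library.path=$nativeLibDir")
  .set("spark.executor.extraJavaOptions", s"-Djava.library.path=$nativeLibDir")
  // Alternatively, this prepends the directory to the executor's library path.
  .set("spark.executor.extraLibraryPath", nativeLibDir)

val sc = new SparkContext(conf)

Whichever of these you use, the native library still has to exist at that
path on every node.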

On Sat, Nov 26, 2016 at 6:44 PM, Gmail  wrote:

> Maybe you've already checked these out. Some basic questions that come to
> my mind are:
> 1) is this library "foolib" or "foo-C-library" available on the worker
> node?
> 2) if yes, is it accessible by the user/program (rwx)?
>
> Thanks,
> Vasu.
>
> On Nov 26, 2016, at 5:08 PM, kant kodali  wrote:
>
> If it is working for standalone program I would think you can apply the
> same settings across all the spark worker  and client machines and give
> that a try. Lets start with that.
>
> On Sat, Nov 26, 2016 at 11:59 AM, vineet chadha 
> wrote:
>
>> Just subscribed to  Spark User.  So, forwarding message again.
>>
>> On Sat, Nov 26, 2016 at 11:50 AM, vineet chadha 
>> wrote:
>>
>>> Thanks Kant. Can you give me a sample program which allows me to call
>>> jni from executor task ?   I have jni working in standalone program in
>>> scala/java.
>>>
>>> Regards,
>>> Vineet
>>>
>>> On Sat, Nov 26, 2016 at 11:43 AM, kant kodali 
>>> wrote:
>>>
 Yes this is a Java JNI question. Nothing to do with Spark really.

  java.lang.UnsatisfiedLinkError typically would mean the way you setup 
 LD_LIBRARY_PATH
 is wrong unless you tell us that it is working for other cases but not this
 one.

 On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin 
 wrote:

> That's just standard JNI and has nothing to do with Spark, does it?
>
>
> On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha <
> start.vin...@gmail.com> wrote:
>
>> Thanks Reynold for quick reply.
>>
>>  I have tried following:
>>
>> class MySimpleApp {
>>  // ---Native methods
>>   @native def fooMethod (foo: String): String
>> }
>>
>> object MySimpleApp {
>>   val flag = false
>>   def loadResources() {
>> System.loadLibrary("foo-C-library")
>>   val flag = true
>>   }
>>   def main() {
>> sc.parallelize(1 to 10).mapPartitions ( iter => {
>>   if(flag == false){
>>   MySimpleApp.loadResources()
>>  val SimpleInstance = new MySimpleApp
>>   }
>>   SimpleInstance.fooMethod ("fooString")
>>   iter
>> })
>>   }
>> }
>>
>> I don't see way to invoke fooMethod which is implemented in
>> foo-C-library. Is I am missing something ? If possible, can you point me 
>> to
>> existing implementation which i can refer to.
>>
>> Thanks again.
>>
>> ~
>>
>> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin 
>> wrote:
>>
>>> bcc dev@ and add user@
>>>
>>>
>>> This is more a user@ list question rather than a dev@ list
>>> question. You can do something like this:
>>>
>>> object MySimpleApp {
>>>   def loadResources(): Unit = // define some idempotent way to load
>>> resources, e.g. with a flag or lazy val
>>>
>>>   def main() = {
>>> ...
>>>
>>> sc.parallelize(1 to 10).mapPartitions { iter =>
>>>   MySimpleApp.loadResources()
>>>
>>>   // do whatever you want with the iterator
>>> }
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha <
>>> start.vin...@gmail.com> wrote:
>>>
 Hi,

 I am trying to invoke C library from the Spark Stack using JNI
 interface (here is sample  application code)


 class SimpleApp {
  // ---Native methods
 @native def foo (Top: String): String
 }

 object SimpleApp  {
def main(args: Array[String]) {

 val conf = new SparkConf().setAppName("Simple
 Application").set("SPARK_LIBRARY_PATH", "lib")
 val sc = new SparkContext(conf)
  System.loadLibrary("foolib")
 //instantiate the class
  val SimpleAppInstance = new SimpleApp
 //String passing - Working
 val ret = SimpleAppInstance.foo("fooString")
   }

 Above code work fines.

 I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
 spark.executor.extraLibraryPath at worker node

 How can i invoke JNI library from worker node ? Where should i load
 it in executor ?
 Calling  System.loadLibrary("foolib") inside the work node gives
 me following error :

 Exception in thread "main" java.lang.UnsatisfiedLinkError:

 Any help would be really appreciated.







Re: Third party library

2016-11-26 Thread Gmail
Maybe you've already checked these out. Some basic questions that come to my 
mind are:
1) is this library "foolib" or "foo-C-library" available on the worker node?
2) if yes, is it accessible by the user/program (rwx)?

Thanks,
Vasu. 

> On Nov 26, 2016, at 5:08 PM, kant kodali  wrote:
> 
> If it is working for standalone program I would think you can apply the same 
> settings across all the spark worker  and client machines and give that a 
> try. Lets start with that.
> 
>> On Sat, Nov 26, 2016 at 11:59 AM, vineet chadha  
>> wrote:
>> Just subscribed to  Spark User.  So, forwarding message again.
>> 
>>> On Sat, Nov 26, 2016 at 11:50 AM, vineet chadha  
>>> wrote:
>>> Thanks Kant. Can you give me a sample program which allows me to call jni 
>>> from executor task ?   I have jni working in standalone program in 
>>> scala/java. 
>>> 
>>> Regards,
>>> Vineet
>>> 
 On Sat, Nov 26, 2016 at 11:43 AM, kant kodali  wrote:
 Yes this is a Java JNI question. Nothing to do with Spark really.
 
  java.lang.UnsatisfiedLinkError typically would mean the way you setup 
 LD_LIBRARY_PATH is wrong unless you tell us that it is working for other 
 cases but not this one.
 
> On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin  wrote:
 
> That's just standard JNI and has nothing to do with Spark, does it?
> 
> 
>> On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha  
>> wrote:
>> Thanks Reynold for quick reply.
>> 
>>  I have tried following: 
>> 
>> class MySimpleApp {
>>  // ---Native methods
>>   @native def fooMethod (foo: String): String
>> }
>> 
>> object MySimpleApp {
>>   val flag = false
>>   def loadResources() {
>>   System.loadLibrary("foo-C-library")
>>   val flag = true
>>   }
>>   def main() {
>> sc.parallelize(1 to 10).mapPartitions ( iter => {
>>   if(flag == false){
>>  MySimpleApp.loadResources()
>>val SimpleInstance = new MySimpleApp
>>   }
>>   SimpleInstance.fooMethod ("fooString") 
>>   iter
>> })
>>   }
>> }
>> 
>> I don't see way to invoke fooMethod which is implemented in 
>> foo-C-library. Is I am missing something ? If possible, can you point me 
>> to existing implementation which i can refer to.
>> 
>> Thanks again. 
>> ~
>> 
>> 
>>> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin  
>>> wrote:
>>> bcc dev@ and add user@
>>> 
>>> 
>>> This is more a user@ list question rather than a dev@ list question. 
>>> You can do something like this:
>>> 
>>> object MySimpleApp {
>>>   def loadResources(): Unit = // define some idempotent way to load 
>>> resources, e.g. with a flag or lazy val
>>> 
>>>   def main() = {
>>> ...
>>>
>>> sc.parallelize(1 to 10).mapPartitions { iter =>
>>>   MySimpleApp.loadResources()
>>>   
>>>   // do whatever you want with the iterator
>>> }
>>>   }
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
 On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha 
  wrote:
 Hi,
 
 I am trying to invoke C library from the Spark Stack using JNI 
 interface (here is sample  application code)
 
 
 class SimpleApp {
  // ---Native methods
 @native def foo (Top: String): String
 }
 
 object SimpleApp  {
def main(args: Array[String]) {
  
 val conf = new 
 SparkConf().setAppName("SimpleApplication").set("SPARK_LIBRARY_PATH", 
 "lib")
 val sc = new SparkContext(conf)
  System.loadLibrary("foolib")
 //instantiate the class
  val SimpleAppInstance = new SimpleApp
 //String passing - Working
 val ret = SimpleAppInstance.foo("fooString")
   }
 
 Above code work fines. 
 
 I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,  
 spark.executor.extraLibraryPath at worker node
 
 How can i invoke JNI library from worker node ? Where should i load it 
 in executor ?
 Calling  System.loadLibrary("foolib") inside the work node gives me 
 following error :
 
 Exception in thread "main" java.lang.UnsatisfiedLinkError: 
 Any help would be really appreciated.
 
 
 
 
 
 
 
 
 
 
 
 
 
>>> 
>> 
> 
 
>>> 
>> 
> 


Re: Third party library

2016-11-26 Thread kant kodali
If it is working for the standalone program, I would think you can apply the
same settings across all the Spark worker and client machines and give that a
try. Let's start with that.

On Sat, Nov 26, 2016 at 11:59 AM, vineet chadha 
wrote:

> Just subscribed to  Spark User.  So, forwarding message again.
>
> On Sat, Nov 26, 2016 at 11:50 AM, vineet chadha 
> wrote:
>
>> Thanks Kant. Can you give me a sample program which allows me to call jni
>> from executor task ?   I have jni working in standalone program in
>> scala/java.
>>
>> Regards,
>> Vineet
>>
>> On Sat, Nov 26, 2016 at 11:43 AM, kant kodali  wrote:
>>
>>> Yes this is a Java JNI question. Nothing to do with Spark really.
>>>
>>>  java.lang.UnsatisfiedLinkError typically would mean the way you setup 
>>> LD_LIBRARY_PATH
>>> is wrong unless you tell us that it is working for other cases but not this
>>> one.
>>>
>>> On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin 
>>> wrote:
>>>
 That's just standard JNI and has nothing to do with Spark, does it?


 On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha  wrote:

> Thanks Reynold for quick reply.
>
>  I have tried following:
>
> class MySimpleApp {
>  // ---Native methods
>   @native def fooMethod (foo: String): String
> }
>
> object MySimpleApp {
>   val flag = false
>   def loadResources() {
> System.loadLibrary("foo-C-library")
>   val flag = true
>   }
>   def main() {
> sc.parallelize(1 to 10).mapPartitions ( iter => {
>   if(flag == false){
>   MySimpleApp.loadResources()
>  val SimpleInstance = new MySimpleApp
>   }
>   SimpleInstance.fooMethod ("fooString")
>   iter
> })
>   }
> }
>
> I don't see way to invoke fooMethod which is implemented in
> foo-C-library. Is I am missing something ? If possible, can you point me 
> to
> existing implementation which i can refer to.
>
> Thanks again.
>
> ~
>
> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin 
> wrote:
>
>> bcc dev@ and add user@
>>
>>
>> This is more a user@ list question rather than a dev@ list question.
>> You can do something like this:
>>
>> object MySimpleApp {
>>   def loadResources(): Unit = // define some idempotent way to load
>> resources, e.g. with a flag or lazy val
>>
>>   def main() = {
>> ...
>>
>> sc.parallelize(1 to 10).mapPartitions { iter =>
>>   MySimpleApp.loadResources()
>>
>>   // do whatever you want with the iterator
>> }
>>   }
>> }
>>
>>
>>
>>
>>
>> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha <
>> start.vin...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to invoke C library from the Spark Stack using JNI
>>> interface (here is sample  application code)
>>>
>>>
>>> class SimpleApp {
>>>  // ---Native methods
>>> @native def foo (Top: String): String
>>> }
>>>
>>> object SimpleApp  {
>>>def main(args: Array[String]) {
>>>
>>> val conf = new SparkConf().setAppName("Simple
>>> Application").set("SPARK_LIBRARY_PATH", "lib")
>>> val sc = new SparkContext(conf)
>>>  System.loadLibrary("foolib")
>>> //instantiate the class
>>>  val SimpleAppInstance = new SimpleApp
>>> //String passing - Working
>>> val ret = SimpleAppInstance.foo("fooString")
>>>   }
>>>
>>> Above code work fines.
>>>
>>> I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
>>> spark.executor.extraLibraryPath at worker node
>>>
>>> How can i invoke JNI library from worker node ? Where should i load
>>> it in executor ?
>>> Calling  System.loadLibrary("foolib") inside the work node gives me
>>> following error :
>>>
>>> Exception in thread "main" java.lang.UnsatisfiedLinkError:
>>>
>>> Any help would be really appreciated.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Anton Okolnychyi
Hi guys,

I also ran into a situation where Spark 1.6.2 ignored my hint to do a
broadcast join (i.e. broadcast(df)) with a small dataset. However, this
happened in only 1 of 3 cases. Setting the
"spark.sql.autoBroadcastJoinThreshold" property did not have any impact
either. All 3 cases work fine in Spark 2.0.

Is there any chance that Spark can ignore a manually specified broadcast
operation? In other words, is Spark forced to perform a broadcast if one
specifies "df1.join(broadcast(df2), ...)"? A minimal sketch of how I check
this is below.

Best regards,
Anton



2016-11-26 21:04 GMT+01:00 Swapnil Shinde :

> I am using Spark 1.6.3 and below is the real plan (a,b,c in above were
> just for illustration purpose)
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
> mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
> [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>:- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>:  +- TungstenExchange 
> hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200),
> None
>: +- Project [_1#3797 AS ltt#3800,_2#3798 AS
> mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>:+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>+- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>   +- TungstenExchange 
> hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200),
> None
>  +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
> +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
> AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806
> AS etv_demo_id#3813]
>+- Filter ((map_type#3809 = master_roster_to_etv) && NOT
> (demogroup#3803 = gender_age_id))
>   +- Scan ExistingRDD[demogroup#3803,
> demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,
> demovalue_old_map#3808,map_type#3809]
>
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:
>
>> Could you post the result of explain `c.explain`? If it is broadcast
>> join, you will see it in explain.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes but it is still doing
>>> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
>>> higher but still no luck.
>>>
>>> Related piece of code-
>>>   val c = a.join(braodcast(b), "id")
>>>
>>> On a side note, if I do SizeEstimator.estimate(b) and it is really
>>> high(460956584 bytes) compared to data it contains. b has just 85 rows and
>>> around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
I think your dataframes are converted from RDDs. Are those RDDs computed or
read from files directly? I guess that might affect how Spark computes the
execution plan.

Try this: save the data frame that will be broadcast to HDFS, read it back
into a dataframe, then do the join and check the explain plan, along the
lines of the sketch below.
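
A hedged sketch with placeholder names (small is the frame to broadcast, big
is the large one), against a 1.6-style sqlContext:

import org.apache.spark.sql.functions.broadcast

// Write the small frame out and read it back, so its size comes from the
// files on disk rather than from whatever the RDD lineage makes Spark estimate.
small.write.mode("overwrite").parquet("hdfs:///tmp/small_lookup")
val smallFromDisk = sqlContext.read.parquet("hdfs:///tmp/small_lookup")

val joined = big.join(broadcast(smallFromDisk), Seq("id"))
joined.explain()   // expect BroadcastHashJoin rather than SortMergeOuterJoin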

On Sat, Nov 26, 2016 at 12:04 PM, Swapnil Shinde 
wrote:

> I am using Spark 1.6.3 and below is the real plan (a,b,c in above were
> just for illustration purpose)
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
> mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
> [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>:- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>:  +- TungstenExchange 
> hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200),
> None
>: +- Project [_1#3797 AS ltt#3800,_2#3798 AS
> mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>:+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>+- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>   +- TungstenExchange 
> hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200),
> None
>  +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
> +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
> AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806
> AS etv_demo_id#3813]
>+- Filter ((map_type#3809 = master_roster_to_etv) && NOT
> (demogroup#3803 = gender_age_id))
>   +- Scan ExistingRDD[demogroup#3803,
> demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,
> demovalue_old_map#3808,map_type#3809]
>
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:
>
>> Could you post the result of explain `c.explain`? If it is broadcast
>> join, you will see it in explain.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes but it is still doing
>>> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
>>> higher but still no luck.
>>>
>>> Related piece of code-
>>>   val c = a.join(braodcast(b), "id")
>>>
>>> On a side note, if I do SizeEstimator.estimate(b) and it is really
>>> high(460956584 bytes) compared to data it contains. b has just 85 rows and
>>> around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Swapnil Shinde
I am using Spark 1.6.3 and below is the real plan (a,b,c in above were just
for illustration purpose)

== Physical Plan ==
Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
+- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
[mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
   :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
   :  +- TungstenExchange
hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
   : +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799
AS mr_demoname#3802]
   :+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
   +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
  +- TungstenExchange
hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
 +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
+- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
AS mr_demo_id#3811,demoname#3805 AS
mr_demo_value#3812,demovalue_etv_map#3806 AS etv_demo_id#3813]
   +- Filter ((map_type#3809 = master_roster_to_etv) && NOT
(demogroup#3803 = gender_age_id))
  +- Scan
ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]


Thanks
Swapnil

On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:

> Could you post the result of explain `c.explain`? If it is broadcast join,
> you will see it in explain.
>
> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde  > wrote:
>
>> Hello
>> I am trying a broadcast join on dataframes but it is still doing
>> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
>> higher but still no luck.
>>
>> Related piece of code-
>>   val c = a.join(braodcast(b), "id")
>>
>> On a side note, if I do SizeEstimator.estimate(b) and it is really
>> high(460956584 bytes) compared to data it contains. b has just 85 rows and
>> around 4964 bytes.
>> Help is very much appreciated!!
>>
>> Thanks
>> Swapnil
>>
>>
>>
>


Re: Third party library

2016-11-26 Thread vineet chadha
Just subscribed to Spark User, so forwarding the message again.

On Sat, Nov 26, 2016 at 11:50 AM, vineet chadha 
wrote:

> Thanks Kant. Can you give me a sample program which allows me to call jni
> from executor task ?   I have jni working in standalone program in
> scala/java.
>
> Regards,
> Vineet
>
> On Sat, Nov 26, 2016 at 11:43 AM, kant kodali  wrote:
>
>> Yes this is a Java JNI question. Nothing to do with Spark really.
>>
>>  java.lang.UnsatisfiedLinkError typically would mean the way you setup 
>> LD_LIBRARY_PATH
>> is wrong unless you tell us that it is working for other cases but not this
>> one.
>>
>> On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin 
>> wrote:
>>
>>> That's just standard JNI and has nothing to do with Spark, does it?
>>>
>>>
>>> On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha 
>>> wrote:
>>>
 Thanks Reynold for quick reply.

  I have tried following:

 class MySimpleApp {
  // ---Native methods
   @native def fooMethod (foo: String): String
 }

 object MySimpleApp {
   val flag = false
   def loadResources() {
 System.loadLibrary("foo-C-library")
   val flag = true
   }
   def main() {
 sc.parallelize(1 to 10).mapPartitions ( iter => {
   if(flag == false){
   MySimpleApp.loadResources()
  val SimpleInstance = new MySimpleApp
   }
   SimpleInstance.fooMethod ("fooString")
   iter
 })
   }
 }

 I don't see way to invoke fooMethod which is implemented in
 foo-C-library. Is I am missing something ? If possible, can you point me to
 existing implementation which i can refer to.

 Thanks again.

 ~

 On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin 
 wrote:

> bcc dev@ and add user@
>
>
> This is more a user@ list question rather than a dev@ list question.
> You can do something like this:
>
> object MySimpleApp {
>   def loadResources(): Unit = // define some idempotent way to load
> resources, e.g. with a flag or lazy val
>
>   def main() = {
> ...
>
> sc.parallelize(1 to 10).mapPartitions { iter =>
>   MySimpleApp.loadResources()
>
>   // do whatever you want with the iterator
> }
>   }
> }
>
>
>
>
>
> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha  > wrote:
>
>> Hi,
>>
>> I am trying to invoke C library from the Spark Stack using JNI
>> interface (here is sample  application code)
>>
>>
>> class SimpleApp {
>>  // ---Native methods
>> @native def foo (Top: String): String
>> }
>>
>> object SimpleApp  {
>>def main(args: Array[String]) {
>>
>> val conf = new SparkConf().setAppName("Simple
>> Application").set("SPARK_LIBRARY_PATH", "lib")
>> val sc = new SparkContext(conf)
>>  System.loadLibrary("foolib")
>> //instantiate the class
>>  val SimpleAppInstance = new SimpleApp
>> //String passing - Working
>> val ret = SimpleAppInstance.foo("fooString")
>>   }
>>
>> Above code work fines.
>>
>> I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
>> spark.executor.extraLibraryPath at worker node
>>
>> How can i invoke JNI library from worker node ? Where should i load
>> it in executor ?
>> Calling  System.loadLibrary("foolib") inside the work node gives me
>> following error :
>>
>> Exception in thread "main" java.lang.UnsatisfiedLinkError:
>>
>> Any help would be really appreciated.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>

>>>
>>
>


Re: Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading multiple partitions in parallel.

2016-11-26 Thread kant kodali
https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/DataFrameReader.html#json(org.apache.spark.rdd.RDD)

You can pass an RDD to spark.read.json (spark here is the SparkSession).

Also, it works completely fine with a smaller dataset in a table, but with 1B
records it takes forever, and more importantly the network throughput is only
2.2 KB/s, which is far too low; it should be somewhere in MB/s.
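
For reference, a sketch of passing the RDD directly but with an explicit
schema, which spares Spark the extra pass over the data it needs for schema
inference; the field names here are made up:

import org.apache.spark.sql.types._

// Hypothetical layout of the JSON documents stored in the "test" column.
val jsonSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("payload", StringType)
))

// Same rdd as in the snippet below (an RDD[String] of JSON documents).
val df4 = spark.read.schema(jsonSchema).json(rdd)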

On Sat, Nov 26, 2016 at 10:09 AM, Anastasios Zouzias 
wrote:

> Hi there,
>
> spark.read.json usually takes a filesystem path (usually HDFS) where there
> is a file containing JSON per new line. See also
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html
>
> Hence, in your case
>
> val df4 = spark.read.json(rdd) // This line takes forever
>
> seems wrong. I guess you might want to first store rdd as a text file on
> HDFS and then read it using spark.read.json .
>
> Cheers,
> Anastasios
>
>
>
> On Sat, Nov 26, 2016 at 9:34 AM, kant kodali  wrote:
>
>>
>> Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading
>> multiple partitions in parallel.
>>
>> Here is my code using spark-shell
>>
>> import org.apache.spark.sql._
>> import org.apache.spark.sql.types.StringType
>> spark.sql("""CREATE TEMPORARY VIEW hello USING 
>> org.apache.spark.sql.cassandra OPTIONS (table "hello", keyspace "db", 
>> cluster "Test Cluster", pushdown "true")""")
>> val df = spark.sql("SELECT test from hello")
>> val df2 = df.select(df("test").cast(StringType).as("test"))
>> val rdd = df2.rdd.map { case Row(j: String) => j }
>> val df4 = spark.read.json(rdd) // This line takes forever
>>
>> I have about 700 million rows each row is about 1KB and this line
>>
>> val df4 = spark.read.json(rdd) takes forever as I get the following
>> output after 1hr 30 mins
>>
>> Stage 1:==> (4866 + 2) / 25256]
>>
>> so at this rate it will probably take days.
>>
>> I measured the network throughput rate of spark worker nodes using iftop
>> and it is about 2.2KB/s (kilobytes per second) which is too low so that
>> tells me it not reading partitions in parallel or at very least it is not
>> reading good chunk of data else it would be in MB/s. Any ideas on how to
>> fix it?
>>
>>
>
>
> --
> -- Anastasios Zouzias
> 
>


Re: Third party library

2016-11-26 Thread kant kodali
Yes, this is a Java JNI question; nothing to do with Spark really.

A java.lang.UnsatisfiedLinkError typically means the way you set up
LD_LIBRARY_PATH is wrong, unless you tell us that it is working for other
cases but not this one.
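
Building on the pattern Reynold sketched below, here is a hedged example that
loads the library once per executor JVM and keeps the values returned by the
native call; the library and method names follow this thread, everything else
is illustrative:

import org.apache.spark.{SparkConf, SparkContext}

class MySimpleApp {
  // Implemented in the native library "foo-C-library".
  @native def fooMethod(foo: String): String
}

object MySimpleApp {
  // A lazy val is evaluated at most once per JVM, i.e. once per executor,
  // which makes the System.loadLibrary call idempotent.
  lazy val loaded: Boolean = {
    System.loadLibrary("foo-C-library")
    true
  }
}

object JniOnExecutors {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("jni-on-executors"))

    val results = sc.parallelize(1 to 10).mapPartitions { iter =>
      MySimpleApp.loaded                 // forces the library load on this executor
      val native = new MySimpleApp
      iter.map(i => native.fooMethod(s"fooString-$i"))  // keep the return values
    }

    results.collect().foreach(println)
    sc.stop()
  }
}

The shared library itself still has to be present and readable on every
worker node, as Vasu points out elsewhere in this thread.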

On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin  wrote:

> That's just standard JNI and has nothing to do with Spark, does it?
>
>
> On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha 
> wrote:
>
>> Thanks Reynold for quick reply.
>>
>>  I have tried following:
>>
>> class MySimpleApp {
>>  // ---Native methods
>>   @native def fooMethod (foo: String): String
>> }
>>
>> object MySimpleApp {
>>   val flag = false
>>   def loadResources() {
>> System.loadLibrary("foo-C-library")
>>   val flag = true
>>   }
>>   def main() {
>> sc.parallelize(1 to 10).mapPartitions ( iter => {
>>   if(flag == false){
>>   MySimpleApp.loadResources()
>>  val SimpleInstance = new MySimpleApp
>>   }
>>   SimpleInstance.fooMethod ("fooString")
>>   iter
>> })
>>   }
>> }
>>
>> I don't see way to invoke fooMethod which is implemented in
>> foo-C-library. Is I am missing something ? If possible, can you point me to
>> existing implementation which i can refer to.
>>
>> Thanks again.
>>
>> ~
>>
>> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin  wrote:
>>
>>> bcc dev@ and add user@
>>>
>>>
>>> This is more a user@ list question rather than a dev@ list question.
>>> You can do something like this:
>>>
>>> object MySimpleApp {
>>>   def loadResources(): Unit = // define some idempotent way to load
>>> resources, e.g. with a flag or lazy val
>>>
>>>   def main() = {
>>> ...
>>>
>>> sc.parallelize(1 to 10).mapPartitions { iter =>
>>>   MySimpleApp.loadResources()
>>>
>>>   // do whatever you want with the iterator
>>> }
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha 
>>> wrote:
>>>
 Hi,

 I am trying to invoke C library from the Spark Stack using JNI
 interface (here is sample  application code)


 class SimpleApp {
  // ---Native methods
 @native def foo (Top: String): String
 }

 object SimpleApp  {
def main(args: Array[String]) {

 val conf = new SparkConf().setAppName("Simple
 Application").set("SPARK_LIBRARY_PATH", "lib")
 val sc = new SparkContext(conf)
  System.loadLibrary("foolib")
 //instantiate the class
  val SimpleAppInstance = new SimpleApp
 //String passing - Working
 val ret = SimpleAppInstance.foo("fooString")
   }

 Above code work fines.

 I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
 spark.executor.extraLibraryPath at worker node

 How can i invoke JNI library from worker node ? Where should i load it
 in executor ?
 Calling  System.loadLibrary("foolib") inside the work node gives me
 following error :

 Exception in thread "main" java.lang.UnsatisfiedLinkError:

 Any help would be really appreciated.













>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
Could you post the result of explain `c.explain`? If it is broadcast join,
you will see it in explain.

On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde 
wrote:

> Hello
> I am trying a broadcast join on dataframes but it is still doing
> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
> higher but still no luck.
>
> Related piece of code-
>   val c = a.join(braodcast(b), "id")
>
> On a side note, if I do SizeEstimator.estimate(b) and it is really
> high(460956584 bytes) compared to data it contains. b has just 85 rows and
> around 4964 bytes.
> Help is very much appreciated!!
>
> Thanks
> Swapnil
>
>
>


Re: Third party library

2016-11-26 Thread Reynold Xin
That's just standard JNI and has nothing to do with Spark, does it?


On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha 
wrote:

> Thanks Reynold for quick reply.
>
>  I have tried following:
>
> class MySimpleApp {
>  // ---Native methods
>   @native def fooMethod (foo: String): String
> }
>
> object MySimpleApp {
>   val flag = false
>   def loadResources() {
> System.loadLibrary("foo-C-library")
>   val flag = true
>   }
>   def main() {
> sc.parallelize(1 to 10).mapPartitions ( iter => {
>   if(flag == false){
>   MySimpleApp.loadResources()
>  val SimpleInstance = new MySimpleApp
>   }
>   SimpleInstance.fooMethod ("fooString")
>   iter
> })
>   }
> }
>
> I don't see way to invoke fooMethod which is implemented in foo-C-library.
> Is I am missing something ? If possible, can you point me to existing
> implementation which i can refer to.
>
> Thanks again.
>
> ~
>
> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin  wrote:
>
>> bcc dev@ and add user@
>>
>>
>> This is more a user@ list question rather than a dev@ list question. You
>> can do something like this:
>>
>> object MySimpleApp {
>>   def loadResources(): Unit = // define some idempotent way to load
>> resources, e.g. with a flag or lazy val
>>
>>   def main() = {
>> ...
>>
>> sc.parallelize(1 to 10).mapPartitions { iter =>
>>   MySimpleApp.loadResources()
>>
>>   // do whatever you want with the iterator
>> }
>>   }
>> }
>>
>>
>>
>>
>>
>> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to invoke C library from the Spark Stack using JNI interface
>>> (here is sample  application code)
>>>
>>>
>>> class SimpleApp {
>>>  // ---Native methods
>>> @native def foo (Top: String): String
>>> }
>>>
>>> object SimpleApp  {
>>>def main(args: Array[String]) {
>>>
>>> val conf = new SparkConf().setAppName("Simple
>>> Application").set("SPARK_LIBRARY_PATH", "lib")
>>> val sc = new SparkContext(conf)
>>>  System.loadLibrary("foolib")
>>> //instantiate the class
>>>  val SimpleAppInstance = new SimpleApp
>>> //String passing - Working
>>> val ret = SimpleAppInstance.foo("fooString")
>>>   }
>>>
>>> Above code work fines.
>>>
>>> I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,
>>> spark.executor.extraLibraryPath at worker node
>>>
>>> How can i invoke JNI library from worker node ? Where should i load it
>>> in executor ?
>>> Calling  System.loadLibrary("foolib") inside the work node gives me
>>> following error :
>>>
>>> Exception in thread "main" java.lang.UnsatisfiedLinkError:
>>>
>>> Any help would be really appreciated.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Selvam Raman
Hi,

Which version of Spark are you using?

Tables smaller than 10 MB are automatically converted to a broadcast join in
Spark; a quick sketch of checking and raising that threshold is below.
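
Against the 1.6 API (the new value is illustrative):

// The threshold is in bytes; tables below it are broadcast automatically.
sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold")   // default is 10485760 (10 MB)

// Raise it to 50 MB (or set it to -1 to disable automatic broadcast joins).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)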

Thanks,
selvam R

On Sat, Nov 26, 2016 at 6:51 PM, Swapnil Shinde 
wrote:

> Hello
> I am trying a broadcast join on dataframes but it is still doing
> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
> higher but still no luck.
>
> Related piece of code-
>   val c = a.join(braodcast(b), "id")
>
> On a side note, if I do SizeEstimator.estimate(b) and it is really
> high(460956584 bytes) compared to data it contains. b has just 85 rows and
> around 4964 bytes.
> Help is very much appreciated!!
>
> Thanks
> Swapnil
>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Dataframe broadcast join hint not working

2016-11-26 Thread Swapnil Shinde
Hello
I am trying a broadcast join on dataframes but it is still doing
SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
higher, but still no luck.

Related piece of code:
  val c = a.join(broadcast(b), "id")

On a side note, if I do SizeEstimator.estimate(b), it is really high
(460956584 bytes) compared to the data it contains; b has just 85 rows and
around 4964 bytes.
Help is very much appreciated!!

Thanks
Swapnil


Re: Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading multiple partitions in parallel.

2016-11-26 Thread Anastasios Zouzias
Hi there,

spark.read.json usually takes a filesystem path (usually HDFS) pointing to
files that contain one JSON record per line. See also

http://spark.apache.org/docs/latest/sql-programming-guide.html

Hence, in your case

val df4 = spark.read.json(rdd) // This line takes forever

seems wrong. I guess you might want to first store the rdd as a text file on
HDFS and then read it back with spark.read.json, along the lines of the
sketch below.
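
The path is a placeholder:

// Persist the JSON strings once, then let the reader work from files on HDFS.
rdd.saveAsTextFile("hdfs:///tmp/hello_json")

val df4 = spark.read.json("hdfs:///tmp/hello_json")
df4.printSchema()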

Cheers,
Anastasios



On Sat, Nov 26, 2016 at 9:34 AM, kant kodali  wrote:

>
> Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading
> multiple partitions in parallel.
>
> Here is my code using spark-shell
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.types.StringType
> spark.sql("""CREATE TEMPORARY VIEW hello USING org.apache.spark.sql.cassandra 
> OPTIONS (table "hello", keyspace "db", cluster "Test Cluster", pushdown 
> "true")""")
> val df = spark.sql("SELECT test from hello")
> val df2 = df.select(df("test").cast(StringType).as("test"))
> val rdd = df2.rdd.map { case Row(j: String) => j }
> val df4 = spark.read.json(rdd) // This line takes forever
>
> I have about 700 million rows each row is about 1KB and this line
>
> val df4 = spark.read.json(rdd) takes forever as I get the following
> output after 1hr 30 mins
>
> Stage 1:==> (4866 + 2) / 25256]
>
> so at this rate it will probably take days.
>
> I measured the network throughput rate of spark worker nodes using iftop
> and it is about 2.2KB/s (kilobytes per second) which is too low so that
> tells me it not reading partitions in parallel or at very least it is not
> reading good chunk of data else it would be in MB/s. Any ideas on how to
> fix it?
>
>


-- 
-- Anastasios Zouzias



Re: UDF for gradient ascent

2016-11-26 Thread Meeraj Kunnumpurath
One thing I noticed inside the UDF is that the original column names from the
data frame have disappeared and the columns are called col1, col2, etc.
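
For what it is worth, a sketch that sidesteps the Row and struct entirely by
building the probability as a column expression, so the original column names
are used directly. It keeps the same cfs shape but is not a drop-in
replacement; note the fold starts at 1.0, since the per-feature terms are
multiplied together:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def getLikelihood(cfs: List[(String, Double)], df: DataFrame): Any = {
  // acc / exp(col * w) is the column form of acc * 1 / Math.pow(Math.E, value * w).
  val pr = cfs.foldLeft(lit(1.0)) { case (acc, (name, w)) =>
    acc / exp(col(name) * w)
  }
  df.withColumn("probability", pr).agg(sum(col("probability"))).first.get(0)
}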

Regards
Meeraj

On Sat, Nov 26, 2016 at 7:31 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> Hello,
>
> I have a dataset of features on which I want to compute the likelihood
> value for implementing gradient ascent for estimating coefficients. I have
> written a UDF that compute the probability function on each feature as
> shown below.
>
> def getLikelihood(cfs : List[(String, Double)], df: DataFrame) = {
>   val pr = udf((r: Row) => {
> cfs.foldLeft(0.0)((x, y) => x * 1 / Math.pow(Math.E, 
> r.getAs[Double](y._1) * y._2))
>   })
>   df.withColumn("probabibility", pr(struct(df.columns.map(df(_)) : 
> _*))).agg(sum('probabibility)).first.get(0)
> }
>
> When I run it I get a long exception trace listing some generated code, as
> shown below.
>
> org.codehaus.commons.compiler.CompileException: File 'generated.java',
> Line 2445, Column 34: Expression "scan_isNull1" is not an rvalue
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174)
> at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(
> UnitCompiler.java:6036)
> at org.codehaus.janino.UnitCompiler.getConstantValue2(
> UnitCompiler.java:4440)
> at org.codehaus.janino.UnitCompiler.access$9900(UnitCompiler.java:185)
> at org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(
> UnitCompiler.java:4417)
>
> This is line 2445 in the generated code,
>
> /* 2445 */ Object project_arg = scan_isNull1 ? null :
> project_converter2.apply(scan_value1);
>
> Many thanks
>
>
>
> --
> *Meeraj Kunnumpurath*
>
>
> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*
>
> *00 971 50 409 0169mee...@servicesymphony.com *
>



-- 
*Meeraj Kunnumpurath*


*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*mee...@servicesymphony.com*


Re: OS killing Executor due to high (possibly off heap) memory usage

2016-11-26 Thread Koert Kuipers
I agree that off-heap memory usage is unpredictable.

When we used RDDs the memory was mostly on heap and total usage was
predictable, and we almost never had YARN killing executors.

Now with dataframes the memory usage is both on and off heap, and we have no
way of limiting the off-heap memory usage by Spark, yet YARN requires a
maximum total memory usage and kills the executor if you go over it.
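
For reference, a sketch of the knobs discussed in this thread; the values are
illustrative only and in practice these are usually passed with
spark-submit --conf so they apply before the JVMs start:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-heavy-job")
  // Room for off-heap allocations (netty buffers, etc.) on top of the heap, in MB.
  .set("spark.yarn.executor.memoryOverhead", "10240")
  // Lower the heap so heap + overhead stays under the YARN container limit.
  .set("spark.executor.memory", "30g")
  // Optionally cap direct (off-heap) buffers at the JVM level.
  .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=8g")
  // Also mentioned in this thread, to avoid direct buffers in the shuffle client.
  .set("spark.shuffle.io.preferDirectBufs", "false")

val sc = new SparkContext(conf)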

On Fri, Nov 25, 2016 at 12:14 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks Rohit, Roddick and Shreya. I tried changing 
> spark.yarn.executor.memoryOverhead
> to be 10GB and lowering executor memory to 30 GB and both of these didn't
> work. I finally had to reduce the number of cores per executor to be 18
> (from 36) in addition to setting higher spark.yarn.executor.memoryOverhead
> and lower executor memory size. I had to trade off performance for
> reliability.
>
> Unfortunately, spark does a poor job reporting off heap memory usage. From
> the profiler, it seems that the job's heap usage is pretty static but the
> off heap memory fluctuates quiet a lot. It looks like bulk of off heap is
> used by io.netty.buffer.UnpooledUnsafeDirectByteBuf while the shuffle
> client is trying to read block from shuffle service. It looks
> like org.apache.spark.network.util.TransportFrameDecoder retains them
> in buffers field while decoding responses from the shuffle service. So far,
> it's not clear why it needs to hold multiple GBs in the buffers. Perhaps
> increasing the number of partitions may help with this.
>
> Thanks,
> Aniket
>
> On Fri, Nov 25, 2016 at 1:09 AM Shreya Agarwal 
> wrote:
>
> I don’t think it’s just memory overhead. It might be better to use an
> execute with lesser heap space(30GB?). 46 GB would mean more data load into
> memory and more GC, which can cause issues.
>
>
>
> Also, have you tried to persist data in any way? If so, then that might be
> causing an issue.
>
>
>
> Lastly, I am not sure if your data has a skew and if that is forcing a lot
> of data to be on one executor node.
>
>
>
> Sent from my Windows 10 phone
>
>
>
> *From: *Rodrick Brown 
> *Sent: *Friday, November 25, 2016 12:25 AM
> *To: *Aniket Bhatnagar 
> *Cc: *user 
> *Subject: *Re: OS killing Executor due to high (possibly off heap) memory
> usage
>
>
> Try setting spark.yarn.executor.memoryOverhead 1
>
> On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
> Hi Spark users
>
> I am running a job that does join of a huge dataset (7 TB+) and the
> executors keep crashing randomly, eventually causing the job to crash.
> There are no out of memory exceptions in the log and looking at the dmesg
> output, it seems like the OS killed the JVM because of high memory usage.
> My suspicion is towards off heap usage of executor is causing this as I am
> limiting the on heap usage of executor to be 46 GB and each host running
> the executor has 60 GB of RAM. After the executor crashes, I can see that
> the external shuffle manager 
> (org.apache.spark.network.server.TransportRequestHandler)
> logs a lot of channel closed exceptions in yarn node manager logs. This
> leads me to believe that something triggers out of memory during shuffle
> read. Is there a configuration to completely disable usage of off heap
> memory? I have tried setting spark.shuffle.io.preferDirectBufs=false but
> the executor is still getting killed by the same error.
>
> Cluster details:
> 10 AWS c4.8xlarge hosts
> RAM on each host - 60 GB
> Number of cores on each host - 36
> Additional hard disk on each host - 8 TB
>
> Spark configuration:
> dynamic allocation enabled
> external shuffle service enabled
> spark.driver.memory 1024M
> spark.executor.memory 47127M
> Spark master yarn-cluster
>
> Sample error in yarn node manager:
> 2016-11-24 10:34:06,507 ERROR 
> org.apache.spark.network.server.TransportRequestHandler
> (shuffle-server-50): Error sending result ChunkFetchSuccess{
> streamChunkId=StreamChunkId{streamId=919299554123, chunkIndex=0}, buffer=
> FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/
> appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-
> 9276971b2259/2c/shuffle_3_963_0.data, offset=0, length=669014456}} to /
> 10.192.108.170:52782; closing connection
> java.nio.channels.ClosedChannelException
>
> Error in dmesg:
> [799873.309897] Out of memory: Kill process 50001 (java) score 927 or
> sacrifice child
> [799873.314439] Killed process 50001 (java) total-vm:65652448kB,
> anon-rss:57246528kB, file-rss:0kB
>
> Thanks,
> Aniket
>
>
>
>
> --
>
> [image: Orchard Platform] 
>
> *Rodrick Brown */ *DevOPs*
>
> 9174456839 / rodr...@orchardplatform.com
>
> Orchard Platform
> 101 5th Avenue, 4th Floor, New York, NY
>
> *NOTICE TO RECIPIENTS*: This communication is confidential and intended
> for the use of the addressee only. If you 

UDF for gradient ascent

2016-11-26 Thread Meeraj Kunnumpurath
Hello,

I have a dataset of features on which I want to compute the likelihood
value for implementing gradient ascent for estimating coefficients. I have
written a UDF that compute the probability function on each feature as
shown below.

def getLikelihood(cfs : List[(String, Double)], df: DataFrame) = {
  val pr = udf((r: Row) => {
cfs.foldLeft(0.0)((x, y) => x * 1 / Math.pow(Math.E,
r.getAs[Double](y._1) * y._2))
  })
  df.withColumn("probabibility", pr(struct(df.columns.map(df(_)) :
_*))).agg(sum('probabibility)).first.get(0)
}

When I run it I get a long exception trace listing some generated code, as
shown below.

org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
2445, Column 34: Expression "scan_isNull1" is not an rvalue
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174)
at
org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6036)
at
org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4440)
at org.codehaus.janino.UnitCompiler.access$9900(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(UnitCompiler.java:4417)

This is line 2445 in the generated code,

/* 2445 */ Object project_arg = scan_isNull1 ? null :
project_converter2.apply(scan_value1);

Many thanks



-- 
*Meeraj Kunnumpurath*


*Director and Executive Principal*
*Service Symphony Ltd*
*00 44 7702 693597*
*00 971 50 409 0169*
*mee...@servicesymphony.com*


Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading multiple partitions in parallel.

2016-11-26 Thread kant kodali

Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading
multiple partitions in parallel.

Here is my code using spark-shell

import org.apache.spark.sql._
import org.apache.spark.sql.types.StringType
spark.sql("""CREATE TEMPORARY VIEW hello USING
org.apache.spark.sql.cassandra OPTIONS (table "hello", keyspace "db",
cluster "Test Cluster", pushdown "true")""")
val df = spark.sql("SELECT test from hello")
val df2 = df.select(df("test").cast(StringType).as("test"))
val rdd = df2.rdd.map { case Row(j: String) => j }
val df4 = spark.read.json(rdd) // This line takes forever

I have about 700 million rows each row is about 1KB and this line

val df4 = spark.read.json(rdd) takes forever as I get the following output
after 1hr 30 mins

Stage 1:==> (4866 + 2) / 25256]

so at this rate it will probably take days.

I measured the network throughput rate of spark worker nodes using iftop
and it is about 2.2KB/s (kilobytes per second) which is too low so that
tells me it not reading partitions in parallel or at very least it is not
reading good chunk of data else it would be in MB/s. Any ideas on how to
fix it?