Re: Why so many parquet file part when I store data in Alluxio or File?

2016-06-30 Thread Ted Yu
Looking under Alluxio source, it seems only "fs.hdfs.impl.disable.cache" is
in use.

FYI

On Thu, Jun 30, 2016 at 9:30 PM, Deepak Sharma 
wrote:

> Ok.
> I came across this issue.
> Not sure if you already assessed this:
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6921
>
> The workaround mentioned may work for you .
>
> Thanks
> Deepak
> On 1 Jul 2016 9:34 am, "Chanh Le"  wrote:
>
>> Hi Deepak,
>> Thanks for replying. The way I write into Alluxio is
>> df.write.mode(SaveMode.Append).partitionBy("network_id", "time").parquet(
>> "alluxio://master1:1/FACT_ADMIN_HOURLY")
>>
>>
>> I partition by 2 columns and store. When I write, I just want the part files
>> to be sized to match the 512 MB block size I already set in Alluxio.
>>
>>
>> On Jul 1, 2016, at 11:01 AM, Deepak Sharma  wrote:
>>
>> Before writing, coalesce your RDD to 1 partition.
>> It will create only 1 output file.
>> Multiple part files happen because all your executors write their
>> partitions to separate part files.
>>
>> Thanks
>> Deepak
>> On 1 Jul 2016 8:01 am, "Chanh Le"  wrote:
>>
>> Hi everyone,
>> I am using Alluxio for storage, but I am a little confused: I set the
>> Alluxio block size to 512 MB, yet each file part is only a few KB and
>> there are too many parts.
>> Is that normal? I want reads to be fast; do that many parts affect the
>> read operation?
>> How do I set the size of the file parts?
>>
>> Thanks.
>> Chanh


Re: Why so many parquet file part when I store data in Alluxio or File?

2016-06-30 Thread Deepak Sharma
Ok.
I came across this issue.
Not sure if you already assessed this:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6921

The workaround mentioned may work for you .

Thanks
Deepak
On 1 Jul 2016 9:34 am, "Chanh Le"  wrote:

> Hi Deepak,
> Thanks for replying. The way I write into Alluxio is
> df.write.mode(SaveMode.Append).partitionBy("network_id", "time").parquet("
> alluxio://master1:1/FACT_ADMIN_HOURLY")
>
>
> I partition by 2 columns and store. When I write, I just want the part files
> to be sized to match the 512 MB block size I already set in Alluxio.
>
>
> On Jul 1, 2016, at 11:01 AM, Deepak Sharma  wrote:
>
> Before writing, coalesce your RDD to 1 partition.
> It will create only 1 output file.
> Multiple part files happen because all your executors write their
> partitions to separate part files.
>
> Thanks
> Deepak
> On 1 Jul 2016 8:01 am, "Chanh Le"  wrote:
>
> Hi everyone,
> I am using Alluxio for storage, but I am a little confused: I set the Alluxio
> block size to 512 MB, yet each file part is only a few KB and there are too
> many parts.
> Is that normal? I want reads to be fast; do that many parts affect the
> read operation?
> How do I set the size of the file parts?
>
> Thanks.
> Chanh


Re: Why so many parquet file part when I store data in Alluxio or File?

2016-06-30 Thread Chanh Le
Hi Deepak,
Thanks for replying. The way I write into Alluxio is
df.write.mode(SaveMode.Append).partitionBy("network_id",
"time").parquet("alluxio://master1:1/FACT_ADMIN_HOURLY")


I partition by 2 columns and store. When I write, I just want the part files
to be sized to match the 512 MB block size I already set in Alluxio.
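A minimal sketch of the repartition/coalesce idea discussed in this thread, assuming Spark 1.6's repartition-by-columns call; the file count of 8 is a hypothetical value to tune, and the Alluxio path is copied from the message above:

import org.apache.spark.sql.SaveMode

// Shuffle so that all rows for a given (network_id, time) land in one task;
// each task then writes a single part file per partition directory.
// numFiles is a hypothetical knob: raise or lower it until part files approach
// the 512 MB Alluxio block size.
val numFiles = 8
df.repartition(numFiles, df("network_id"), df("time"))
  .write
  .mode(SaveMode.Append)
  .partitionBy("network_id", "time")
  .parquet("alluxio://master1:1/FACT_ADMIN_HOURLY")

// Or, as Deepak suggests below, collapse everything into one partition first:
// df.coalesce(1).write.mode(SaveMode.Append).partitionBy("network_id", "time").parquet(...)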


> On Jul 1, 2016, at 11:01 AM, Deepak Sharma  wrote:
> 
> Before writing, coalesce your RDD to 1 partition.
> It will create only 1 output file.
> Multiple part files happen because all your executors write their
> partitions to separate part files.
> 
> Thanks
> Deepak
> 
> On 1 Jul 2016 8:01 am, "Chanh Le"  > wrote:
> Hi everyone,
> I am using Alluxio for storage, but I am a little confused: I set the Alluxio
> block size to 512 MB, yet each file part is only a few KB and there are too many parts.
> Is that normal? I want reads to be fast; do that many parts affect the
> read operation?
> How do I set the size of the file parts?
> 
> Thanks.
> Chanh



HiveContext

2016-06-30 Thread manish jaiswal
-- Forwarded message --
From: "manish jaiswal" 
Date: Jun 30, 2016 17:35
Subject: HiveContext
To: , , <
user-h...@spark.apache.org>
Cc:

Hi,


I am new to Spark. I found that using HiveContext we can connect to Hive and
run HiveQL queries. I ran it and it worked.

My doubt is about what happens when we use hiveContext to run a Hive query
like (select distinct column from table).

How will it perform? Will it pull all the data stored in HDFS into the Spark
engine (memory) and perform (select distinct column from table) there, or
will it hand the query to Hive and get the result back from Hive?
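One way to check this yourself (a sketch, assuming Spark 1.6, an existing SparkContext sc, and placeholder names my_table / column): HiveContext only fetches table metadata from the Hive metastore; Spark itself scans the underlying files and runs the aggregation, which the physical plan printed by explain() makes visible.

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT DISTINCT column FROM my_table")  // my_table / column are placeholder names
df.explain(true)  // plan shows a HiveTableScan feeding Spark's own aggregation, not a Hive job
df.show()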



Thanks


Re: One map per folder in spark or Hadoop

2016-06-30 Thread Balachandar R.A.
Thank you very much.  I will try this code and update you

Regards
Bala
On 01-Jul-2016 7:46 am, "Sun Rui"  wrote:

> Say you have got all of your folder paths into a val folders: Seq[String]
>
> val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
>   val folder = iter.next
>   val status: Int = 
>   Seq(status).toIterator
> }
>
> On Jun 30, 2016, at 16:42, Balachandar R.A. 
> wrote:
>
> Hello,
>
> I have some 100 folders. Each folder contains 5 files. I have an
> executable that processes one folder. The executable is a black box and
> hence it cannot be modified. I would like to process the 100 folders in
> parallel using Apache Spark, so that I can spawn one map task per folder.
> Can anyone give me an idea? I have come across similar questions, but for
> Hadoop, where the answer was to use combineFileInputFormat and pathFilter.
> However, as I said, I want to use Apache Spark. Any idea?
>
> Regards
> Bala
>
>
>


Re: One map per folder in spark or Hadoop

2016-06-30 Thread Sun Rui
Say you have got all of your folder paths into a val folders: Seq[String]

val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
  val folder = iter.next
  val status: Int = 
  Seq(status).toIterator
}
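A slightly fuller sketch of the same pattern, assuming the black-box executable lives at /path/to/process_folder (a hypothetical path) and takes the folder as its only argument:

import scala.sys.process._

// folders: Seq[String] holding the ~100 folder paths, known on the driver
val statuses = sc.parallelize(folders, folders.size).mapPartitions { iter =>
  val folder = iter.next()
  // Run the external executable for this folder; "!" blocks and returns its exit code
  val exitCode: Int = Seq("/path/to/process_folder", folder).!
  Iterator((folder, exitCode))
}
statuses.collect().foreach { case (f, code) => println(s"$f exited with $code") }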

> On Jun 30, 2016, at 16:42, Balachandar R.A.  wrote:
> 
> Hello,
> 
> I have some 100 folders. Each folder contains 5 files. I have an executable
> that processes one folder. The executable is a black box and hence it cannot be
> modified. I would like to process the 100 folders in parallel using Apache Spark,
> so that I can spawn one map task per folder. Can anyone give me an
> idea? I have come across similar questions, but for Hadoop, where the answer was
> to use combineFileInputFormat and pathFilter. However, as I said, I want to use
> Apache Spark. Any idea?
> 
> Regards 
> Bala
> 



Re: Looking for help about stackoverflow in spark

2016-06-30 Thread Chanh Le
Hi John,
I think it relates to driver memory more than the other things you mentioned.

Can you just give the driver more memory?




> On Jul 1, 2016, at 9:03 AM, johnzeng  wrote:
> 
> I am trying to load a 1 TB collection from Mongo into a Spark cluster, but I
> keep getting a stack overflow error after running for a while.
> 
> I have posted a question on stackoverflow.com and tried all the advice
> provided there; nothing works...
> 
> how to load large database into spark
> 
>   
> 
> I have tried:
> 1. using persist with MemoryAndDisk: same error after running for the same
> amount of time.
> 2. adding more instances: same error after the same amount of time.
> 3. running this script on another, much smaller collection: everything is
> good, so I think my code is all right.
> 4. removing the reduce step: same error after the same amount of time.
> 5. removing the map step: same error after the same amount of time.
> 6. changing the SQL I used: it is faster, but the same error appears after a
> shorter time.
> 7. retrieving "_id" instead of "u_at" and "c_at": same error after the same
> amount of time.
> 
> Does anyone know how many resources I need to handle this 1 TB database? I
> only retrieve two fields from it, and these fields are only about 1% of a
> document (because we have an array containing about 90+ embedded documents in
> it.)
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Looking-for-help-about-stackoverflow-in-spark-tp27255.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Looking for help about stackoverflow in spark

2016-06-30 Thread johnzeng
I am trying to load a 1 TB collection from Mongo into a Spark cluster, but I
keep getting a stack overflow error after running for a while.

I have posted a question on stackoverflow.com and tried all the advice
provided there; nothing works...

how to load large database into spark
  

I have tried:
1. using persist with MemoryAndDisk: same error after running for the same
amount of time.
2. adding more instances: same error after the same amount of time.
3. running this script on another, much smaller collection: everything is
good, so I think my code is all right.
4. removing the reduce step: same error after the same amount of time.
5. removing the map step: same error after the same amount of time.
6. changing the SQL I used: it is faster, but the same error appears after a
shorter time.
7. retrieving "_id" instead of "u_at" and "c_at": same error after the same
amount of time.

Does anyone know how many resources I need to handle this 1 TB database? I
only retrieve two fields from it, and these fields are only about 1% of a
document (because we have an array containing about 90+ embedded documents in
it.)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Looking-for-help-about-stackoverflow-in-spark-tp27255.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: Spark 2.0 Continuous Processing

2016-06-30 Thread kmat
In a continuous processing pipeline with DataFrames, is there any way for the
user to checkpoint the processing state at periodic intervals? The idea behind
this is to be able to rewind to any particular checkpoint and then fast-forward
processing from there.
Date: Wed, 29 Jun 2016 23:17:47 -0700
From: ml-node+s1001560n27250...@n3.nabble.com
To: kurianmath...@hotmail.com
Subject: Re: Spark 2.0 Continuous Processing



 it also supports interactive and batch queries. With Spark 2.0,
DataFrames and Datasets are being combined, which ensures that an event or
file is processed once, and only once.













--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-Continuous-Processing-tp27226p27253.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Spark jobs

2016-06-30 Thread Joaquin Alzola
Hi Sujeet,

I think that might not work.

Running this:
#!/usr/bin/env python3
from pyspark_cassandra import CassandraSparkContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("test") \
    .setMaster("spark://192.168.23.31:7077") \
    .set("spark.cassandra.connection.host", "192.168.23.31")
sc = CassandraSparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .options(keyspace="lebara_diameter_codes", table="nl_lebara_diameter_codes") \
    .load()
list = df.select("errorcode2001").where("errorcode2001 > 1200").collect()
list2 = df.select("date").collect()
print([i for i in list[0]])
print(type(list[0]))

which of course shows this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.ClassNotFoundException: Failed to find data source: 
org.apache.spark.sql.cassandra. Please find packages at 
http://spark-packages.org

Is there a way to load those jar files from within the script?

Jo

From: sujeet jog [mailto:sujeet@gmail.com]
Sent: 29 June 2016 14:51
To: Joaquin Alzola ; user 
Subject: Re: Spark jobs

check if this helps,

from multiprocessing import Process
import os

def training():
    print("Training Workflow")
    # launch the Spark job in the background via spark-submit
    cmd = "spark/bin/spark-submit ./ml.py &"
    os.system(cmd)

w_training = Process(target=training)



On Wed, Jun 29, 2016 at 6:28 PM, Joaquin Alzola 
> wrote:
Hi,

This is a totally newbie question, but I can't seem to find the link ….. When I
create a python script to be launched with spark-submit …

how should I call it from the main python script with subprocess.Popen?

BR

Joaquin






This email is confidential and may be subject to privilege. If you are not the 
intended recipient, please do not copy or disclose its content but contact the 
sender immediately upon receipt.



RE: Remote RPC client disassociated

2016-06-30 Thread Joaquin Alzola
>>> 16/06/30 10:44:34 ERROR util.Utils: Uncaught exception in thread stdout 
>>> writer for python
java.lang.AbstractMethodError: 
pyspark_cassandra.DeferringRowReader.read(Lcom/datastax/driver/core/Row;Lcom/datastax/spark/connector/CassandraRowMetadata;)Ljava/lang/Object;
>> You are trying to call an abstract method.  Please check the method 
>> DeferringRowReader.read

I do not know how to fix this issue.
I have seen many tutorials around the net, and they make the same call
I am currently making:

from pyspark_cassandra import CassandraSparkContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("test") \
    .setMaster("spark://192.168.23.31:7077") \
    .set("spark.cassandra.connection.host", "192.168.23.31")
sc = CassandraSparkContext(conf=conf)
table = sc.cassandraTable("lebara_diameter_codes","nl_lebara_diameter_codes")
food_count = table.select("errorcode2001").groupBy("errorcode2001").count()
food_count.collect()

I am really new to this Spark thing. I was able to configure it correctly and
am now learning the API.
This email is confidential and may be subject to privilege. If you are not the 
intended recipient, please do not copy or disclose its content but contact the 
sender immediately upon receipt.


Re: Logical Plan

2016-06-30 Thread Mich Talebzadeh
I don't think the Spark optimizer supports something like a statement cache,
where the plan is cached and bind variables (as in an RDBMS) are used for
different values, thus saving the parsing.

What you are stating is that the source and tempTable change but the plan
itself remains the same. I have not seen this in 1.6.1, and as I understand it,
Spark does not yet support CBO, not even in 2.0.


HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 30 June 2016 at 22:53, Darshan Singh  wrote:

> I am using 1.5.2.
>
> I have a data-frame with 10 column and then I pivot 1 column and generate
> the 700 columns.
>
> it is like
>
> val df1 = sqlContext.read.parquet("file1")
> df1.registerTempTable("df1")
> val df2= sqlContext.sql("select col1, col2, sum(case when col3 = 1 then
> col4 else 0.0 end ) as col4_1,,sum(case when col3 = 700 then col4 else
> 0.0 end ) as col4_700 from df1 group by col1, col2")
>
> Now this last statement takes around 20-30 seconds. I run this a number of
> times; the only difference is that the file behind df1 is different. Everything
> else is the same.
>
> The actual statement takes 2-3 seconds, so it is a bit frustrating that just
> generating the plan for df2 takes so much time. Worse, this runs on the
> driver, so it is not parallelized.
>
> I have a similar issue in another query where, from these 700 columns, we
> generate more columns by adding or subtracting them, and it again takes
> a lot of time.
>
> Not sure what could be done here.
>
> Thanks
>
> On Thu, Jun 30, 2016 at 10:10 PM, Reynold Xin  wrote:
>
>> Which version are you using here? If the underlying files change,
>> technically we should go through optimization again.
>>
>> Perhaps the real "fix" is to figure out why is logical plan creation so
>> slow for 700 columns.
>>
>>
>> On Thu, Jun 30, 2016 at 1:58 PM, Darshan Singh 
>> wrote:
>>
>>> Is there a way I can use same Logical plan for a query. Everything will
>>> be same except underlying file will be different.
>>>
>>> Issue is that my query has around 700 columns and Generating logical
>>> plan takes 20 seconds and it happens every 2 minutes but every time
>>> underlying file is different.
>>>
>>> I do not know these files in advance so I cant create the table on
>>> directory level. These files are created and then used in the final query.
>>>
>>> Thanks
>>>
>>
>>
>


Re: Logical Plan

2016-06-30 Thread Darshan Singh
I am using 1.5.2.

I have a data-frame with 10 columns, and then I pivot 1 column to generate
the 700 columns.

it is like

val df1 = sqlContext.read.parquet("file1")
df1.registerTempTable("df1")
val df2= sqlContext.sql("select col1, col2, sum(case when col3 = 1 then
col4 else 0.0 end ) as col4_1,,sum(case when col3 = 700 then col4 else
0.0 end ) as col4_700 from df1 group by col1, col2")

Now this last statement takes around 20-30 seconds. I run this a number of
times; the only difference is that the file behind df1 is different. Everything
else is the same.

The actual statement takes 2-3 seconds, so it is a bit frustrating that just
generating the plan for df2 takes so much time. Worse, this runs on the driver,
so it is not parallelized.

I have a similar issue in another query where, from these 700 columns, we
generate more columns by adding or subtracting them, and it again takes a lot
of time.

Not sure what could be done here.
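For reference, a sketch of generating the same 700-column aggregation programmatically with the DataFrame API (assuming the df1 / col1..col4 names above); it produces the same plan, so it will not by itself cut the analysis time, but it avoids hand-writing 700 case expressions:

import org.apache.spark.sql.functions.{sum, when}

// One sum(case when col3 = i then col4 else 0.0 end) AS col4_i per pivot value
val pivotCols = (1 to 700).map { i =>
  sum(when(df1("col3") === i, df1("col4")).otherwise(0.0)).alias(s"col4_$i")
}
val df2 = df1.groupBy("col1", "col2").agg(pivotCols.head, pivotCols.tail: _*)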

Thanks

On Thu, Jun 30, 2016 at 10:10 PM, Reynold Xin  wrote:

> Which version are you using here? If the underlying files change,
> technically we should go through optimization again.
>
> Perhaps the real "fix" is to figure out why is logical plan creation so
> slow for 700 columns.
>
>
> On Thu, Jun 30, 2016 at 1:58 PM, Darshan Singh 
> wrote:
>
>> Is there a way I can use same Logical plan for a query. Everything will
>> be same except underlying file will be different.
>>
>> Issue is that my query has around 700 columns and Generating logical plan
>> takes 20 seconds and it happens every 2 minutes but every time underlying
>> file is different.
>>
>> I do not know these files in advance so I cant create the table on
>> directory level. These files are created and then used in the final query.
>>
>> Thanks
>>
>
>


Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
That was indeed the case, using UTF8Deserializer makes everything work
correctly.

Thanks for the tips!

On Thu, Jun 30, 2016 at 3:32 PM, Pedro Rodriguez 
wrote:

> Quick update, I was able to get most of the plumbing to work thanks to the
> code Holden posted and browsing more source code.
>
> I am running into this error which makes me think that maybe I shouldn't
> be leaving the default python RDD serializer/pickler in place and do
> something else
> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
> _pickle.UnpicklingError: A load persistent id instruction was encountered,
> but no persistent_load function was specified.
>
>
> On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez 
> wrote:
>
>> Thanks Jeff and Holden,
>>
>> A little more context here probably helps. I am working on implementing
>> the idea from this article to make reads from S3 faster:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>> (although my name is Pedro, I am not the author of the article). The reason
>> for wrapping SparkContext is so that the code change is from sc.textFile to
>> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
>> can open source our library, but depends on company). Overall, its a very
>> light wrapper and perhaps calling it a context is not quite the right name
>> because of that.
>>
>> At the end of the day I make a sc.parallelize call and return an
>> RDD[String] as described in that blog post. I found a post from Py4J
>> mailing list that reminded my that the JVM gateway needs the jars in
>> spark.driver/executor.extraClassPath in addition to the spark.jars option.
>> With that, I can see the classes now. Looks like I need to do as you
>> suggest and wrap it using Java in order to go the last mile to calling the
>> method/constructor. I don't know yet how to get the RDD back to pyspark
>> though so any pointers on that would be great.
>>
>> Thanks for the tip on code Holden, I will take a look to see if that can
>> give me some insight on how to write the Python code part.
>>
>> Thanks!
>> Pedro
>>
>> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau 
>> wrote:
>>
>>> So I'm a little biased - I think the best bridge between the two is using
>>> DataFrames. I've got some examples in my talk and on the high performance
>>> spark GitHub
>>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>>> calls some custom Scala code.
>>>
>>> Using a custom context is a bit tricky though because of how the
>>> launching is done; as Jeff Zhang points out, you would need to wrap it in a
>>> JavaSparkContext and then you could override the _initialize_context
>>> function in context.py
>>>
>>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang  wrote:
>>>
 Hi Pedro,

 Your use case is interesting.  I think launching java gateway is the
 same as native SparkContext, the only difference is on creating your custom
 SparkContext instead of native SparkContext. You might also need to wrap it
 using java.


 https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172



 On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
 ski.rodrig...@gmail.com> wrote:

> Hi All,
>
> I have written a Scala package which essentially wraps the
> SparkContext around a custom class that adds some functionality specific 
> to
> our internal use case. I am trying to figure out the best way to call this
> from PySpark.
>
> I would like to do this similarly to how Spark itself calls the JVM
> SparkContext as in:
> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>
> My goal would be something like this:
>
> Scala Code (this is done):
> >>> import com.company.mylibrary.CustomContext
> >>> val myContext = CustomContext(sc)
> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>
> Python Code (I want to be able to do this):
> >>> from company.mylibrary import CustomContext
> >>> myContext = CustomContext(sc)
> >>> rdd = myContext.customTextFile("path")
>
> At the end of each code, I should be working with an ordinary
> RDD[String].
>
> I am trying to access my Scala class through sc._jvm as below, but not
> having any luck so far.
>
> My attempts:
> >>> a = sc._jvm.com.company.mylibrary.CustomContext
> >>> dir(a)
> ['']
>
> Example of what I want::
> >>> a = sc._jvm.PythonRDD
> >>> dir(a)
> ['anonfun$6', 'anonfun$8', 'collectAndServe',
> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
> 'rddToOrderedRDDFunctions', 

Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Quick update, I was able to get most of the plumbing to work thanks to the
code Holden posted and browsing more source code.

I am running into this error which makes me think that maybe I shouldn't be
leaving the default python RDD serializer/pickler in place and do something
else https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182:
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.


On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez 
wrote:

> Thanks Jeff and Holden,
>
> A little more context here probably helps. I am working on implementing
> the idea from this article to make reads from S3 faster:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
> (although my name is Pedro, I am not the author of the article). The reason
> for wrapping SparkContext is so that the code change is from sc.textFile to
> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
> can open source our library, but depends on company). Overall, its a very
> light wrapper and perhaps calling it a context is not quite the right name
> because of that.
>
> At the end of the day I make a sc.parallelize call and return an
> RDD[String] as described in that blog post. I found a post from Py4J
> mailing list that reminded my that the JVM gateway needs the jars in
> spark.driver/executor.extraClassPath in addition to the spark.jars option.
> With that, I can see the classes now. Looks like I need to do as you
> suggest and wrap it using Java in order to go the last mile to calling the
> method/constructor. I don't know yet how to get the RDD back to pyspark
> though so any pointers on that would be great.
>
> Thanks for the tip on code Holden, I will take a look to see if that can
> give me some insight on how to write the Python code part.
>
> Thanks!
> Pedro
>
> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau 
> wrote:
>
>> So I'm a little biased - I think the best bridge between the two is using
>> DataFrames. I've got some examples in my talk and on the high performance
>> spark GitHub
>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>> calls some custom Scala code.
>>
>> Using a custom context is a bit tricky though because of how the
>> launching is done; as Jeff Zhang points out, you would need to wrap it in a
>> JavaSparkContext and then you could override the _initialize_context
>> function in context.py
>>
>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang  wrote:
>>
>>> Hi Pedro,
>>>
>>> Your use case is interesting.  I think launching java gateway is the
>>> same as native SparkContext, the only difference is on creating your custom
>>> SparkContext instead of native SparkContext. You might also need to wrap it
>>> using java.
>>>
>>>
>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>>
>>>
>>>
>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
 Hi All,

 I have written a Scala package which essentially wraps the SparkContext
 around a custom class that adds some functionality specific to our internal
 use case. I am trying to figure out the best way to call this from PySpark.

 I would like to do this similarly to how Spark itself calls the JVM
 SparkContext as in:
 https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py

 My goal would be something like this:

 Scala Code (this is done):
 >>> import com.company.mylibrary.CustomContext
 >>> val myContext = CustomContext(sc)
 >>> val rdd: RDD[String] = myContext.customTextFile("path")

 Python Code (I want to be able to do this):
 >>> from company.mylibrary import CustomContext
 >>> myContext = CustomContext(sc)
 >>> rdd = myContext.customTextFile("path")

 At the end of each code, I should be working with an ordinary
 RDD[String].

 I am trying to access my Scala class through sc._jvm as below, but not
 having any luck so far.

 My attempts:
 >>> a = sc._jvm.com.company.mylibrary.CustomContext
 >>> dir(a)
 ['']

 Example of what I want::
 >>> a = sc._jvm.PythonRDD
 >>> dir(a)
 ['anonfun$6', 'anonfun$8', 'collectAndServe',
 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
 

Re: Logical Plan

2016-06-30 Thread Mich Talebzadeh
A logical plan should not change assuming the same DAG diagram is used
throughout


Have you tried Spark GUI Page under stages? This is Spark 2

example:

[image: Inline images 1]

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 30 June 2016 at 22:10, Reynold Xin  wrote:

> Which version are you using here? If the underlying files change,
> technically we should go through optimization again.
>
> Perhaps the real "fix" is to figure out why is logical plan creation so
> slow for 700 columns.
>
>
> On Thu, Jun 30, 2016 at 1:58 PM, Darshan Singh 
> wrote:
>
>> Is there a way I can use same Logical plan for a query. Everything will
>> be same except underlying file will be different.
>>
>> Issue is that my query has around 700 columns and Generating logical plan
>> takes 20 seconds and it happens every 2 minutes but every time underlying
>> file is different.
>>
>> I do not know these files in advance so I cant create the table on
>> directory level. These files are created and then used in the final query.
>>
>> Thanks
>>
>
>


Re: Logical Plan

2016-06-30 Thread Reynold Xin
Which version are you using here? If the underlying files change,
technically we should go through optimization again.

Perhaps the real "fix" is to figure out why logical plan creation is so
slow for 700 columns.


On Thu, Jun 30, 2016 at 1:58 PM, Darshan Singh 
wrote:

> Is there a way I can use same Logical plan for a query. Everything will be
> same except underlying file will be different.
>
> Issue is that my query has around 700 columns and Generating logical plan
> takes 20 seconds and it happens every 2 minutes but every time underlying
> file is different.
>
> I do not know these files in advance so I cant create the table on
> directory level. These files are created and then used in the final query.
>
> Thanks
>


Logical Plan

2016-06-30 Thread Darshan Singh
Is there a way I can reuse the same logical plan for a query? Everything will
be the same except that the underlying file will be different.

The issue is that my query has around 700 columns, and generating the logical
plan takes 20 seconds; it happens every 2 minutes, but every time the
underlying file is different.

I do not know these files in advance, so I can't create the table at the
directory level. These files are created and then used in the final query.

Thanks


RDD to DataFrame question with JsValue in the mix

2016-06-30 Thread Dood

Hello,

I have an RDD[(String,JsValue)] that I want to convert into a DataFrame 
and then run SQL on. What is the easiest way to get the JSON (in form of 
JsValue) "understood" by the process?
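One approach (a sketch, assuming play-json's JsValue, Spark 1.6, and an existing sqlContext; the temp table name "events" is hypothetical): render each JsValue back to a JSON string and let read.json infer the schema. The String key is dropped here; fold it into the JSON first if you need it in the result.

import org.apache.spark.rdd.RDD
import play.api.libs.json.{JsValue, Json}

// rdd: RDD[(String, JsValue)] as in the question
val jsonStrings: RDD[String] = rdd.map { case (_, js) => Json.stringify(js) }
val df = sqlContext.read.json(jsonStrings)   // schema inferred from the JSON documents
df.registerTempTable("events")
sqlContext.sql("SELECT * FROM events LIMIT 10").show()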


Thanks!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Thanks Jeff and Holden,

A little more context here probably helps. I am working on implementing the
idea from this article to make reads from S3 faster:
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
(although my name is Pedro, I am not the author of the article). The reason
for wrapping SparkContext is so that the code change is from sc.textFile to
sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
can open source our library, but depends on company). Overall, it's a very
light wrapper, and perhaps calling it a context is not quite the right name
because of that.

At the end of the day I make a sc.parallelize call and return an
RDD[String] as described in that blog post. I found a post from Py4J
mailing list that reminded me that the JVM gateway needs the jars in
spark.driver/executor.extraClassPath in addition to the spark.jars option.
With that, I can see the classes now. Looks like I need to do as you
suggest and wrap it using Java in order to go the last mile to calling the
method/constructor. I don't know yet how to get the RDD back to pyspark
though so any pointers on that would be great.

Thanks for the tip on code Holden, I will take a look to see if that can
give me some insight on how to write the Python code part.

Thanks!
Pedro

On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau  wrote:

> So I'm a little biased - I think the best bridge between the two is using
> DataFrames. I've got some examples in my talk and on the high performance
> spark GitHub
> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
> calls some custom Scala code.
>
> Using a custom context is a bit tricky though because of how the launching
> is done; as Jeff Zhang points out, you would need to wrap it in a
> JavaSparkContext and then you could override the _initialize_context
> function in context.py
>
> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang  wrote:
>
>> Hi Pedro,
>>
>> Your use case is interesting.  I think launching java gateway is the same
>> as native SparkContext, the only difference is on creating your custom
>> SparkContext instead of native SparkContext. You might also need to wrap it
>> using java.
>>
>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>
>>
>>
>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez > > wrote:
>>
>>> Hi All,
>>>
>>> I have written a Scala package which essentially wraps the SparkContext
>>> around a custom class that adds some functionality specific to our internal
>>> use case. I am trying to figure out the best way to call this from PySpark.
>>>
>>> I would like to do this similarly to how Spark itself calls the JVM
>>> SparkContext as in:
>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>
>>> My goal would be something like this:
>>>
>>> Scala Code (this is done):
>>> >>> import com.company.mylibrary.CustomContext
>>> >>> val myContext = CustomContext(sc)
>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>
>>> Python Code (I want to be able to do this):
>>> >>> from company.mylibrary import CustomContext
>>> >>> myContext = CustomContext(sc)
>>> >>> rdd = myContext.customTextFile("path")
>>>
>>> At the end of each code, I should be working with an ordinary
>>> RDD[String].
>>>
>>> I am trying to access my Scala class through sc._jvm as below, but not
>>> having any luck so far.
>>>
>>> My attempts:
>>> >>> a = sc._jvm.com.company.mylibrary.CustomContext
>>> >>> dir(a)
>>> ['']
>>>
>>> Example of what I want::
>>> >>> a = sc._jvm.PythonRDD
>>> >>> dir(a)
>>> ['anonfun$6', 'anonfun$8', 'collectAndServe',
>>> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
>>> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
>>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
>>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
>>> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
>>> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
>>> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
>>> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
>>> 'writeIteratorToStream', 'writeUTF']
>>>
>>> The next thing I would run into is converting the JVM RDD[String] back
>>> to a Python RDD, what is the easiest way to do this?
>>>
>>> Overall, is this a good approach to calling the same API in Scala and
>>> Python?
>>>
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Pedro Rodriguez
PhD 

Re: Call Scala API from PySpark

2016-06-30 Thread Holden Karau
So I'm a little biased - I think the best bridge between the two is using
DataFrames. I've got some examples in my talk and on the high performance
spark GitHub
https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
calls some custom Scala code.

Using a custom context is a bit tricky though because of how the launching
is done; as Jeff Zhang points out, you would need to wrap it in a
JavaSparkContext and then you could override the _initialize_context
function in context.py

On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang  wrote:

> Hi Pedro,
>
> Your use case is interesting.  I think launching java gateway is the same
> as native SparkContext, the only difference is on creating your custom
> SparkContext instead of native SparkContext. You might also need to wrap it
> using java.
>
> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>
>
>
> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez 
> wrote:
>
>> Hi All,
>>
>> I have written a Scala package which essentially wraps the SparkContext
>> around a custom class that adds some functionality specific to our internal
>> use case. I am trying to figure out the best way to call this from PySpark.
>>
>> I would like to do this similarly to how Spark itself calls the JVM
>> SparkContext as in:
>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>
>> My goal would be something like this:
>>
>> Scala Code (this is done):
>> >>> import com.company.mylibrary.CustomContext
>> >>> val myContext = CustomContext(sc)
>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>
>> Python Code (I want to be able to do this):
>> >>> from company.mylibrary import CustomContext
>> >>> myContext = CustomContext(sc)
>> >>> rdd = myContext.customTextFile("path")
>>
>> At the end of each code, I should be working with an ordinary RDD[String].
>>
>> I am trying to access my Scala class through sc._jvm as below, but not
>> having any luck so far.
>>
>> My attempts:
>> >>> a = sc._jvm.com.company.mylibrary.CustomContext
>> >>> dir(a)
>> ['']
>>
>> Example of what I want::
>> >>> a = sc._jvm.PythonRDD
>> >>> dir(a)
>> ['anonfun$6', 'anonfun$8', 'collectAndServe',
>> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
>> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
>> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
>> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
>> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
>> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
>> 'writeIteratorToStream', 'writeUTF']
>>
>> The next thing I would run into is converting the JVM RDD[String] back to
>> a Python RDD, what is the easiest way to do this?
>>
>> Overall, is this a good approach to calling the same API in Scala and
>> Python?
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


How to spin up Kafka using docker and use for Spark Streaming Integration tests

2016-06-30 Thread SRK
Hi,

I need to do integration tests using Spark Streaming. My idea is to spin up
kafka using docker locally and use it to feed the stream to my Streaming
Job. Any suggestions on how to do this would be of great help.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-spin-up-Kafka-using-docker-and-use-for-Spark-Streaming-Integration-tests-tp27252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Remote RPC client disassociated

2016-06-30 Thread Jeff Zhang
>>> 16/06/30 10:44:34 ERROR util.Utils: Uncaught exception in thread stdout
writer for python

java.lang.AbstractMethodError:
pyspark_cassandra.DeferringRowReader.read(Lcom/datastax/driver/core/Row;Lcom/datastax/spark/connector/CassandraRowMetadata;)Ljava/lang/Object;


You are trying to call an abstract method.  Please check the method
DeferringRowReader.read

On Thu, Jun 30, 2016 at 4:34 AM, Joaquin Alzola 
wrote:

> HI List,
>
>
>
> I am launching this spark-submit job:
>
>
>
> hadoop@testbedocg:/mnt/spark> bin/spark-submit --packages
> com.datastax.spark:spark-cassandra-connector_2.10:1.6.0 --jars
> /mnt/spark/lib/TargetHolding_pyspark-cassandra-0.3.5.jar spark_v2.py
>
>
>
> spark_v2.py is:
>
> from pyspark_cassandra import CassandraSparkContext, Row
>
> from pyspark import SparkContext, SparkConf
>
> from pyspark.sql import SQLContext
>
> conf = SparkConf().setAppName("test").setMaster("spark://
> 192.168.23.31:7077").set("spark.cassandra.connection.host",
> "192.168.23.31")
>
> sc = CassandraSparkContext(conf=conf)
>
> table =
> sc.cassandraTable("lebara_diameter_codes","nl_lebara_diameter_codes")
>
> food_count = table.select("errorcode2001").groupBy("errorcode2001").count()
>
> food_count.collect()
>
>
>
>
>
> Error I get when running the above command:
>
>
>
> [Stage 0:>  (0 +
> 3) / 7]16/06/30 10:40:36 ERROR TaskSchedulerImpl: Lost executor 0 on as5:
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages.
>
> [Stage 0:>  (0 +
> 7) / 7]16/06/30 10:40:40 ERROR TaskSchedulerImpl: Lost executor 1 on as4:
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages.
>
> [Stage 0:>  (0 +
> 5) / 7]16/06/30 10:40:42 ERROR TaskSchedulerImpl: Lost executor 3 on as5:
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages.
>
> [Stage 0:>  (0 +
> 4) / 7]16/06/30 10:40:46 ERROR TaskSchedulerImpl: Lost executor 4 on as4:
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages.
>
> 16/06/30 10:40:46 ERROR TaskSetManager: Task 5 in stage 0.0 failed 4
> times; aborting job
>
> Traceback (most recent call last):
>
>   File "/mnt/spark-1.6.1-bin-hadoop2.6/spark_v2.py", line 11, in 
>
> food_count =
> table.select("errorcode2001").groupBy("errorcode2001").count()
>
>   File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in
> count
>
>   File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
>
>   File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in
> fold
>
>   File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in
> collect
>
>   File "/mnt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
> 813, in __call__
>
>   File "/mnt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line
> 308, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage
> 0.0 (TID 14, as4): ExecutorLostFailure (executor 4 exited caused by one of
> the running tasks) Reason: Remote RPC client disassociated. Likely due to
> containers exceeding thresholds, or network issues. Check driver logs for
> WARN messages.
>
> Driver stacktrace:
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>
>at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>
> at scala.Option.foreach(Option.scala:236)
>
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>
> at
> 

Re: Call Scala API from PySpark

2016-06-30 Thread Jeff Zhang
Hi Pedro,

Your use case is interesting.  I think launching the Java gateway is the same
as for the native SparkContext; the only difference is creating your custom
SparkContext instead of the native SparkContext. You might also need to wrap it
using Java.

https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172



On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez 
wrote:

> Hi All,
>
> I have written a Scala package which essentially wraps the SparkContext
> around a custom class that adds some functionality specific to our internal
> use case. I am trying to figure out the best way to call this from PySpark.
>
> I would like to do this similarly to how Spark itself calls the JVM
> SparkContext as in:
> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>
> My goal would be something like this:
>
> Scala Code (this is done):
> >>> import com.company.mylibrary.CustomContext
> >>> val myContext = CustomContext(sc)
> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>
> Python Code (I want to be able to do this):
> >>> from company.mylibrary import CustomContext
> >>> myContext = CustomContext(sc)
> >>> rdd = myContext.customTextFile("path")
>
> At the end of each code, I should be working with an ordinary RDD[String].
>
> I am trying to access my Scala class through sc._jvm as below, but not
> having any luck so far.
>
> My attempts:
> >>> a = sc._jvm.com.company.mylibrary.CustomContext
> >>> dir(a)
> ['']
>
> Example of what I want::
> >>> a = sc._jvm.PythonRDD
> >>> dir(a)
> ['anonfun$6', 'anonfun$8', 'collectAndServe',
> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
> 'writeIteratorToStream', 'writeUTF']
>
> The next thing I would run into is converting the JVM RDD[String] back to
> a Python RDD, what is the easiest way to do this?
>
> Overall, is this a good approach to calling the same API in Scala and
> Python?
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


-- 
Best Regards

Jeff Zhang


Re: Can Spark Dataframes preserve order when joining?

2016-06-30 Thread Takeshi Yamamuro
Hi,

Most join strategies do not preserve the ordering of the input DataFrames
(sort-merge join
only preserves the ordering of the left input).
So, as said earlier, you need to sort explicitly if you want ordered
output.
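A minimal illustration of that (the DataFrames a and b and the join key "id" are hypothetical):

// Join first, then impose the ordering you need on the result
val joined  = a.join(b, "id")
val ordered = joined.orderBy("id")   // or sortWithinPartitions(...) if per-partition order is enough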

// maropu

On Wed, Jun 29, 2016 at 3:38 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> Well I would not assume anything myself. If you want to order it do it
> explicitly.
>
> Let us take a simple case by creating three DFs based on existing tables
>
> val s =
> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
>
> now let us join these tables
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> And do an orderBy explicitly
>
> val rs1 = rs.orderBy
> ("calendar_month_desc","channel_desc").take(5).foreach(println)
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 29 June 2016 at 14:32, Jestin Ma  wrote:
>
>> If it’s not too much trouble, could I get some pointers/help on this?
>> (see link)
>>
>> http://stackoverflow.com/questions/38085801/can-dataframe-joins-in-spark-preserve-order
>>
>> -also, as a side question, do Dataframes support easy reordering of
>> columns?
>>
>> Thank you!
>> Jestin
>>
>
>


-- 
---
Takeshi Yamamuro


Call Scala API from PySpark

2016-06-30 Thread Pedro Rodriguez
Hi All,

I have written a Scala package which essentially wraps the SparkContext
around a custom class that adds some functionality specific to our internal
use case. I am trying to figure out the best way to call this from PySpark.

I would like to do this similarly to how Spark itself calls the JVM
SparkContext as in:
https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py

My goal would be something like this:

Scala Code (this is done):
>>> import com.company.mylibrary.CustomContext
>>> val myContext = CustomContext(sc)
>>> val rdd: RDD[String] = myContext.customTextFile("path")

Python Code (I want to be able to do this):
>>> from company.mylibrary import CustomContext
>>> myContext = CustomContext(sc)
>>> rdd = myContext.customTextFile("path")

At the end of each code, I should be working with an ordinary RDD[String].

I am trying to access my Scala class through sc._jvm as below, but not
having any luck so far.

My attempts:
>>> a = sc._jvm.com.company.mylibrary.CustomContext
>>> dir(a)
['']

Example of what I want::
>>> a = sc._jvm.PythonRDD
>>> dir(a)
['anonfun$6', 'anonfun$8', 'collectAndServe',
'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
'writeIteratorToStream', 'writeUTF']

The next thing I would run into is converting the JVM RDD[String] back to a
Python RDD, what is the easiest way to do this?

Overall, is this a good approach to calling the same API in Scala and
Python?

-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Possible to broadcast a function?

2016-06-30 Thread Aaron Perrin
That's helpful, thanks. I didn't see that thread earlier. But it sounds like
the best solution is to use singletons in the executors, which I'm already
doing. (BTW - the reason I consider that method kind of hack-ish is that it
makes the code a bit more difficult for others to understand.)
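A minimal sketch of that per-executor singleton pattern (the loader, the Map type, and inputRdd are hypothetical placeholders): a lazy val inside an object is initialized at most once per executor JVM, the first time any task touches it, and is then shared by every task on that executor.

object SharedData {
  // Initialized lazily, once per executor JVM, on first access from any task
  lazy val data: Map[String, Int] = loadFromExternalSource()

  private def loadFromExternalSource(): Map[String, Int] = {
    // placeholder: load the large (~250 GiB) structure from the external source here
    Map.empty
  }
}

// inputRdd: RDD[String]; every task on an executor reuses the same instance
val result = inputRdd.mapPartitions { iter =>
  val lookup = SharedData.data
  iter.map(key => lookup.getOrElse(key, 0))
}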

Based on its description, I was hoping that Spark's broadcast mechanism was 
using shared memory between JVMs (memory mapped files or named pipes, etc), in 
which case the data structure would only need to be created once per machine.  
I'll have to take a look at the code.

Most likely, I'll have to implement a service on the node and have each 
executor call it.

Sent from my iPhone

> On Jun 30, 2016, at 8:45 AM, Yong Zhang  wrote:
> 
> How about this old discussion related to similar problem as yours.
> 
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-td3203.html
> 
> Yong
> 
> From: aper...@timerazor.com
> Date: Wed, 29 Jun 2016 14:00:07 +
> Subject: Possible to broadcast a function?
> To: user@spark.apache.org
> 
> The user guide describes a broadcast as a way to move a large dataset to each 
> node:
> 
> "Broadcast variables allow the programmer to keep a read-only variable cached 
> on each machine rather than shipping a copy of it with tasks. They can be 
> used, for example, to give every node a copy of a large input dataset in an 
> efficient manner."
> 
> And the broadcast example shows it being used with a variable.
> 
> But, is it somehow possible to instead broadcast a function that can be 
> executed once, per node?
> 
> My use case is the following:
> 
> I have a large data structure that I currently create on each executor.  The 
> way that I create it is a hack.  That is, when the RDD function is executed 
> on the executor, I block, load a bunch of data (~250 GiB) from an external 
> data source, create the data structure as a static object in the JVM, and 
> then resume execution.  This works, but it ends up costing me a lot of extra 
> memory (i.e. a few TiB when I have a lot of executors).
> 
> What I'd like to do is use the broadcast mechanism to load the data structure 
> once, per node.  But, I can't serialize the data structure from the driver.
> 
> Any ideas?
> 
> Thanks!
> 
> Aaron
> 


Re: how to add a column according to an existing column of a dataframe?

2016-06-30 Thread nguyen duc tuan
About the Spark issue that you refer to, it is not related to your problem :D
In this case, all you have to do is use the withColumn function. For example:
import org.apache.spark.sql.functions._
val getRange = udf((x: String) => /* compute the price range code for x ... */ 0)
val priceRange = resultprice.withColumn("range", getRange($"price"))

About code efficiency, this function can be as simple as:
val getRange = udf((x: String) =>
  if (x == null || x.length == 0) 0
  else if (x.contains('万')) 22
  else {
    val intVal = x.toInt
    if (intVal >= 100000) 21
    else (intVal / 5000) + 1
  }
)

In general, you can store all the boundary values of the ranges in an array
(in order) and then loop through the values (or, more efficiently, use binary
search) to find which range a value belongs to, as in the sketch below.
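A sketch of that boundary-array idea, assuming the same 5000-wide buckets as the code quoted below (below 5000 gives 1, [5000, 10000) gives 2, ..., 100000 and above gives 21, and values containing '万' give 22):

val boundaries: Array[Int] = (5000 to 100000 by 5000).toArray  // upper bounds of buckets 1..20

def rangeCode(price: Int): Int = {
  val idx = java.util.Arrays.binarySearch(boundaries, price)
  if (idx >= 0) idx + 2        // a price equal to a boundary falls into the next bucket
  else -(idx + 1) + 1          // not found: the insertion point + 1 is the bucket
}

val getRange = org.apache.spark.sql.functions.udf((x: String) =>
  if (x == null || x.trim.isEmpty) 0
  else if (x.contains('万')) 22
  else rangeCode(x.trim.toInt)
)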

2016-06-30 20:08 GMT+07:00 :

> hi guys,
>  I have a dataframe with 3 columns, id(int), type(string),
> price(string), and I want to add a column "price range" according to the
> value of price.
>  I checked SPARK-15383
> , however in my code I
> just want to append a column, which is transformed from the original
> dataframe "resultprice", back to resultprice.
>  Is there a better way to do this? Either in terms of the implementation
> or code efficiency.
>  Will pattern matching help? And how?
>  Thank you guys.
>
> here is my code:
>
> val priceRange = resultprice.select("price").map { x =>
>   if (x.getString(0).trim == "null"||x.getString(0).trim == "") x(0).toString().trim.+("|0") else
>   if (x.getString(0).trim.contains('万')) x(0).toString().trim.replaceAll("万", "").+("|22") else
>   if (x.getString(0).trim.toInt < 5000) x(0).toString().+("|1") else
>   if (x.getString(0).trim.toInt >= 5000  && x.getString(0).trim.toInt < 10000) x(0).toString().trim+("|2") else
>   if (x.getString(0).trim.toInt >= 10000 && x.getString(0).trim.toInt < 15000) x(0).toString().trim.+("|3") else
>   if (x.getString(0).trim.toInt >= 15000 && x.getString(0).trim.toInt < 20000) x(0).toString().trim.+("|4") else
>   if (x.getString(0).trim.toInt >= 20000 && x.getString(0).trim.toInt < 25000) x(0).toString().trim.+("|5") else
>   if (x.getString(0).trim.toInt >= 25000 && x.getString(0).trim.toInt < 30000) x(0).toString().trim.+("|6") else
>   if (x.getString(0).trim.toInt >= 30000 && x.getString(0).trim.toInt < 35000) x(0).toString().trim.+("|7") else
>   if (x.getString(0).trim.toInt >= 35000 && x.getString(0).trim.toInt < 40000) x(0).toString().trim.+("|8") else
>   if (x.getString(0).trim.toInt >= 40000 && x.getString(0).trim.toInt < 45000) x(0).toString().trim.+("|9") else
>   if (x.getString(0).trim.toInt >= 45000 && x.getString(0).trim.toInt < 50000) x(0).toString().trim.+("|10") else
>   if (x.getString(0).trim.toInt >= 50000 && x.getString(0).trim.toInt < 55000) x(0).toString().trim.+("|11") else
>   if (x.getString(0).trim.toInt >= 55000 && x.getString(0).trim.toInt < 60000) x(0).toString().trim.+("|12") else
>   if (x.getString(0).trim.toInt >= 60000 && x.getString(0).trim.toInt < 65000) x(0).toString().trim.+("|13") else
>   if (x.getString(0).trim.toInt >= 65000 && x.getString(0).trim.toInt < 70000) x(0).toString().trim.+("|14") else
>   if (x.getString(0).trim.toInt >= 70000 && x.getString(0).trim.toInt < 75000) x(0).toString().trim.+("|15") else
>   if (x.getString(0).trim.toInt >= 75000 && x.getString(0).trim.toInt < 80000) x(0).toString().trim.+("|16") else
>   if (x.getString(0).trim.toInt >= 80000 && x.getString(0).trim.toInt < 85000) x(0).toString().trim.+("|17") else
>   if (x.getString(0).trim.toInt >= 85000 && x.getString(0).trim.toInt < 90000) x(0).toString().trim.+("|18") else
>   if (x.getString(0).trim.toInt >= 90000 && x.getString(0).trim.toInt < 95000) x(0).toString().trim.+("|19") else
>   if (x.getString(0).trim.toInt >= 95000 && x.getString(0).trim.toInt < 100000) x(0).toString().trim.+("|20") else
>   if (x.getString(0).trim.toInt >= 100000) x(0).toString().trim.+("|21")
> }
> priceRange.collect().foreach(println)
> case class PriceRange(price:String,priceRange:Int)
> val priceRange2 = priceRange.map(_.toString().split("|")).map { p =>
> PriceRange(p(0), p(1).trim.toInt)}.toDF()
> val priceRangeCol = priceRange2.apply("priceRange")
> val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
>
> here is the stacktrace:
> scala> val finalPrice = resultprice.withColumn("priceRange",
> priceRangeCol)
> org.apache.spark.sql.AnalysisException: resolved attribute(s)
> priceRange#2629 missing from lp_loupan_id#1,price_type#26,price#101 in
> operator !Project [lp_loupan_id#1,price_type#26,price#101,priceRange#2629
> AS priceRange#2630];
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> at
> 

how to add a column according to an existing column of a dataframe?

2016-06-30 Thread luohui20001
hi guys, I have a dataframe with 3 columns, id (int), type (string), price (string),
and I want to add a column "price range" according to the value of price. I checked
SPARK-15383; however, in my code I just want to append a column, which is transformed
from the original dataframe "resultprice", back onto resultprice. Is there a better
way to do this, in terms of either implementation or code efficiency? Will pattern
matching help, and how?
 Thank you guys.
here is my code:
val priceRange = resultprice.select("price").map { x =>
  if (x.getString(0).trim == "null"||x.getString(0).trim == "") 
x(0).toString().trim.+("|0") else
  if (x.getString(0).trim.contains('万')) 
x(0).toString().trim.replaceAll("万", "").+("|22") else
  if (x.getString(0).trim.toInt < 5000) x(0).toString().+("|1") else
  if (x.getString(0).trim.toInt >= 5000  && x.getString(0).trim.toInt < 
1) x(0).toString().trim+("|2") else
  if (x.getString(0).trim.toInt >= 1 && x.getString(0).trim.toInt < 
15000) x(0).toString().trim.+("|3") else
  if (x.getString(0).trim.toInt >= 15000 && x.getString(0).trim.toInt < 
2) x(0).toString().trim.+("|4") else
  if (x.getString(0).trim.toInt >= 2 && x.getString(0).trim.toInt < 
25000) x(0).toString().trim.+("|5") else
  if (x.getString(0).trim.toInt >= 25000 && x.getString(0).trim.toInt < 
3) x(0).toString().trim.+("|6") else
  if (x.getString(0).trim.toInt >= 3 && x.getString(0).trim.toInt < 
35000) x(0).toString().trim.+("|7") else
  if (x.getString(0).trim.toInt >= 35000 && x.getString(0).trim.toInt < 
4) x(0).toString().trim.+("|8") else
  if (x.getString(0).trim.toInt >= 4 && x.getString(0).trim.toInt < 
45000) x(0).toString().trim.+("|9") else
  if (x.getString(0).trim.toInt >= 45000 && x.getString(0).trim.toInt < 
5) x(0).toString().trim.+("|10") else
  if (x.getString(0).trim.toInt >= 5 && x.getString(0).trim.toInt < 
55000) x(0).toString().trim.+("|11") else
  if (x.getString(0).trim.toInt >= 55000 && x.getString(0).trim.toInt < 
6) x(0).toString().trim.+("|12") else
  if (x.getString(0).trim.toInt >= 6 && x.getString(0).trim.toInt < 
65000) x(0).toString().trim.+("|13") else
  if (x.getString(0).trim.toInt >= 65000 && x.getString(0).trim.toInt < 
7) x(0).toString().trim.+("|14") else
  if (x.getString(0).trim.toInt >= 7 && x.getString(0).trim.toInt < 
75000) x(0).toString().trim.+("|15") else
  if (x.getString(0).trim.toInt >= 75000 && x.getString(0).trim.toInt < 
8) x(0).toString().trim.+("|16") else
  if (x.getString(0).trim.toInt >= 8 && x.getString(0).trim.toInt < 
85000) x(0).toString().trim.+("|17") else
  if (x.getString(0).trim.toInt >= 85000 && x.getString(0).trim.toInt < 
9) x(0).toString().trim.+("|18") else
  if (x.getString(0).trim.toInt >= 9 && x.getString(0).trim.toInt < 
95000) x(0).toString().trim.+("|19") else
  if (x.getString(0).trim.toInt >= 95000 && x.getString(0).trim.toInt < 
10) x(0).toString().trim.+("|20") else
  if (x.getString(0).trim.toInt >= 10) x(0).toString().trim.+("|21")
}
priceRange.collect().foreach(println)
case class PriceRange(price:String,priceRange:Int)
val priceRange2 = priceRange.map(_.toString().split("|")).map { p => 
PriceRange(p(0), p(1).trim.toInt)}.toDF()
val priceRangeCol = priceRange2.apply("priceRange")
val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
here is the stacktrace:
scala> val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
org.apache.spark.sql.AnalysisException: resolved attribute(s) priceRange#2629 
missing from lp_loupan_id#1,price_type#26,price#101 in operator !Project 
[lp_loupan_id#1,price_type#26,price#101,priceRange#2629 AS priceRange#2630];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)

RE: Possible to broadcast a function?

2016-06-30 Thread Yong Zhang
How about this old discussion about a similar problem to yours:
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-td3203.html
Yong
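
One way to get this kind of once-per-executor initialization (without broadcasting the
structure itself) is a lazily initialized singleton object: the JVM constructs it at
most once per executor (so once per node if you run one executor per node), and every
task on that executor reuses it. A rough sketch only; the file path, record format and
lookup logic below are placeholders for your ~250 GiB load:

import org.apache.spark.rdd.RDD

// Loaded at most once per executor JVM, the first time a task touches it.
object SharedLookup {
  lazy val table: Map[String, Int] = {
    val src = scala.io.Source.fromFile("/data/lookup.csv")   // stand-in for the external data source
    try src.getLines().map(_.split(',')).map(a => a(0) -> a(1).toInt).toMap
    finally src.close()
  }
}

def enrich(rdd: RDD[String]): RDD[Int] =
  rdd.mapPartitions { iter =>
    val table = SharedLookup.table   // first task on each executor pays the load cost
    iter.flatMap(key => table.get(key))
  }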

From: aper...@timerazor.com
Date: Wed, 29 Jun 2016 14:00:07 +
Subject: Possible to broadcast a function?
To: user@spark.apache.org

The user guide describes a broadcast as a way to move a large dataset to each 
node:

"Broadcast variables allow the programmer to keep a read-only variable cached 
on each machine rather
than shipping a copy of it with tasks. They can be used, for example, to give 
every node a copy of a
large input dataset in an efficient manner."

And the broadcast example shows it being used with a variable.

But, is it somehow possible to instead broadcast a function that can be 
executed once, per node?

My use case is the following:

I have a large data structure that I currently create on each executor.  The 
way that I create it is a hack.  That is, when the RDD function is executed on 
the executor, I block, load a bunch of data (~250 GiB) from an external data 
source, create the data structure as a static object in the JVM, and then 
resume execution.  This works, but it ends up costing me a lot of extra memory 
(i.e. a few TiB when I have a lot of executors).

What I'd like to do is use the broadcast mechanism to load the data structure 
once, per node.  But, I can't serialize the data structure from the driver.

Any ideas?

Thanks!

Aaron

  

Re: Error report file is deleted automatically after spark application finished

2016-06-30 Thread dhruve ashar
There could be multiple reasons why it's not being generated even after
setting the ulimit appropriately.

Try out the options listed on this thread:
http://stackoverflow.com/questions/7732983/core-dump-file-is-not-generated


On Thu, Jun 30, 2016 at 2:25 AM, prateek arora 
wrote:

> Thanks for the information. My problem is resolved now .
>
>
>
> I have one more issue.
>
>
>
> I am not able to save core dump file. Always shows *“# Failed to write
> core dump. Core dumps have been disabled. To enable core dumping, try
> "ulimit -c unlimited" before starting Java again"*
>
>
>
> I set core dump limit to unlimited in all nodes. Using below settings
>Edit /etc/security/limits.conf file and add  " * soft core unlimited "
> line.
>
> I rechecked  using :  $ ulimit -all
>
> core file size  (blocks, -c) unlimited
> data seg size   (kbytes, -d) unlimited
> scheduling priority (-e) 0
> file size   (blocks, -f) unlimited
> pending signals (-i) 241204
> max locked memory   (kbytes, -l) 64
> max memory size (kbytes, -m) unlimited
> open files  (-n) 1024
> pipe size(512 bytes, -p) 8
> POSIX message queues (bytes, -q) 819200
> real-time priority  (-r) 0
> stack size  (kbytes, -s) 8192
> cpu time   (seconds, -t) unlimited
> max user processes  (-u) 241204
> virtual memory  (kbytes, -v) unlimited
> file locks  (-x) unlimited
>
> but when my spark application  crash ,  show error " Failed to
> write core dump. Core dumps have been disabled. To enablecore dumping, try
> "ulimit -c unlimited" before starting Java again”.
>
>
> Regards
>
> Prateek
>
>
>
>
>
> On Wed, Jun 29, 2016 at 9:30 PM, dhruve ashar 
> wrote:
>
>> You can look at the yarn-default configuration file.
>>
>> Check your log related settings to see if log aggregation is enabled or
>> also the log retention duration to see if its too small and files are being
>> deleted.
>>
>> On Wed, Jun 29, 2016 at 4:47 PM, prateek arora <
>> prateek.arora...@gmail.com> wrote:
>>
>>>
>>> Hi
>>>
>>> My Spark application was crashed and show information
>>>
>>> LogType:stdout
>>> Log Upload Time:Wed Jun 29 14:38:03 -0700 2016
>>> LogLength:1096
>>> Log Contents:
>>> #
>>> # A fatal error has been detected by the Java Runtime Environment:
>>> #
>>> #  SIGILL (0x4) at pc=0x7f67baa0d221, pid=12207, tid=140083473176320
>>> #
>>> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build
>>> 1.7.0_67-b01)
>>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode
>>> linux-amd64 compressed oops)
>>> # Problematic frame:
>>> # C  [libcaffe.so.1.0.0-rc3+0x786221]  sgemm_kernel+0x21
>>> #
>>> # Failed to write core dump. Core dumps have been disabled. To enable
>>> core
>>> dumping, try "ulimit -c unlimited" before starting Java again
>>> #
>>> # An error report file with more information is saved as:
>>> #
>>>
>>> /yarn/nm/usercache/ubuntu/appcache/application_1467236060045_0001/container_1467236060045_0001_01_03/hs_err_pid12207.log
>>>
>>>
>>>
>>> but I am not able to found
>>>
>>> "/yarn/nm/usercache/ubuntu/appcache/application_1467236060045_0001/container_1467236060045_0001_01_03/hs_err_pid12207.log"
>>> file . its deleted  automatically after Spark application
>>>  finished
>>>
>>>
>>> how  to retain report file , i am running spark with yarn .
>>>
>>> Regards
>>> Prateek
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-report-file-is-deleted-automatically-after-spark-application-finished-tp27247.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> -Dhruve Ashar
>>
>>
>


-- 
-Dhruve Ashar


Remote RPC client disassociated

2016-06-30 Thread Joaquin Alzola
Hi List,

I am launching this spark-submit job:

hadoop@testbedocg:/mnt/spark> bin/spark-submit --packages 
com.datastax.spark:spark-cassandra-connector_2.10:1.6.0 --jars 
/mnt/spark/lib/TargetHolding_pyspark-cassandra-0.3.5.jar spark_v2.py

spark_v2.py is:
from pyspark_cassandra import CassandraSparkContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = 
SparkConf().setAppName("test").setMaster("spark://192.168.23.31:7077").set("spark.cassandra.connection.host",
 "192.168.23.31")
sc = CassandraSparkContext(conf=conf)
table = sc.cassandraTable("lebara_diameter_codes","nl_lebara_diameter_codes")
food_count = table.select("errorcode2001").groupBy("errorcode2001").count()
food_count.collect()


Error I get when running the above command:

[Stage 0:>  (0 + 3) / 
7]16/06/30 10:40:36 ERROR TaskSchedulerImpl: Lost executor 0 on as5: Remote RPC 
client disassociated. Likely due to containers exceeding thresholds, or network 
issues. Check driver logs for WARN messages.
[Stage 0:>  (0 + 7) / 
7]16/06/30 10:40:40 ERROR TaskSchedulerImpl: Lost executor 1 on as4: Remote RPC 
client disassociated. Likely due to containers exceeding thresholds, or network 
issues. Check driver logs for WARN messages.
[Stage 0:>  (0 + 5) / 
7]16/06/30 10:40:42 ERROR TaskSchedulerImpl: Lost executor 3 on as5: Remote RPC 
client disassociated. Likely due to containers exceeding thresholds, or network 
issues. Check driver logs for WARN messages.
[Stage 0:>  (0 + 4) / 
7]16/06/30 10:40:46 ERROR TaskSchedulerImpl: Lost executor 4 on as4: Remote RPC 
client disassociated. Likely due to containers exceeding thresholds, or network 
issues. Check driver logs for WARN messages.
16/06/30 10:40:46 ERROR TaskSetManager: Task 5 in stage 0.0 failed 4 times; 
aborting job
Traceback (most recent call last):
  File "/mnt/spark-1.6.1-bin-hadoop2.6/spark_v2.py", line 11, in 
food_count = table.select("errorcode2001").groupBy("errorcode2001").count()
  File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
  File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
  File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
  File "/mnt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
  File "/mnt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, 
in __call__
  File "/mnt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in 
get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in 
stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 
14, as4): ExecutorLostFailure (executor 4 exited caused by one of the running 
tasks) Reason: Remote RPC client disassociated. Likely due to containers 
exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
   at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at 

Re: Spark master shuts down when one of zookeeper dies

2016-06-30 Thread Ted Yu
Looking at Master.scala, I don't see code that would bring the master back up
automatically.
You could probably implement a monitoring tool so that you get an alert when the
master goes down.

e.g.
http://stackoverflow.com/questions/12896998/how-to-set-up-alerts-on-ganglia

More experienced users may have better suggestions.
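
As a rough illustration, even a small watchdog that polls each master's web UI
(port 8080 by default) could raise such an alert; the host names and the alert
action below are placeholders:

import scala.io.Source
import scala.util.Try

// Hypothetical watchdog: poll each standalone master's web UI and flag any that stop responding.
val masterUis = Seq("http://master1:8080", "http://master2:8080")

def isUp(url: String): Boolean = Try {
  val src = Source.fromURL(url)
  try src.mkString finally src.close()
}.isSuccess

while (true) {
  masterUis.filterNot(isUp).foreach { down =>
    // replace println with a real alert (mail, Nagios/Ganglia check, ...)
    println("ALERT: Spark master not responding at " + down)
  }
  Thread.sleep(60000)
}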

On Thu, Jun 30, 2016 at 2:09 AM, vimal dinakaran 
wrote:

> Hi Ted,
>  Thanks for the pointers. I had a three node zookeeper setup . Now the
> master alone dies when  a zookeeper instance is down and a new master is
> elected as leader and the cluster is up.
> But the master that was down , never comes up.
>
> Is this the expected ? Is there a way to get alert when a master is down ?
> How to make sure that there is atleast one back up master is up always ?
>
> Thanks
> Vimal
>
>
>
>
> On Tue, Jun 28, 2016 at 7:24 PM, Ted Yu  wrote:
>
>> Please see some blog w.r.t. the number of nodes in the quorum:
>>
>>
>> http://stackoverflow.com/questions/13022244/zookeeper-reliability-three-versus-five-nodes
>>
>> http://www.ibm.com/developerworks/library/bd-zookeeper/
>>   the paragraph starting with 'A quorum is represented by a strict
>> majority of nodes'
>>
>> FYI
>>
>> On Tue, Jun 28, 2016 at 5:52 AM, vimal dinakaran 
>> wrote:
>>
>>> I am using zookeeper for providing HA for spark cluster.  We have two
>>> nodes zookeeper cluster.
>>>
>>> When one of the zookeeper dies then the entire spark cluster goes down .
>>>
>>> Is this expected behaviour ?
>>> Am I missing something in config ?
>>>
>>> Spark version - 1.6.1.
>>> Zookeeper version - 3.4.6
>>> // spark-env.sh
>>> SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
>>> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
>>>
>>> Below is the log from spark master:
>>> ZooKeeperLeaderElectionAgent: We have lost leadership
>>> 16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
>>> shutting down.
>>>
>>> Thanks
>>> Vimal
>>>
>>>
>>>
>>>
>>
>


Re: Spark master shuts down when one of zookeeper dies

2016-06-30 Thread vimal dinakaran
Hi Ted,
 Thanks for the pointers. I had a three-node zookeeper setup. Now only the master
dies when a zookeeper instance goes down; a new master is elected as leader and the
cluster stays up. But the master that went down never comes back up.

Is this expected? Is there a way to get an alert when a master is down?
How can I make sure that at least one backup master is always up?

Thanks
Vimal




On Tue, Jun 28, 2016 at 7:24 PM, Ted Yu  wrote:

> Please see some blog w.r.t. the number of nodes in the quorum:
>
>
> http://stackoverflow.com/questions/13022244/zookeeper-reliability-three-versus-five-nodes
>
> http://www.ibm.com/developerworks/library/bd-zookeeper/
>   the paragraph starting with 'A quorum is represented by a strict
> majority of nodes'
>
> FYI
>
> On Tue, Jun 28, 2016 at 5:52 AM, vimal dinakaran 
> wrote:
>
>> I am using zookeeper for providing HA for spark cluster.  We have two
>> nodes zookeeper cluster.
>>
>> When one of the zookeeper dies then the entire spark cluster goes down .
>>
>> Is this expected behaviour ?
>> Am I missing something in config ?
>>
>> Spark version - 1.6.1.
>> Zookeeper version - 3.4.6
>> // spark-env.sh
>> SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
>> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
>>
>> Below is the log from spark master:
>> ZooKeeperLeaderElectionAgent: We have lost leadership
>> 16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
>> shutting down.
>>
>> Thanks
>> Vimal
>>
>>
>>
>>
>


One map per folder in spark or Hadoop

2016-06-30 Thread Balachandar R.A.
Hello,

I have some 100 folders. Each folder contains 5 files. I have an executable
that processes one folder. The executable is a black box and hence it cannot
be modified. I would like to process the 100 folders in parallel using Apache
Spark so that I can spawn one map task per folder. Can anyone give me an idea?
I have come across similar questions, but with Hadoop, where the answer was to
use CombineFileInputFormat and PathFilter. However, as I said, I want to use
Apache Spark. Any ideas?

Regards
Bala
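
One common way to do this is to parallelize the folder list with one partition per
folder and shell out to the executable from each task. A sketch only; the folder
paths, the executable path and the assumption that it takes a folder as its argument
are all illustrative:

import scala.sys.process._

// Illustrative folder list; in practice you might read it from a config file or list a parent directory.
val folders = (1 to 100).map(i => s"/data/input/folder_$i")

// One partition per folder, so each folder is handled by exactly one map task.
val results = sc.parallelize(folders, folders.size).map { dir =>
  // Run the black-box executable on this folder and capture its exit code.
  val exitCode = Seq("/opt/tools/process_folder", dir).!
  (dir, exitCode)
}

results.collect().foreach { case (dir, code) => println(s"$dir finished with exit code $code") }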


Re: How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.

2016-06-30 Thread Jayant Shekhar
Hi Fanchao,

This is because the executors are unable to find the anonymous classes generated by the interpreter.

Adding the below code worked for me. I found the details here :
https://github.com/cloudera/livy/blob/master/repl/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala

// Spark 1.6 does not have "classServerUri"; instead, the local
directory where class files
// are stored needs to be registered in SparkConf. See comment in
// SparkILoop::createSparkContext().

Try(sparkIMain.getClass().getMethod("classServerUri")) match {
  case Success(method) =>
method.setAccessible(true)
conf.set("spark.repl.class.uri",
method.invoke(sparkIMain).asInstanceOf[String])

  case Failure(_) =>
val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
outputDir.setAccessible(true)
conf.set("spark.repl.class.outputDir",
  outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
}


Thanks,

Jayant



On Thu, Jun 30, 2016 at 12:34 AM, Fanchao Meng 
wrote:

> Hi Spark Community,
>
> I am trying to dynamically interpret code given as a String in Spark, just
> like calling the eval in Perl language. However, I got problem when running
> the program. Really appreciate for your help.
>
> **Requirement:**
>
> The requirement is to make the spark processing chain configurable. For
> example, customer could set the processing steps in configuration file as
> below. Steps:
>  1) textFile("files///")
>  2) flatMap(line => line.split(" "))
>  3) map(word => (word, 1))
>  4) reduceByKey(_ + _)
>  5) foreach(println)
>
> All above steps are defined in a configuration file.
> Then, the spark driver will load the configuration file and make the
> processing steps as a string, such as:
>
>  val processFlow =
>  """
>  sc.textFile("file:///input.txt").flatMap(line => line.split("
> ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
>  """
>
> Then, Spark will execute the piece of code defined in above variable
> processFlow.
>
> **Here is my Spark source code:**
>
> It is from word count sample, I just make the RDD methods invoked by
> interpreter as a string.
>
>  import org.apache.spark.SparkConf
>  import org.apache.spark.SparkContext
>  import scala.collection.mutable.{Map, ArraySeq}
>  import scala.tools.nsc.GenericRunnerSettings
>  import scala.tools.nsc.interpreter.IMain
>  class TestMain {
>def exec(): Unit = {
>  val out = System.out
>  val flusher = new java.io.PrintWriter(out)
>  val interpreter = {
>val settings = new GenericRunnerSettings( println _ )
>settings.usejavacp.value = true
>new IMain(settings, flusher)
>  }
>  val conf = new SparkConf().setAppName("TestMain")
>  val sc = new SparkContext(conf)
>  val methodChain =
>"""
>val textFile = sc.textFile("file:///input.txt")
>textFile.flatMap(line => line.split(" ")).map(word => (word,
> 1)).reduceByKey(_ + _).foreach(println)
>"""
>  interpreter.bind("sc", sc);
>  val resultFlag = interpreter.interpret(methodChain)
>}
>  }
>  object TestMain {
>def main(args: Array[String]) {
>  val testMain = new TestMain()
>  testMain.exec()
>  System.exit(0)
>}
>  }
>
> **Problem:**
>
> However, I got an error when running above Spark code (master=local), logs
> as below.
>
>  sc: org.apache.spark.SparkContext =
> org.apache.spark.SparkContext@7d87addd
>  org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>  at java.lang.Class.forName0(Native Method)
>  at java.lang.Class.forName(Class.java:270)
>  at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>  at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>  at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>  at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>  at
> 

Re: Set the node the spark driver will be started

2016-06-30 Thread Felix Massem
Hey Bryan,

yes this definitely sounds like the issue I have :-)

Thx a lot and best regards
Felix

Felix Massem | IT-Consultant | Karlsruhe
mobil: +49 (0) 172.2919848 <>

www.codecentric.de  | blog.codecentric.de 
 | www.meettheexperts.de 
 | www.more4fi.de 


> On 29.06.2016, at 20:12, Bryan Cutler wrote:
> 
> Hi Felix,
> 
> I think the problem you are describing has been fixed in later versions, 
> check out this JIRA https://issues.apache.org/jira/browse/SPARK-13803 
> 
> 
> 
> On Wed, Jun 29, 2016 at 9:27 AM, Mich Talebzadeh  > wrote:
> Fine. in standalone mode spark uses its own scheduling as opposed to Yarn or 
> anything else.
> 
> As a matter of interest can you start spark-submit from any node in the 
> cluster? Are these all have the same or similar CPU and RAM?
> 
> 
> HTH
> 
> Dr Mich Talebzadeh
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
> 
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
> 
> 
> On 29 June 2016 at 10:54, Felix Massem  > wrote:
> In addition we are not using Yarn we are using the standalone mode and the 
> driver will be started with the deploy-mode cluster
> 
> Thx Felix
> Felix Massem | IT-Consultant | Karlsruhe
> mobil: +49 (0) 172.2919848 <>
> 
> www.codecentric.de  | blog.codecentric.de 
>  | www.meettheexperts.de 
>  | www.more4fi.de 
> 
> 
>> On 29.06.2016, at 11:13, Felix Massem wrote:
>> 
>> Hey Mich,
>> 
>> the distribution is like not given. Just right now I have 15 applications 
>> and all 15 drivers are running on one node. This is just after giving all 
>> machines a little more memory.
>> Before I had like 15 applications and about 13 driver where running on one 
>> machine. While trying to submit a new job I got OOM exceptions which took 
>> down my cassandra service only to start the driver on the same node where  
>> all the other 13 drivers where running.
>> 
>> Thx and best regards
>> Felix
>> 
>> 
>> Felix Massem | IT-Consultant | Karlsruhe
>> mobil: +49 (0) 172.2919848 <>
>> 
>> www.codecentric.de  | blog.codecentric.de 
>>  | www.meettheexperts.de 
>>  | www.more4fi.de 
>> 

Re: deploy-mode flag in spark-sql cli

2016-06-30 Thread Huang Meilong

Thank you, I got it.


From: Mich Talebzadeh 
Sent: June 30, 2016 14:52
To: Saisai Shao
Cc: Huang Meilong; user@spark.apache.org
Subject: Re: deploy-mode flag in spark-sql cli

Yes, I forgot that anything with a REPL, such as spark-sql and spark-shell, is just a
simple convenience interface.

Thanks Saisai for pointing out.


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 30 June 2016 at 06:01, Saisai Shao 
> wrote:
I think you cannot use sql client in the cluster mode, also for 
spark-shell/pyspark which has a repl, all these application can only be started 
with client deploy mode.

On Thu, Jun 30, 2016 at 12:46 PM, Mich Talebzadeh 
> wrote:
Hi,


When you use spark-shell or for that matter spark-sql, you are starting
spark-submit under the bonnet. These two shells are created to make life easier
to work on Spark.


However, if you look at what $SPARK_HOME/bin/spark-sql does in the script, you 
will notice my point:



exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"


 So that is basically spark-submit JVM



Since it is using spark-submit it takes all the parameters related to 
spark-submit. You can test it using your own customised shell script with 
parameters passed.


${SPARK_HOME}/bin/spark-submit \
--driver-memory xG \
--num-executors n \
--executor-memory xG \
--executor-cores m \
--master yarn \
--deploy-mode cluster \

--class 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"


Does your version of spark support cluster mode?


HTH




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 30 June 2016 at 05:16, Huang Meilong 
> wrote:

Hello,


I added deploy-mode flag in spark-sql cli like this:


$ spark-sql --deploy-mode cluster --master yarn -e "select * from mx"


It showed error saying "Cluster deploy mode is not applicable to Spark SQL 
shell", but "spark-sql --help" shows "--deploy-mode" option. Is this a bug?





How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.

2016-06-30 Thread Fanchao Meng
Hi Spark Community,

I am trying to dynamically interpret code given as a String in Spark, just like
calling eval in the Perl language. However, I ran into a problem when running the
program. I would really appreciate your help.

**Requirement:**

The requirement is to make the Spark processing chain configurable. For example,
a customer could set the processing steps in a configuration file as below. Steps:
 1) textFile("files///") 
 2) flatMap(line => line.split(" ")) 
 3) map(word => (word, 1)) 
 4) reduceByKey(_ + _) 
 5) foreach(println)

All above steps are defined in a configuration file.
Then, the spark driver will load the configuration file and make the processing 
steps as a string, such as:

 val processFlow = 
 """

 sc.textFile("file:///input.txt").flatMap(line => line.split(" 
")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
 """

Then, Spark will execute the piece of code defined in above variable 
processFlow.

**Here is my Spark source code:**

It is from word count sample, I just make the RDD methods invoked by 
interpreter as a string.

 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import scala.collection.mutable.{Map, ArraySeq}
 import scala.tools.nsc.GenericRunnerSettings
 import scala.tools.nsc.interpreter.IMain

 class TestMain {
   def exec(): Unit = {
     val out = System.out
     val flusher = new java.io.PrintWriter(out)
     val interpreter = {
       val settings = new GenericRunnerSettings( println _ )
       settings.usejavacp.value = true
       new IMain(settings, flusher)
     }
     val conf = new SparkConf().setAppName("TestMain")
     val sc = new SparkContext(conf)
     val methodChain =
       """
       val textFile = sc.textFile("file:///input.txt")
       textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
       """
     interpreter.bind("sc", sc)
     val resultFlag = interpreter.interpret(methodChain)
   }
 }

 object TestMain {
   def main(args: Array[String]) {
     val testMain = new TestMain()
     testMain.exec()
     System.exit(0)
   }
 }

**Problem:**

However, I got an error when running the above Spark code (master=local); logs are
below.

 sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in 
stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: 
$anonfun$1
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
 at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
 at 

Change spark dataframe to LabeledPoint in Java

2016-06-30 Thread Abhishek Anand
Hi ,

I have a dataframe which I want to convert to a LabeledPoint.

DataFrame labeleddf = model.transform(newdf).select("label","features");

How can I convert this to a LabeledPoint to use in my Logistic Regression
model.

I could do this in scala using
val trainData = labeleddf.map(row =>
LabeledPoint(row.getDouble(0), row(1).asInstanceOf[Vector])).cache()


How can I achieve the same in Java?

Thanks,
Abhi


Re: Error report file is deleted automatically after spark application finished

2016-06-30 Thread prateek arora
Thanks for the information. My problem is resolved now .



I have one more issue.



I am not able to save the core dump file. It always shows "# Failed to write core
dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c
unlimited" before starting Java again".



I set the core dump limit to unlimited on all nodes, using the settings below:
   edit the /etc/security/limits.conf file and add a " * soft core unlimited "
line.

I rechecked  using :  $ ulimit -all

core file size  (blocks, -c) unlimited
data seg size   (kbytes, -d) unlimited
scheduling priority (-e) 0
file size   (blocks, -f) unlimited
pending signals (-i) 241204
max locked memory   (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files  (-n) 1024
pipe size(512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority  (-r) 0
stack size  (kbytes, -s) 8192
cpu time   (seconds, -t) unlimited
max user processes  (-u) 241204
virtual memory  (kbytes, -v) unlimited
file locks  (-x) unlimited

but when my spark application crashes, it shows the error "Failed to
write core dump. Core dumps have been disabled. To enable core dumping, try
"ulimit -c unlimited" before starting Java again".


Regards

Prateek





On Wed, Jun 29, 2016 at 9:30 PM, dhruve ashar  wrote:

> You can look at the yarn-default configuration file.
>
> Check your log related settings to see if log aggregation is enabled or
> also the log retention duration to see if its too small and files are being
> deleted.
>
> On Wed, Jun 29, 2016 at 4:47 PM, prateek arora  > wrote:
>
>>
>> Hi
>>
>> My Spark application was crashed and show information
>>
>> LogType:stdout
>> Log Upload Time:Wed Jun 29 14:38:03 -0700 2016
>> LogLength:1096
>> Log Contents:
>> #
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> #  SIGILL (0x4) at pc=0x7f67baa0d221, pid=12207, tid=140083473176320
>> #
>> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build
>> 1.7.0_67-b01)
>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode
>> linux-amd64 compressed oops)
>> # Problematic frame:
>> # C  [libcaffe.so.1.0.0-rc3+0x786221]  sgemm_kernel+0x21
>> #
>> # Failed to write core dump. Core dumps have been disabled. To enable core
>> dumping, try "ulimit -c unlimited" before starting Java again
>> #
>> # An error report file with more information is saved as:
>> #
>>
>> /yarn/nm/usercache/ubuntu/appcache/application_1467236060045_0001/container_1467236060045_0001_01_03/hs_err_pid12207.log
>>
>>
>>
>> but I am not able to found
>>
>> "/yarn/nm/usercache/ubuntu/appcache/application_1467236060045_0001/container_1467236060045_0001_01_03/hs_err_pid12207.log"
>> file . its deleted  automatically after Spark application
>>  finished
>>
>>
>> how  to retain report file , i am running spark with yarn .
>>
>> Regards
>> Prateek
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-report-file-is-deleted-automatically-after-spark-application-finished-tp27247.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> -Dhruve Ashar
>
>


Re: deploy-mode flag in spark-sql cli

2016-06-30 Thread Mich Talebzadeh
Yes, I forgot that anything with a REPL, such as spark-sql and spark-shell, is
just a simple convenience interface.

Thanks Saisai for pointing out.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 30 June 2016 at 06:01, Saisai Shao  wrote:

> I think you cannot use sql client in the cluster mode, also for
> spark-shell/pyspark which has a repl, all these application can only be
> started with client deploy mode.
>
> On Thu, Jun 30, 2016 at 12:46 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> When you use spark-shell or for that matter spark-sql, you are starting
>> spark-submit under the bonnet. These two shells are created to make life
>> easier to work on Spark.
>>
>>
>> However, if you look at what $SPARK_HOME/bin/spark-sql does in the
>> script, you will notice my point:
>>
>>
>>
>> exec "${SPARK_HOME}"/bin/spark-submit --class
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
>>
>>  So that is basically spark-submit JVM
>>
>>
>>
>> Since it is using spark-submit it takes all the parameters related to
>> spark-submit. You can test it using your own customised shell script with
>> parameters passed.
>>
>>
>> ${SPARK_HOME}/bin/spark-submit \
>> --driver-memory xG \
>> --num-executors n \
>> --executor-memory xG \
>> --executor-cores m \
>> --master yarn \
>> --deploy-mode cluster \
>>
>> --class
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
>>
>>
>> Does your version of spark support cluster mode?
>>
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 30 June 2016 at 05:16, Huang Meilong  wrote:
>>
>>> Hello,
>>>
>>>
>>> I added deploy-mode flag in spark-sql cli like this:
>>>
>>> $ spark-sql --deploy-mode cluster --master yarn -e "select * from mx"
>>>
>>>
>>> It showed error saying "Cluster deploy mode is not applicable to Spark
>>> SQL shell", but "spark-sql --help" shows "--deploy-mode" option. Is
>>> this a bug?
>>>
>>
>>
>


Re: Using R code as part of a Spark Application

2016-06-30 Thread Sun Rui
I would guess that the technology behind Azure R Server is Revolution
Enterprise DistributedR/ScaleR. I don't know the details, but note the statement in
the "Step 6. Install R packages" section of the given documentation page:
However, if you need to install R packages on the worker nodes of the
cluster, you must use a Script Action.

That implies that R should be installed on each worker node.

> On Jun 30, 2016, at 05:53, John Aherne  > wrote:
> 
> I don't think R server requires R on the executor nodes. I originally set up 
> a SparkR cluster for our Data Scientist on Azure which required that I 
> install R on each node, but for the R Server set up, there is an extra edge 
> node with R server that they connect to. From what little research I was able 
> to do, it seems that there are some special functions in R Server that can 
> distribute the work to the cluster. 
> 
> Documentation is light, and hard to find but I found this helpful:
> https://blogs.msdn.microsoft.com/uk_faculty_connection/2016/05/10/r-server-for-hdinsight-running-on-microsoft-azure-cloud-data-science-challenges/
>  
> 
> 
> 
> 
> On Wed, Jun 29, 2016 at 3:29 PM, Sean Owen  > wrote:
> Oh, interesting: does this really mean the return of distributing R
> code from driver to executors and running it remotely, or do I
> misunderstand? this would require having R on the executor nodes like
> it used to?
> 
> On Wed, Jun 29, 2016 at 5:53 PM, Xinh Huynh  > wrote:
> > There is some new SparkR functionality coming in Spark 2.0, such as
> > "dapply". You could use SparkR to load a Parquet file and then run "dapply"
> > to apply a function to each partition of a DataFrame.
> >
> > Info about loading Parquet file:
> > http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
> >  
> > 
> >
> > API doc for "dapply":
> > http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
> >  
> > 
> >
> > Xinh
> >
> > On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog  > > wrote:
> >>
> >> try Spark pipeRDD's , you can invoke the R script from pipe , push  the
> >> stuff you want to do on the Rscript stdin,  p
> >>
> >>
> >> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau  >> >
> >> wrote:
> >>>
> >>> Hello,
> >>>
> >>>
> >>>
> >>> I want to use R code as part of spark application (the same way I would
> >>> do with Scala/Python).  I want to be able to run an R syntax as a map
> >>> function on a big Spark dataframe loaded from a parquet file.
> >>>
> >>> Is this even possible or the only way to use R is as part of RStudio
> >>> orchestration of our Spark  cluster?
> >>>
> >>>
> >>>
> >>> Thanks for the help!
> >>>
> >>>
> >>>
> >>> Gilad
> >>>
> >>>
> >>
> >>
> >
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> John Aherne
> Big Data and SQL Developer
> 
> 
> Cell:
> Email:
> Skype:
> Web:
> 
> +1 (303) 809-9718
> john.ahe...@justenough.com 
> john.aherne.je 
> www.justenough.com 
> 
> Confidentiality Note: The information contained in this email and document(s) 
> attached are for the exclusive use of the addressee and may contain 
> confidential, privileged and non-disclosable information. If the recipient of 
> this email is not the addressee, such recipient is strictly prohibited from 
> reading, photocopying, distribution or otherwise using this email or its 
> contents in any way.
> 



Re: Using R code as part of a Spark Application

2016-06-30 Thread sujeet jog
Thanks for the link, Sun. I believe running external scripts such as R code on
Data Frames is a much needed facility; for example, for algorithms that
are not available in MLlib, invoking them from an R script would definitely
be a powerful feature when your app is Scala/Python based. You don't have
to use SparkR for this when much of your application code is in
Scala/Python.
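
For reference, the pipe() route mentioned below can look roughly like this from
Scala. A sketch only, assuming a Spark 1.6-style sqlContext; the Parquet path, the
comma-separated record format and the score.R script are placeholders:

// Each partition's records are written, one per line, to the stdin of an external
// R process, and whatever the script prints to stdout comes back as the output RDD.
val df = sqlContext.read.parquet("/data/input.parquet")
val piped = df.rdd
  .map(_.mkString(","))                   // serialize each Row to a line of text
  .pipe("Rscript /opt/scripts/score.R")   // one R process per partition
piped.take(10).foreach(println)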

On Thu, Jun 30, 2016 at 8:25 AM, Sun Rui  wrote:

> Hi, Gilad,
>
> You can try the dapply() and gapply() functions in SparkR in Spark 2.0.
> Yes, it is required that R is installed on each worker node.
>
> However, if your Spark application is Scala/Java based, running R code on
> DataFrames is not supported for now. There is a closed JIRA,
> https://issues.apache.org/jira/browse/SPARK-14746, which remains for
> discussion purposes. You have to convert DataFrames to RDDs, and use pipe()
> on RDDs to launch external R processes and run R code.
>
> On Jun 30, 2016, at 07:08, Xinh Huynh  wrote:
>
> It looks like it. "DataFrame UDFs in R" is resolved in Spark 2.0:
> https://issues.apache.org/jira/browse/SPARK-6817
>
> Here's some of the code:
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
>
> /**
> * A function wrapper that applies the given R function to each partition.
> */
> private[sql] case class MapPartitionsRWrapper(
> func: Array[Byte],
> packageNames: Array[Byte],
> broadcastVars: Array[Broadcast[Object]],
> inputSchema: StructType,
> outputSchema: StructType) extends (Iterator[Any] => Iterator[Any])
>
> Xinh
>
> On Wed, Jun 29, 2016 at 2:59 PM, Sean Owen  wrote:
>
>> Here we (or certainly I) am not talking about R Server, but plain vanilla
>> R, as used with Spark and SparkR. Currently, SparkR doesn't distribute R
>> code at all (it used to, sort of), so I'm wondering if that is changing
>> back.
>>
>> On Wed, Jun 29, 2016 at 10:53 PM, John Aherne > > wrote:
>>
>>> I don't think R server requires R on the executor nodes. I originally
>>> set up a SparkR cluster for our Data Scientist on Azure which required that
>>> I install R on each node, but for the R Server set up, there is an extra
>>> edge node with R server that they connect to. From what little research I
>>> was able to do, it seems that there are some special functions in R Server
>>> that can distribute the work to the cluster.
>>>
>>> Documentation is light, and hard to find but I found this helpful:
>>>
>>> https://blogs.msdn.microsoft.com/uk_faculty_connection/2016/05/10/r-server-for-hdinsight-running-on-microsoft-azure-cloud-data-science-challenges/
>>>
>>>
>>>
>>> On Wed, Jun 29, 2016 at 3:29 PM, Sean Owen  wrote:
>>>
 Oh, interesting: does this really mean the return of distributing R
 code from driver to executors and running it remotely, or do I
 misunderstand? this would require having R on the executor nodes like
 it used to?

 On Wed, Jun 29, 2016 at 5:53 PM, Xinh Huynh 
 wrote:
 > There is some new SparkR functionality coming in Spark 2.0, such as
 > "dapply". You could use SparkR to load a Parquet file and then run
 "dapply"
 > to apply a function to each partition of a DataFrame.
 >
 > Info about loading Parquet file:
 >
 http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
 >
 > API doc for "dapply":
 >
 http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
 >
 > Xinh
 >
 > On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog 
 wrote:
 >>
 >> try Spark pipeRDD's , you can invoke the R script from pipe , push
 the
 >> stuff you want to do on the Rscript stdin,  p
 >>
 >>
 >> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau <
 gilad.lan...@clicktale.com>
 >> wrote:
 >>>
 >>> Hello,
 >>>
 >>>
 >>>
 >>> I want to use R code as part of spark application (the same way I
 would
 >>> do with Scala/Python).  I want to be able to run an R syntax as a
 map
 >>> function on a big Spark dataframe loaded from a parquet file.
 >>>
 >>> Is this even possible or the only way to use R is as part of RStudio
 >>> orchestration of our Spark  cluster?
 >>>
 >>>
 >>>
 >>> Thanks for the help!
 >>>
 >>>
 >>>
 >>> Gilad
 >>>
 >>>
 >>
 >>
 >

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>>
>>> --
>>>
>>> John Aherne
>>> Big Data and SQL Developer
>>>
>>>
>>> Cell:
>>> Email:
>>> Skype:
>>> Web:
>>>
>>> +1 (303) 809-9718
>>> john.ahe...@justenough.com
>>> john.aherne.je
>>> www.justenough.com
>>>
>>>
>>>