Re: Speed Benchmark

2015-02-27 Thread Davies Liu
No. It should not be that slow. On my Mac, it took 1.4 minutes to run
`rdd.count()` on a 4.3 GB text file (~25 MB/s per CPU).

Could you turn on profiling in pyspark to see what is happening in the Python process?

spark.python.profile = true
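
For reference, one way to switch it on is in spark-defaults.conf (the dump property is optional, and the directory path here is illustrative):

```properties
# spark-defaults.conf
spark.python.profile        true
# optionally dump per-RDD profile output to a directory instead of printing at exit
spark.python.profile.dump   /tmp/pyspark-profile
```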

On Fri, Feb 27, 2015 at 4:14 PM, Guillaume Guy
 wrote:
> It is a simple text file.
>
> I'm not using SQL. just doing a rdd.count() on it. Does the bug affect it?
>
>
> On Friday, February 27, 2015, Davies Liu  wrote:
>>
>> What is this dataset? text file or parquet file?
>>
>> There is an issue with serialization in Spark SQL, which will make it
>> very slow, see https://issues.apache.org/jira/browse/SPARK-6055, will
>> be fixed very soon.
>>
>> Davies
>>
>> On Fri, Feb 27, 2015 at 1:59 PM, Guillaume Guy
>>  wrote:
>> > Hi Sean:
>> >
>> > Thanks for your feedback. Scala is much faster. The count is performed
>> > in ~1 minute (vs. 17 min). I would expect Scala to be 2-5X faster, but
>> > this gap seems to be more than that. Is that also your conclusion?
>> >
>> > Thanks.
>> >
>> >
>> > Best,
>> >
>> > Guillaume Guy
>> >  +1 919 - 972 - 8750
>> >
>> > On Fri, Feb 27, 2015 at 9:12 AM, Sean Owen  wrote:
>> >>
>> >> That's very slow, and there are a lot of possible explanations. The
>> >> first one that comes to mind is: I assume your YARN and HDFS are on
>> >> the same machines, but are you running executors on all HDFS nodes
>> >> when you run this? if not, a lot of these reads could be remote.
>> >>
>> >> You have 6 executor slots, but your data exists in 96 blocks on HDFS.
>> >> You could read with up to 96-way parallelism. You say you're CPU-bound
>> >> though, but normally I'd wonder if this was simply a case of
>> >> under-using parallelism.
>> >>
>> >> I also wonder if the bottleneck is something to do with pyspark in
>> >> this case; might be good to just try it in the spark-shell to check.
>> >>
>> >> On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy
>> >>  wrote:
>> >> > Dear Spark users:
>> >> >
>> >> > I want to see if anyone has an idea of the performance for a small
>> >> > cluster.
>> >> >
>> >> > Reading from HDFS, what should be the performance of a count()
>> >> > operation on a 10 GB RDD with 100M rows using pyspark? I looked at the
>> >> > CPU usage; all 6 cores are at 100%.
>> >> >
>> >> > Details:
>> >> >
>> >> > master yarn-client
>> >> > num-executors 3
>> >> > executor-cores 2
>> >> > driver-memory 5g
>> >> > executor-memory 2g
>> >> > Distribution: Cloudera
>> >> >
>> >> > I also attached the screenshot.
>> >> >
>> >> > Right now, I'm at 17 minutes, which seems quite slow. Any idea what
>> >> > decent performance looks like with a similar configuration?
>> >> >
>> >> > If it's way off, I would appreciate any pointers as to ways to
>> >> > improve
>> >> > performance.
>> >> >
>> >> > Thanks.
>> >> >
>> >> > Best,
>> >> >
>> >> > Guillaume
>> >> >
>> >> >
>> >> > -
>> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>> >
>
>
>
> --
>
> Best,
>
> Guillaume Guy
>  +1 919 - 972 - 8750
>




Number of cores per executor on Spark Standalone

2015-02-27 Thread bit1...@163.com
Hi ,

I know that Spark on YARN has a configuration parameter (executor-cores NUM) to
specify the number of cores per executor.
How about Spark standalone? I can specify the total cores, but how can I know
how many cores each executor will take (presuming one executor per node)?




bit1...@163.com


Some questions after playing a little with the new ml.Pipeline.

2015-02-27 Thread Jaonary Rabarisoa
Dear all,


We mainly do large-scale computer vision tasks (image classification,
retrieval, ...). The pipeline is really great stuff for that. We're trying
to reproduce the tutorial given on that topic during the latest Spark
summit (
http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
)
using the master version of the Spark pipeline and dataframe. The tutorial
shows different examples of feature extraction stages before running
machine learning algorithms. Even though the tutorial is straightforward to
reproduce with this new API, we still have some questions:

   - Can one use external tools (e.g. via pipe) as a pipeline stage? An
   example use case is extracting features learned with a convolutional neural
   network. In our case, this corresponds to a pre-trained neural network built
   with the Caffe library (http://caffe.berkeleyvision.org/).


   - The second question is about the performance of the pipeline. Libraries
   such as Caffe process data in batches, and instantiating one Caffe network
   can be time-consuming when the network is very deep. So we can gain
   performance if we minimize the number of Caffe network creations and feed
   data to the network in batches. In the pipeline, this corresponds to running
   transformers that work on a per-partition basis and give the whole partition
   to a single Caffe network. How can we create such a transformer?



Best,

Jao


Re: Spark partial data in memory/and partial in disk

2015-02-27 Thread Akhil Das
You can use persist(StorageLevel.MEMORY_AND_DISK) if you don't have
sufficient memory to cache everything.

Thanks
Best Regards

On Fri, Feb 27, 2015 at 7:20 PM, Siddharth Ubale <
siddharth.ub...@syncoms.com> wrote:

>  Hi,
>
>
>
> How do we put part of the data in memory and part on disk when the data
> resides in a Hive table?
>
> We have tried using the available documentation but are unable to go ahead
> with the above approach; we are only able to cache the entire table or
> uncache it.
>
>
>
> Thanks,
>
> Siddharth Ubale,
>
> *Synchronized Communications *
>
> *#43, Velankani Tech Park, Block No. II, *
>
> *3rd Floor, Electronic City Phase I,*
>
> *Bangalore – 560 100*
>
> *Tel : +91 80 3202 4060*
>
> *Web:* *www.syncoms.com* 
>
>
> *London*|*Bangalore*|*Orlando*
>
>
>
> *we innovate, plan, execute, and transform the business*
>
>
>


Re: Errors in spark

2015-02-27 Thread Yana Kadiyska
I was actually just able to reproduce the issue. I do wonder if this is a
bug -- the docs say "When not configured by the hive-site.xml, the context
automatically creates metastore_db and warehouse in the current directory."
But as you can see from the message, warehouse is not in the current
directory; it is under /user/hive. In my case this directory was owned by
'root' and no one else had write permissions. Changing the permissions works
if you need to get unblocked quickly... but it does seem like a bug to me.


On Fri, Feb 27, 2015 at 11:21 AM, sandeep vura 
wrote:

> Hi yana,
>
> I have removed hive-site.xml from the spark/conf directory but am still
> getting the same errors. Is there any other way to work around this?
>
> Regards,
> Sandeep
>
> On Fri, Feb 27, 2015 at 9:38 PM, Yana Kadiyska 
> wrote:
>
>> I think you're mixing two things: the docs say "When* not *configured by
>> the hive-site.xml, the context automatically creates metastore_db and
>> warehouse in the current directory.". AFAIK if you want a local
>> metastore, you don't put hive-site.xml anywhere. You only need the file if
>> you're going to point to an external metastore. If you're pointing to an
>> external metastore, in my experience I've also had to copy core-site.xml
>> into conf in order to specify this property:  fs.defaultFS
>>
>> On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura 
>> wrote:
>>
>>> Hi Sparkers,
>>>
>>> I am using hive version - hive 0.13 and copied hive-site.xml in
>>> spark/conf and using default derby local metastore .
>>>
>>> While creating a table in the spark shell I am getting the following
>>> error. Can anyone please take a look and suggest a solution?
>>>
>>> sqlContext.sql("CREATE TABLE IF NOT EXISTS sandeep (key INT, value
>>> STRING)")
>>> 15/02/27 23:06:13 ERROR RetryingHMSHandler:
>>> MetaException(message:file:/user/hive/warehouse_1/sandeep is not a
>>> directory or unable to create one)
>>> at
>>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
>>> at
>>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:622)
>>> at
>>> org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
>>> at
>>> com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source)
>>> at
>>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558)
>>> at
>>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:622)
>>> at
>>> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
>>> at com.sun.proxy.$Proxy13.createTable(Unknown Source)
>>> at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613)
>>> at
>>> org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189)
>>> at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281)
>>> at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
>>> at
>>> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
>>> at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
>>> at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
>>> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
>>> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
>>> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
>>> at
>>> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
>>> at
>>> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
>>> at
>>> org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
>>> at
>>> org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
>>> at
>>> org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
>

Re: java.util.NoSuchElementException: key not found:

2015-02-27 Thread Shixiong Zhu
RDDs are not thread-safe. You should not use them from multiple threads.
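
If the jobs must be issued from multiple Python threads anyway, one blunt workaround is to serialize every touch of the shared object behind a lock. This is plain Python and nothing pyspark-specific; the summed list below is just a stand-in for operations on a shared RDD:

```python
import threading

rdd_lock = threading.Lock()
results = []

def run_job(job_id, data):
    # Only one thread performs an operation on the shared object at a time.
    with rdd_lock:
        results.append((job_id, sum(data)))

threads = [threading.Thread(target=run_job, args=(i, range(10))) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))
```

The obvious cost is that the lock also serializes the work itself, so this only helps when the per-thread driver-side bookkeeping, not the cluster computation, is the unsafe part.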

Best Regards,
Shixiong Zhu

2015-02-27 23:14 GMT+08:00 rok :

> I'm seeing this java.util.NoSuchElementException: key not found: exception
> pop up sometimes when I run operations on an RDD from multiple threads in a
> python application. It ends up shutting down the SparkContext so I'm
> assuming this is a bug -- from what I understand, I should be able to run
> operations on the same RDD from multiple threads or is this not
> recommended?
>
> I can't reproduce it all the time and I've tried eliminating caching
> wherever possible to see if that would have an effect, but it doesn't seem
> to. Each thread first splits the base RDD and then runs the
> LogisticRegressionWithSGD on the subset.
>
> Is there a workaround to this exception?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-key-not-found-tp21848.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


how to improve performance of spark job with large input to executor?

2015-02-27 Thread ey-chih chow
Hi,

I ran a spark job.  Each executor is allocated a chunk of input data.  For
the executor with a small chunk of input data, the performance is reasonably
good.  But for the executor with a large chunk of input data, the
performance is not good.  How can I tune Spark configuration parameters to
get better performance for large input data?  Thanks.


Ey-Chih Chow 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-improve-performance-of-spark-job-with-large-input-to-executor-tp21856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: JLine hangs under Windows8

2015-02-27 Thread Cheng, Hao
It works after adding -Djline.terminal=jline.UnsupportedTerminal.
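
For reference, one way to make the flag reach the CLI's driver JVM on Windows — assuming the CLI is launched through a script that honors SPARK_SUBMIT_OPTS (as bin/spark-class does; the Spark SQL CLI shipped only a bash launcher in 1.x, so the exact entry point may differ):

```bat
REM Sketch: set the JLine workaround before starting the Spark SQL CLI
set SPARK_SUBMIT_OPTS=-Djline.terminal=jline.UnsupportedTerminal
bin\spark-sql
```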

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Saturday, February 28, 2015 10:24 AM
To: user@spark.apache.org
Subject: JLine hangs under Windows8

Hi, All
I was trying to run the Spark SQL CLI on Windows 8 for debugging purposes;
however, it seems JLine hangs waiting for input after the ENTER key. I didn't
see that under Linux. Has anybody met the same issue?

The call stack as below:
"main" prio=6 tid=0x02548800 nid=0x17cc runnable [0x0253e000]
   java.lang.Thread.State: RUNNABLE
at jline.WindowsTerminal.readByte(Native Method)
at jline.WindowsTerminal.readCharacter(WindowsTerminal.java:233)
at jline.WindowsTerminal.readVirtualKey(WindowsTerminal.java:319)
at jline.ConsoleReader.readVirtualKey(ConsoleReader.java:1453)
at jline.ConsoleReader.readBinding(ConsoleReader.java:654)
at jline.ConsoleReader.readLine(ConsoleReader.java:494)
at jline.ConsoleReader.readLine(ConsoleReader.java:448)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:202)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)


Thanks,
Cheng Hao




Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Sean Owen
This seems like a job for userClassPathFirst. Or could be. It's
definitely an issue of visibility between where the serializer is and
where the user class is.

At the top, Pat, you said that you didn't try this; why not?
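
For example (property names as of Spark 1.2/1.3; the jar path is illustrative, and the names are worth double-checking against your release):

```properties
# spark-defaults.conf
spark.executor.extraClassPath   /opt/libs/guava-14.0.1.jar
spark.driver.extraClassPath     /opt/libs/guava-14.0.1.jar
# prefer the application's classes over Spark's own copies
# (spark.files.userClassPathFirst in 1.2; spark.executor.userClassPathFirst from 1.3)
spark.files.userClassPathFirst  true
```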

On Fri, Feb 27, 2015 at 10:11 PM, Pat Ferrel  wrote:
> I’ll try to find a Jira for it. I hope a fix is in 1.3
>
>
> On Feb 27, 2015, at 1:59 PM, Pat Ferrel  wrote:
>
> Thanks! that worked.
>
> On Feb 27, 2015, at 1:50 PM, Pat Ferrel  wrote:
>
> I don’t use spark-submit I have a standalone app.
>
> So I guess you want me to add that key/value to the conf in my code and make 
> sure it exists on workers.
>
>
> On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin  wrote:
>
> On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel  wrote:
>> I changed in the spark master conf, which is also the only worker. I added a 
>> path to the jar that has guava in it. Still can’t find the class.
>
> Sorry, I'm still confused about what config you're changing. I'm
> suggesting using:
>
> spark-submit --conf spark.executor.extraClassPath=/guava.jar blah
>
>
> --
> Marcelo
>



Re: Kafka DStream Parallelism

2015-02-27 Thread Corey Nolet
This was what I was thinking but wanted to verify. Thanks Sean!

On Fri, Feb 27, 2015 at 9:56 PM, Sean Owen  wrote:

> The coarsest level at which you can parallelize is topic. Topics are
> all but unrelated to each other so can be consumed independently. But
> you can parallelize within the context of a topic too.
>
> A Kafka group ID defines a consumer group. One consumer in a group
> receive each message to the topic that group is listening to. Topics
> can have partitions too. You can thus make N consumers in a group
> listening to N partitions and each will effectively be listening to a
> partition.
>
> Yes, my understanding is that multiple receivers in one group are the
> way to consume a topic's partitions in parallel.
>
> On Sat, Feb 28, 2015 at 12:56 AM, Corey Nolet  wrote:
> > Looking @ [1], it seems to recommend pull from multiple Kafka topics in
> > order to parallelize data received from Kafka over multiple nodes. I
> notice
> > in [2], however, that one of the createConsumer() functions takes a
> groupId.
> > So am I understanding correctly that creating multiple DStreams with the
> > same groupId allow data to be partitioned across many nodes on a single
> > topic?
> >
> > [1]
> >
> http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
> > [2]
> >
> https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
>


Re: Kafka DStream Parallelism

2015-02-27 Thread Sean Owen
The coarsest level at which you can parallelize is topic. Topics are
all but unrelated to each other so can be consumed independently. But
you can parallelize within the context of a topic too.

A Kafka group ID defines a consumer group. One consumer in a group
receives each message sent to the topic that group is listening to. Topics
can have partitions too. You can thus make N consumers in a group
listening to N partitions and each will effectively be listening to a
partition.

Yes, my understanding is that multiple receivers in one group are the
way to consume a topic's partitions in parallel.

On Sat, Feb 28, 2015 at 12:56 AM, Corey Nolet  wrote:
> Looking @ [1], it seems to recommend pull from multiple Kafka topics in
> order to parallelize data received from Kafka over multiple nodes. I notice
> in [2], however, that one of the createConsumer() functions takes a groupId.
> So am I understanding correctly that creating multiple DStreams with the
> same groupId allow data to be partitioned across many nodes on a single
> topic?
>
> [1]
> http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
> [2]
> https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$




JLine hangs under Windows8

2015-02-27 Thread Cheng, Hao
Hi, All
I was trying to run the Spark SQL CLI on Windows 8 for debugging purposes;
however, it seems JLine hangs waiting for input after the ENTER key. I didn't
see that under Linux. Has anybody met the same issue?

The call stack as below:
"main" prio=6 tid=0x02548800 nid=0x17cc runnable [0x0253e000]
   java.lang.Thread.State: RUNNABLE
at jline.WindowsTerminal.readByte(Native Method)
at jline.WindowsTerminal.readCharacter(WindowsTerminal.java:233)
at jline.WindowsTerminal.readVirtualKey(WindowsTerminal.java:319)
at jline.ConsoleReader.readVirtualKey(ConsoleReader.java:1453)
at jline.ConsoleReader.readBinding(ConsoleReader.java:654)
at jline.ConsoleReader.readLine(ConsoleReader.java:494)
at jline.ConsoleReader.readLine(ConsoleReader.java:448)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:202)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)


Thanks,
Cheng Hao




Re: Error when running spark-shell

2015-02-27 Thread Sean Owen
Well, that would just show the JVM bug. This isn't a Spark issue. The JVM
crashes, and not because of some native code used by Spark.
On Feb 28, 2015 2:04 AM, "amoners"  wrote:

> Please put your logs, you can get logs follow below:
>
> # An error report file with more information is saved as:
> # /Users/anupamajoshi/spark-1.2.0-bin-hadoop2.4/bin/hs_err_pid4709.log
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-spark-shell-tp21852p21855.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Error when running spark-shell

2015-02-27 Thread amoners
Please post your logs; you can get them as shown below:

# An error report file with more information is saved as:
# /Users/anupamajoshi/spark-1.2.0-bin-hadoop2.4/bin/hs_err_pid4709.log 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-spark-shell-tp21852p21855.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Kafka DStream Parallelism

2015-02-27 Thread Corey Nolet
Looking at [1], it seems to recommend pulling from multiple Kafka topics in
order to parallelize data received from Kafka over multiple nodes. I notice
in [2], however, that one of the createConsumer() functions takes a
groupId. So am I understanding correctly that creating multiple DStreams
with the same groupId allows data to be partitioned across many nodes on a
single topic?

[1]
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
[2]
https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$


Re: group by order by fails

2015-02-27 Thread iceback
String query = "select  s.name, count(s.name) as tally from sample s group by
s.name order by tally";



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/group-by-order-by-fails-tp21815p21854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Speed Benchmark

2015-02-27 Thread Guillaume Guy
It is a simple text file.

I'm not using SQL, just doing an rdd.count() on it. Does the bug affect it?

On Friday, February 27, 2015, Davies Liu  wrote:

> What is this dataset? text file or parquet file?
>
> There is an issue with serialization in Spark SQL, which will make it
> very slow, see https://issues.apache.org/jira/browse/SPARK-6055, will
> be fixed very soon.
>
> Davies
>
> On Fri, Feb 27, 2015 at 1:59 PM, Guillaume Guy
> > wrote:
> > Hi Sean:
> >
> > Thanks for your feedback. Scala is much faster. The count is performed
> > in ~1 minute (vs. 17 min). I would expect Scala to be 2-5X faster, but
> > this gap seems to be more than that. Is that also your conclusion?
> >
> > Thanks.
> >
> >
> > Best,
> >
> > Guillaume Guy
> >  +1 919 - 972 - 8750
> >
> > On Fri, Feb 27, 2015 at 9:12 AM, Sean Owen  > wrote:
> >>
> >> That's very slow, and there are a lot of possible explanations. The
> >> first one that comes to mind is: I assume your YARN and HDFS are on
> >> the same machines, but are you running executors on all HDFS nodes
> >> when you run this? if not, a lot of these reads could be remote.
> >>
> >> You have 6 executor slots, but your data exists in 96 blocks on HDFS.
> >> You could read with up to 96-way parallelism. You say you're CPU-bound
> >> though, but normally I'd wonder if this was simply a case of
> >> under-using parallelism.
> >>
> >> I also wonder if the bottleneck is something to do with pyspark in
> >> this case; might be good to just try it in the spark-shell to check.
> >>
> >> On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy
> >> > wrote:
> >> > Dear Spark users:
> >> >
> >> > I want to see if anyone has an idea of the performance for a small
> >> > cluster.
> >> >
> >> > Reading from HDFS, what should be the performance of a count()
> >> > operation on a 10 GB RDD with 100M rows using pyspark? I looked at the
> >> > CPU usage; all 6 cores are at 100%.
> >> >
> >> > Details:
> >> >
> >> > master yarn-client
> >> > num-executors 3
> >> > executor-cores 2
> >> > driver-memory 5g
> >> > executor-memory 2g
> >> > Distribution: Cloudera
> >> >
> >> > I also attached the screenshot.
> >> >
> >> > Right now, I'm at 17 minutes, which seems quite slow. Any idea what
> >> > decent performance looks like with a similar configuration?
> >> >
> >> > If it's way off, I would appreciate any pointers as to ways to improve
> >> > performance.
> >> >
> >> > Thanks.
> >> >
> >> > Best,
> >> >
> >> > Guillaume
> >> >
> >> >
> >
> >
>


-- 

Best,

Guillaume Guy

* +1 919 - 972 - 8750*


Re: What joda-time dependency does spark submit use/need?

2015-02-27 Thread Todd Nist
You can specify these jars (joda-time-2.7.jar, joda-convert-1.7.jar) either
as part of your build and assembly or via the --jars option to spark-submit.

HTH.
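
For example (the paths, class name, and application jar are all illustrative):

```shell
spark-submit \
  --class com.example.MyDynamoApp \
  --jars /path/to/joda-time-2.7.jar,/path/to/joda-convert-1.7.jar \
  my-app.jar
```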

On Fri, Feb 27, 2015 at 2:48 PM, Su She  wrote:

> Hello Everyone,
>
> I'm having some issues launching (non-Spark) applications via the
> spark-submit command. The common error I am getting is pasted below. I am
> able to submit a Spark streaming/Kafka application, but can't start a
> DynamoDB Java app. The common error is related to joda-time.
>
> 1) I realized spark-submit was pointing to joda-time-1.6 in the
> hadoop/lib, so I deleted this and my error changed from NoSuchMethodFound to
> NoClassDefFoundError.
>
> Instead of pointing to the other version of joda-time in the hadoop/lib,
> it now pointed to the jars I set a path to in my spark-submit command (I
> tried joda-time versions 2.2, 2.3, 2.6, 2.7), but still got the errors
>
> 2) My rudimentary theory is that spark-submit uses < joda-time-2.0, but
> the applications I'm running need >2.0.
>
> Thank you for the help!
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeFormat
> at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:374)
> at
> com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
> at
> com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
> at
> com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
> at
> com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
> at
> com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
> at
> com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
> at
> com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.setEndpoint(AmazonDynamoDBClient.java:2946)
> at
> com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.init(AmazonDynamoDBClient.java:351)
> at
> com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.(AmazonDynamoDBClient.java:273)
> at
> com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.(AmazonDynamoDBClient.java:250)
> at AmazonDynamoDBSample.init(AmazonDynamoDBSample.java:81)
> at AmazonDynamoDBSample.main(AmazonDynamoDBSample.java:87)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.joda.time.format.DateTimeFormat
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
>


Re: Running spark function on parquet without sql

2015-02-27 Thread Deborah Siegel
Hi Michael,

Would you help me understand the apparent difference here?

The Spark 1.2.1 programming guide indicates:

"Note that if you call schemaRDD.cache() rather than
sqlContext.cacheTable(...), tables will *not* be cached using the in-memory
columnar format, and therefore sqlContext.cacheTable(...) is strongly
recommended for this use case."

Yet the API doc shows that:

def cache(): SchemaRDD.this.type
Overridden cache function will always use the in-memory columnar caching.


links
https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
https://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.sql.SchemaRDD

Thanks
Sincerely
Deb

On Fri, Feb 27, 2015 at 2:13 PM, Michael Armbrust 
wrote:

> From Zhan Zhang's reply, yes I still get the parquet's advantage.
>>
>
> You will need to at least use SQL or the DataFrame API (coming in Spark
> 1.3) to specify the columns that you want in order to get the parquet
> benefits.   The rest of your operations can be standard Spark.
>
> My next question is, if I operate on SchemaRdd will I get the advantage of
>> Spark SQL's in memory columnar store when cached the table using
>> cacheTable()?
>>
>
> Yes, SchemaRDDs always use the in-memory columnar cache for cacheTable and
> .cache() since Spark 1.2+
>


Re: Speed Benchmark

2015-02-27 Thread Sean Owen
Is machine 1 the only one running an HDFS data node? You describe it as one
running Hadoop services.
On Feb 27, 2015 9:44 PM, "Guillaume Guy"  wrote:

> Hi Jason:
>
> Thanks for your feedback.
>
> Beside the information above I mentioned, there are 3 machines in the
> cluster.
>
> *1st one*: Driver + a bunch of Hadoop services. 32GB of RAM, 8 cores
> (2 used)
> *2nd + 3rd:* 16GB of RAM, 4 cores (2 used each)
>
>  I hope this helps clarify.
>
> Thx.
>
> GG
>
>
>
> Best,
>
> Guillaume Guy
>
> * +1 919 - 972 - 8750*
>
> On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell  wrote:
>
>> How many machines are on the cluster?
>> And what is the configuration of those machines (Cores/RAM)?
>>
>> "Small cluster" is a very subjective statement.
>>
>>
>>
>> Guillaume Guy wrote:
>>
>>> Dear Spark users:
>>>
>>> I want to see if anyone has an idea of the performance for a small
>>> cluster.
>>>
>>>
>


Perf impact of BlockManager byte[] copies

2015-02-27 Thread Paul Wais
Dear List,

I'm investigating some problems related to native code integration
with Spark, and while picking through BlockManager I noticed that data
(de)serialization currently issues lots of array copies.
Specifically:

- Deserialization: BlockManager marshals all deserialized bytes
through a spark.util. ByteBufferInputStream, which necessitates
copying data into an intermediate temporary byte[] .  The temporary
byte[] might be reused between deserialization of T instances, but
nevertheless the bytes must be copied (and likely in a Java loop).

- Serialization: BlockManager buffers all serialized bytes into a
java.io.ByteArrayOutputStream, which maintains an internal byte[]
buffer and grows/re-copies the buffer like a vector as the buffer
fills.  BlockManager then retrieves the internal byte[] buffer, wraps
it in a ByteBuffer, and sends it off to be stored (e.g. in
MemoryStore, DiskStore, Tachyon, etc).
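
The extra copy on the serialization path can be seen with a small probe. Here is a Python analogy (io.BytesIO standing in for java.io.ByteArrayOutputStream; this is an illustration of the copy pattern, not Spark code):

```python
import io

# io.BytesIO behaves like java.io.ByteArrayOutputStream for this purpose:
# the stream keeps an internal buffer, and getvalue() hands back a full
# copy of the written bytes -- the same extra copy described above for
# BlockManager's use of ByteArrayOutputStream.toByteArray().
out = io.BytesIO()
for _ in range(10):
    out.write(b"\x00" * 100)

snapshot = out.getvalue()          # full copy of the 1000 buffered bytes
view = memoryview(snapshot)        # a ByteBuffer-style zero-copy view of that copy
print(len(snapshot), view.nbytes)  # 1000 1000
```

Wrapping the result in a view (like wrapping a byte[] in a ByteBuffer) adds no further copy, but the initial copy out of the internal buffer is unavoidable with the public API.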

When an individual T is somewhat large (e.g. a feature vector, an
image, etc), or blocks are megabytes in size, these copies become
expensive (especially for large written blocks), right?  Does anybody
have any measurements of /how/ expensive they are?  If not, is there
serialization benchmark code (e.g. for KryoSerializer ) that might be
helpful here?


As part of my investigation, I've found that one might be able to
sidestep these issues by extending Spark's SerializerInstance API to
offer I/O on ByteBuffers (in addition to {Input,Output}Streams).  An
extension including a ByteBuffer API would furthermore have many
benefits for native code.  A major downside of this API addition is
that it wouldn't interoperate (nontrivially) with compression, so
shuffles wouldn't benefit.  Nevertheless, BlockManager could probably
deduce when use of this ByteBuffer API is possible and leverage it.

Cheers,
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed to parse Hive query

2015-02-27 Thread Anusha Shamanur
I do.
What tags should I change in this?
I changed the value of hive.exec.scratchdir to /tmp/hive.
What else?
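
For reference, a hypothetical hive-site.xml fragment touching the scratch-dir properties involved in the NPE ("Conf non-local session path expected to be non-null"); both property names are standard Hive settings, but the values here are only illustrative:

```xml
<property>
  <name>hive.exec.scratchdir</name>
  <value>/tmp/hive</value>
</property>
<property>
  <name>hive.exec.local.scratchdir</name>
  <value>/tmp/hive-local</value>
</property>
```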

On Fri, Feb 27, 2015 at 2:14 PM, Michael Armbrust 
wrote:

> Do you have a hive-site.xml file or a core-site.xml file?  Perhaps
> something is misconfigured there?
>
> On Fri, Feb 27, 2015 at 7:17 AM, Anusha Shamanur 
> wrote:
>
>> Hi,
>>
>> I am trying to do this in spark-shell:
>>
>> val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
>> val listTables = hiveCtx.hql("show tables")
>>
>> The second line fails to execute with this message:
>>
>> warning: there were 1 deprecation warning(s); re-run with -deprecation
>> for details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to
>> parse: show tables at
>> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
>> at
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
>> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>>
>> ... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused
>> by: java.lang.NullPointerException: Conf non-local session path expected to
>> be non-null at
>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>> at
>> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586)
>> at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129) at
>> org.apache.hadoop.hive.ql.Context.<init>(Context.java:116) at
>> org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at
>> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more
>>
>>
>> Any help would be appreciated.
>>
>>
>>
>> --
>> Sent from Gmail mobile
>>
>
>


-- 
Regards,
Anusha


Re: Get importerror when i run pyspark with ipython=1

2015-02-27 Thread sourabhguha

 

Here's the PYTHONPATH. It points to the correct location. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Get-importerror-when-i-run-pyspark-with-ipython-1-tp21843p21853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Speed Benchmark

2015-02-27 Thread Davies Liu
What is this dataset? text file or parquet file?

There is an issue with serialization in Spark SQL, which will make it
very slow, see https://issues.apache.org/jira/browse/SPARK-6055, will
be fixed very soon.

Davies

On Fri, Feb 27, 2015 at 1:59 PM, Guillaume Guy
 wrote:
> Hi Sean:
>
> Thanks for your feedback. Scala is much faster: the count is performed in
> ~1 minute (vs. 17 min). I would expect Scala to be 2-5x faster, but this gap
> seems to be more than that. Is that also your conclusion?
>
> Thanks.
>
>
> Best,
>
> Guillaume Guy
>  +1 919 - 972 - 8750
>
> On Fri, Feb 27, 2015 at 9:12 AM, Sean Owen  wrote:
>>
>> That's very slow, and there are a lot of possible explanations. The
>> first one that comes to mind is: I assume your YARN and HDFS are on
>> the same machines, but are you running executors on all HDFS nodes
>> when you run this? if not, a lot of these reads could be remote.
>>
>> You have 6 executor slots, but your data exists in 96 blocks on HDFS.
>> You could read with up to 96-way parallelism. You say you're CPU-bound
>> though, but normally I'd wonder if this was simply a case of
>> under-using parallelism.
>>
>> I also wonder if the bottleneck is something to do with pyspark in
>> this case; might be good to just try it in the spark-shell to check.
>>
>> On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy
>>  wrote:
>> > Dear Spark users:
>> >
>> > I want to see if anyone has an idea of the performance for a small
>> > cluster.
>> >
>> > Reading from HDFS, what should be the performance of a count() operation
>> > on a 10GB RDD with 100M rows using pyspark? I looked at the CPU usage;
>> > all 6 cores are at 100%.
>> >
>> > Details:
>> >
>> > master yarn-client
>> > num-executors 3
>> > executor-cores 2
>> > driver-memory 5g
>> > executor-memory 2g
>> > Distribution: Cloudera
>> >
>> > I also attached the screenshot.
>> >
>> > Right now, I'm at 17 minutes, which seems quite slow. Any idea what
>> > decent performance with a similar configuration would look like?
>> >
>> > If it's way off, I would appreciate any pointers as to ways to improve
>> > performance.
>> >
>> > Thanks.
>> >
>> > Best,
>> >
>> > Guillaume
>> >
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Error when running spark-shell

2015-02-27 Thread AJ614
Has anyone seen this - 

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000110fcd9cd, pid=4709, tid=11011
#
# JRE version: Java(TM) SE Runtime Environment (8.0_25-b17) (build
1.8.0_25-b17)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.25-b02 mixed mode bsd-amd64
compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x1cd9cd]  oopDesc::size()+0x2d
#
# Failed to write core dump. Core dumps have been disabled. To enable core
dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/anupamajoshi/spark-1.2.0-bin-hadoop2.4/bin/hs_err_pid4709.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.sun.com/bugreport/crash.jsp
#
./spark-shell: line 48:  4709 Abort trap: 6



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-spark-shell-tp21852.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed to parse Hive query

2015-02-27 Thread Michael Armbrust
Do you have a hive-site.xml file or a core-site.xml file?  Perhaps
something is misconfigured there?

On Fri, Feb 27, 2015 at 7:17 AM, Anusha Shamanur 
wrote:

> Hi,
>
> I am trying to do this in spark-shell:
>
> val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
> val listTables = hiveCtx.hql("show tables")
>
> The second line fails to execute with this message:
>
> warning: there were 1 deprecation warning(s); re-run with -deprecation for
> details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse:
> show tables at
> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>
> ... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused
> by: java.lang.NullPointerException: Conf non-local session path expected to
> be non-null at
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> at
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586)
> at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129) at
> org.apache.hadoop.hive.ql.Context.<init>(Context.java:116) at
> org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at
> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more
>
>
> Any help would be appreciated.
>
>
>
> --
> Sent from Gmail mobile
>


Re: Running spark function on parquet without sql

2015-02-27 Thread Michael Armbrust
>
> From Zhan Zhang's reply, yes I still get the parquet's advantage.
>

You will need to at least use SQL or the DataFrame API (coming in Spark
1.3) to specify the columns that you want in order to get the parquet
benefits.   The rest of your operations can be standard Spark.

My next question is, if I operate on SchemaRdd will I get the advantage of
> Spark SQL's in memory columnar store when cached the table using
> cacheTable()?
>

Yes, SchemaRDDs always use the in-memory columnar cache for cacheTable and
.cache() since Spark 1.2+
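
The benefit of the columnar cache (a scan touches only the referenced columns) can be illustrated without Spark; this is a sketch of the idea only, not Spark's implementation:

```python
# Row layout vs column layout: a query that needs one column touches only
# that column's array in the columnar form. Spark's in-memory columnar
# store adds compression and batching on top of this idea.
rows = [(1, "a", 1.5), (2, "b", 2.5), (3, "c", 3.0)]  # (id, name, score) rows

# Columnar form: one list per column.
ids, names, scores = map(list, zip(*rows))

row_sum = sum(r[2] for r in rows)  # row layout: touches every tuple
col_sum = sum(scores)              # columnar: touches only the score column
print(row_sum == col_sum)          # True
```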


Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I’ll try to find a Jira for it. I hope a fix is in 1.3


On Feb 27, 2015, at 1:59 PM, Pat Ferrel  wrote:

Thanks! that worked.

On Feb 27, 2015, at 1:50 PM, Pat Ferrel  wrote:

I don’t use spark-submit I have a standalone app.

So I guess you want me to add that key/value to the conf in my code and make 
sure it exists on workers.


On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin  wrote:

On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel  wrote:
> I changed in the spark master conf, which is also the only worker. I added a 
> path to the jar that has guava in it. Still can’t find the class.

Sorry, I'm still confused about what config you're changing. I'm
suggesting using:

spark-submit --conf spark.executor.extraClassPath=/guava.jar blah


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Spark SQL Converting RDD to SchemaRDD without hardcoding a case class in scala

2015-02-27 Thread Michael Armbrust
http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

On Fri, Feb 27, 2015 at 1:39 PM, kpeng1  wrote:

> Hi All,
>
> I am currently trying to build out a spark job that would basically convert
> a csv file into parquet.  From what I have seen it looks like spark sql is
> the way to go and how I would go about this would be to load in the csv
> file
> into an RDD and convert it into a schemaRDD by injecting in the schema via
> a
> case class.
>
> What I want to avoid is hard coding in the case class itself.  I want to
> reuse this job and pass in a file that contains the schema i.e. an avro
> avsc
> file or something similar.  I was wondering if there was a way to do this,
> since I couldn't figure out how to create a case class dynamically... if
> there are ways around creating a case class I am definitely open to trying
> it out as well.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Converting-RDD-to-SchemaRDD-without-hardcoding-a-case-class-in-scala-tp21851.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
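
The linked guide's approach can be sketched without Spark: derive (name, type) pairs at runtime from a schema string, which in PySpark you would then map onto StructField/StructType instead of a hardcoded case class. The parser below is a hypothetical illustration, not Spark API:

```python
# Hypothetical schema-string parser: "name:type" pairs, comma-separated.
# The resulting (name, type) tuples are what you would feed into
# StructField/StructType when applying a schema programmatically.
def parse_schema(header):
    fields = []
    for col in header.split(","):
        name, dtype = col.strip().split(":")
        fields.append((name, dtype))
    return fields

schema = parse_schema("id:int, name:string, score:double")
print([name for name, _ in schema])  # ['id', 'name', 'score']
```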


Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
Thanks! that worked.

On Feb 27, 2015, at 1:50 PM, Pat Ferrel  wrote:

I don’t use spark-submit I have a standalone app.

So I guess you want me to add that key/value to the conf in my code and make 
sure it exists on workers.


On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin  wrote:

On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel  wrote:
> I changed in the spark master conf, which is also the only worker. I added a 
> path to the jar that has guava in it. Still can’t find the class.

Sorry, I'm still confused about what config you're changing. I'm
suggesting using:

spark-submit --conf spark.executor.extraClassPath=/guava.jar blah


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Speed Benchmark

2015-02-27 Thread Guillaume Guy
Hi Sean:

Thanks for your feedback. Scala is much faster: the count is performed in
~1 minute (vs. 17 min). I would expect Scala to be 2-5x faster, but this gap
seems to be more than that. Is that also your conclusion?

Thanks.


Best,

Guillaume Guy

* +1 919 - 972 - 8750*

On Fri, Feb 27, 2015 at 9:12 AM, Sean Owen  wrote:

> That's very slow, and there are a lot of possible explanations. The
> first one that comes to mind is: I assume your YARN and HDFS are on
> the same machines, but are you running executors on all HDFS nodes
> when you run this? if not, a lot of these reads could be remote.
>
> You have 6 executor slots, but your data exists in 96 blocks on HDFS.
> You could read with up to 96-way parallelism. You say you're CPU-bound
> though, but normally I'd wonder if this was simply a case of
> under-using parallelism.
>
> I also wonder if the bottleneck is something to do with pyspark in
> this case; might be good to just try it in the spark-shell to check.
>
> On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy
>  wrote:
> > Dear Spark users:
> >
> > I want to see if anyone has an idea of the performance for a small
> cluster.
> >
> > Reading from HDFS, what should be the performance of a count() operation
> > on a 10GB RDD with 100M rows using pyspark? I looked at the CPU usage;
> > all 6 cores are at 100%.
> >
> > Details:
> >
> > master yarn-client
> > num-executors 3
> > executor-cores 2
> > driver-memory 5g
> > executor-memory 2g
> > Distribution: Cloudera
> >
> > I also attached the screenshot.
> >
> > Right now, I'm at 17 minutes, which seems quite slow. Any idea what
> > decent performance with a similar configuration would look like?
> >
> > If it's way off, I would appreciate any pointers as to ways to improve
> > performance.
> >
> > Thanks.
> >
> > Best,
> >
> > Guillaume
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
>
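
Sean's parallelism point above (96 HDFS blocks vs. 6 executor slots) works out to a concrete number of sequential task waves; a back-of-envelope sketch:

```python
import math

# With 3 executors x 2 cores = 6 concurrent tasks, 96 input blocks are
# processed in 16 sequential "waves" of tasks.
blocks = 96
slots = 3 * 2  # num-executors * executor-cores from the settings above
waves = math.ceil(blocks / slots)
print(slots, waves)  # 6 16
```

With only 6 slots the file's 96-way parallelism cannot be exploited; adding more partitions does not help, but adding executors (up to the block count) shortens the wave count.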


Running hive query from spark

2015-02-27 Thread Anusha Shamanur
Hi,

I am trying to do this in spark-shell:

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val listTables = hiveCtx.hql("show tables")

The second line fails to execute with this message:

warning: there were 1 deprecation warning(s); re-run with -deprecation for
details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse:
show tables at
org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at
scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused
by: java.lang.NullPointerException: Conf non-local session path expected to
be non-null at
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at
org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129) at
org.apache.hadoop.hive.ql.Context.<init>(Context.java:116) at
org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at
org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more


Any help would be appreciated. I am stuck with this from past two days.


Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I don’t use spark-submit I have a standalone app.

So I guess you want me to add that key/value to the conf in my code and make 
sure it exists on workers.


On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin  wrote:

On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel  wrote:
> I changed in the spark master conf, which is also the only worker. I added a 
> path to the jar that has guava in it. Still can’t find the class.

Sorry, I'm still confused about what config you're changing. I'm
suggesting using:

spark-submit --conf spark.executor.extraClassPath=/guava.jar blah


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Marcelo Vanzin
On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel  wrote:
> I changed in the spark master conf, which is also the only worker. I added a 
> path to the jar that has guava in it. Still can’t find the class.

Sorry, I'm still confused about what config you're changing. I'm
suggesting using:

spark-submit --conf spark.executor.extraClassPath=/guava.jar blah


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I changed in the spark master conf, which is also the only worker. I added a 
path to the jar that has guava in it. Still can’t find the class.

Trying Erland’s idea next.

On Feb 27, 2015, at 1:35 PM, Marcelo Vanzin  wrote:

On Fri, Feb 27, 2015 at 1:30 PM, Pat Ferrel  wrote:
> @Marcelo do you mean by modifying spark.executor.extraClassPath on all
> workers, that didn’t seem to work?

That's an app configuration, not a worker configuration, so if you're
trying to set it on the worker configuration it will definitely not
work.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Speed Benchmark

2015-02-27 Thread Guillaume Guy
Hi Jason:

Thanks for your feedback.

Beside the information above I mentioned, there are 3 machines in the
cluster.

*1st one*: Driver + has a bunch of Hadoop services. 32GB of RAM, 8 cores (2
used)
*2nd + 3rd:* 16GB of RAM, 4 cores (2 used each)

 I hope this helps clarify.

Thx.

GG



Best,

Guillaume Guy

* +1 919 - 972 - 8750*

On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell  wrote:

> How many machines are on the cluster?
> And what is the configuration of those machines (Cores/RAM)?
>
> "Small cluster" is a very subjective statement.
>
>
>
> Guillaume Guy wrote:
>
>> Dear Spark users:
>>
>> I want to see if anyone has an idea of the performance for a small
>> cluster.
>>
>>


Spark SQL Converting RDD to SchemaRDD without hardcoding a case class in scala

2015-02-27 Thread kpeng1
Hi All,

I am currently trying to build out a spark job that would basically convert
a csv file into parquet.  From what I have seen it looks like spark sql is
the way to go and how I would go about this would be to load in the csv file
into an RDD and convert it into a schemaRDD by injecting in the schema via a
case class.

What I want to avoid is hard coding in the case class itself.  I want to
reuse this job and pass in a file that contains the schema i.e. an avro avsc
file or something similar.  I was wondering if there was a way to do this,
since I couldn't figure out how to create a case class dynamically... if
there are ways around creating a case class I am definitely open to trying
it out as well.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Converting-RDD-to-SchemaRDD-without-hardcoding-a-case-class-in-scala-tp21851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Marcelo Vanzin
On Fri, Feb 27, 2015 at 1:30 PM, Pat Ferrel  wrote:
> @Marcelo do you mean by modifying spark.executor.extraClassPath on all
> workers, that didn’t seem to work?

That's an app configuration, not a worker configuration, so if you're
trying to set it on the worker configuration it will definitely not
work.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
@Erlend hah, we were trying to merge your PR and ran into this—small world. You 
actually compile the JavaSerializer source in your project?

@Marcelo do you mean by modifying spark.executor.extraClassPath on all workers, 
that didn’t seem to work?

On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg  wrote:

Hi.

I have had a similar issue. I had to pull the JavaSerializer source into my own 
project, just so I got the classloading of this class under control.

This must be a class loader issue with spark.

-E

On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel  wrote:
I understand that I need to supply Guava to Spark. The HashBiMap is created in 
the client and broadcast to the workers. So it is needed in both. To achieve 
this there is a deps.jar with Guava (and Scopt but that is only for the 
client). Scopt is found so I know the jar is fine for the client.

I pass in the deps.jar to the context creation code. I’ve checked the content 
of the jar and have verified that it is used at context creation time.

I register the serializer as follows:

class MyKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) = {

    val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
    // kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
    log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName +
      "\n\n\n") // just to be sure this does indeed get logged
    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]],
      new JavaSerializer())
  }
}

The job proceeds up until the broadcast value, a HashBiMap, is deserialized, 
which is where I get the following error.

Have I missed a step for deserialization of broadcast values? Odd that 
serialization happened but deserialization failed. I’m running on a standalone 
localhost-only cluster.


15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 
(TID 9, 192.168.0.2): java.io.IOException: 
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
... 19 more

==== root error ====
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
...








On Feb 25, 2015, at 5:24 PM, Marcelo Vanzin  wrote:

Guava is not in Spark. (Well, long version: it's in Spark but it's
relocated to a different package except for some special classes
leaked through the public API.)

If your app needs Guava, it needs to package Guava with it (e.g. by
using maven-shade-plugin, or using "--jars" if only executors use
Guava).
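
Marcelo's packaging advice can be sketched as a maven-shade-plugin fragment; the plugin version and the relocated package prefix below are illustrative, not a confirmed setup:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <!-- Bundle Guava into the app jar; relocating it also avoids
             clashing with the Guava classes Spark itself carries. -->
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>myapp.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```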

Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Erlend Hamnaberg
Hi.

I have had a similar issue. I had to pull the JavaSerializer source into my
own project, just so I got the classloading of this class under control.

This must be a class loader issue with spark.

-E

On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel  wrote:

> I understand that I need to supply Guava to Spark. The HashBiMap is
> created in the client and broadcast to the workers. So it is needed in
> both. To achieve this there is a deps.jar with Guava (and Scopt but that is
> only for the client). Scopt is found so I know the jar is fine for the
> client.
>
> I pass in the deps.jar to the context creation code. I’ve checked the
> content of the jar and have verified that it is used at context creation
> time.
>
> I register the serializer as follows:
>
> class MyKryoRegistrator extends KryoRegistrator {
>
>   override def registerClasses(kryo: Kryo) = {
>
> val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
> //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
> log.info("\n\n\nRegister Serializer for " +
> h.getClass.getCanonicalName + "\n\n\n") // just to be sure this does indeed
> get logged
> kryo.register(classOf[com.google.common.collect.HashBiMap[String,
> Int]], new JavaSerializer())
>   }
> }
>
> The job proceeds up until the broadcast value, a HashBiMap, is
> deserialized, which is where I get the following error.
>
> Have I missed a step for deserialization of broadcast values? Odd that
> serialization happened but deserialization failed. I’m running on a
> standalone localhost-only cluster.
>
>
> 15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
> 4.0 (TID 9, 192.168.0.2): java.io.IOException:
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at
> my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
> at
> my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java
> deserialization.
> at
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> at
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
> at
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
> ... 19 more
>
> ==== root error ====
> Caused by: java.lang.ClassNotFoundException:
> com.google.common.collect.HashBiMap
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> ...
>
>
>
>
>
>
>
>
> On Feb 25, 2015, at 5:24 PM, Marcelo Vanzin  wrote:
>
> Guava is not in Spark. (Well, long version: it's in Spark but it's
> relocated to a different package except for some special classes
> leaked through the public API.)
>
> If your app needs Guava, it needs to package Guava with it (e.g. by
> using maven-shade-plugin, or using "--jars" if only executors use
> Guava).
>
> On Wed, Feb 25, 2015 at 5:17 PM, Pat Ferrel  wrote:
> > The root Spark pom has guava set at a certai

Re: Problem getting program to run on 15TB input

2015-02-27 Thread Burak Yavuz
Hi,

Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
many small objects, which leads to very long GC times and causes the
"executor lost", "heartbeat not received", and "GC overhead limit exceeded"
messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
also try `OFF_HEAP` (and use Tachyon).

Burak

On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra  wrote:

> My program in pseudocode looks like this:
>
> val conf = new SparkConf().setAppName("Test")
>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>   .set("spark.shuffle.manager","SORT") // preferred setting for
> optimized joins
>   .set("spark.shuffle.consolidateFiles","true") // helpful for "too
> many files open"
>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
> errors?
>   .set("spark.akka.frameSize","500") // helpful when using
> consildateFiles=true
>   .set("spark.akka.askTimeout", "30")
>   .set("spark.shuffle.compress","false") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.file.transferTo","false") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.core.connection.ack.wait.timeout","600") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.speculation","true")
>   .set("spark.worker.timeout","600") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>   .set("spark.akka.timeout","300") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
>   .set("spark.driver.maxResultSize","2048") // in response to error:
> Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB)
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>   .set("spark.kryo.registrationRequired", "true")
>
> val rdd1 = 
> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
> -1)...filter(...)
>
> val rdd2 =
> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
> -1)...filter(...)
>
>
> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>
>
> I run the code with:
>   --num-executors 500 \
>   --driver-memory 20g \
>   --executor-memory 20g \
>   --executor-cores 32 \
>
>
> I'm using kryo serialization on everything, including broadcast variables.
>
> Spark creates 145k tasks, and the first stage includes everything before
> groupByKey(). It fails before getting to groupByKey. I have tried doubling
> and tripling the number of partitions when calling textFile, with no
> success.
>
> Very similar code (trivial changes, to accommodate different input) worked
> on a smaller input (~8TB)... Not that it was easy to get that working.
>
>
>
> Errors vary, here is what I am getting right now:
>
> ERROR SendingConnection: Exception while reading SendingConnection
> ... java.nio.channels.ClosedChannelException
> (^ guessing that is symptom of something else)
>
> WARN BlockManagerMasterActor: Removing BlockManager
> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
> (^ guessing that is symptom of something else)
>
> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
> down ActorSystem [sparkDriver]
> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>
>
>
> Other times I will get "executor lost..." messages, about 1 per second
> after ~50k tasks complete, until there are almost no executors left and
> progress slows to nothing.
>
> I ran with verbose GC info; I do see failing yarn containers that have
> multiple (like 30) "Full GC" messages, but I don't know how to tell whether
> that is the problem. Typical Full GC time taken seems OK: [Times:
> user=23.30 sys=0.06, real=1.94 secs]
>
>
>
> Suggestions, please?
>
> Huge thanks for useful suggestions,
> Arun
>


RE: Race Condition in Streaming Thread

2015-02-27 Thread Nastooh Avessta (navesta)
Thank you for your time and effort. Here is the code:
---

public final class Multinode extends Receiver<Output> {

  String host = null;
  int portRx = -1;
  int portTx = -1;
  private final Semaphore sem = new Semaphore(1);

public static void main(String[] args) {

SparkConf sparkConf = new SparkConf().setAppName("Multinode");
sparkConf.set("spark.cores.max", args[1]);
sparkConf.set("spark.task.cpus", args[2]);
sparkConf.set("spark.default.parallelism",args[2]);
sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
Duration(Integer.parseInt(args[6])));

JavaReceiverInputDStream<Output> data = ssc.receiverStream(
new Multinode(args[3], 
Integer.parseInt(args[4]),Integer.parseInt(args[5])));

JavaDStream<Integer> udp = data.map(new Function<Output, Integer>() {
  @Override
  public Integer call(Output x) {

    return x.position();
  }
});

udp.print();
ssc.start();
ssc.awaitTermination();
  }


  public Multinode(String host_ , int portRx_,int portTx_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
portRx = portRx_;
portTx = portTx_;

  }

  public void onStart() {
// Start the thread that receives data over a connection
new Thread()  {
  @Override public void run() {

receive();

  }
}.start();
  }

  public void onStop() {
// There is nothing much to do, as the thread calling receive()
// is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
  Process p;
DatagramSocket socket = null;
byte[] buf = new byte[1400];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
Kryo kryo = new Kryo();
kryo.register(DatagramPacket.class);
Output output=new Output(2900); //2*(1400+35)


try {

  // connect to the server
  if(socket==null) socket = new  DatagramSocket(portRx);
  InetAddress returnIPAddress = InetAddress.getByName(host) ;
  int returnPort =portTx;

 do {
   try {
 sem.acquire();
 p = Runtime.getRuntime().exec("Prog");
 sem.release();
   } catch (IOException ioe) {
 //ioe.printStackTrace();
 break;
   }
   socket.receive(packet);

  output.clear();
  kryo.writeObject(output, packet);
  store(output);
  packet.setAddress(returnIPAddress);
  packet.setPort(returnPort);
  socket.send(packet);
  } while (!isStopped());
  socket.close(); socket = null;

  // Restart in an attempt to connect again when server is active again
  restart("Trying to connect again");
} catch(ConnectException ce) {
  // restart if could not connect to server
  restart("Could not connect", ce);
} catch(Throwable t) {
  restart("Error receiving data", t);
}
  }

}
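One possible reading of the behaviour in this thread (an assumption, not
confirmed above): the semaphore only serializes the exec() calls among
threads inside a single JVM, and exec() returns before Prog's own
[ "$(pidof Prog)" ] && exit guard has run, so two launches can still
interleave; a restarted receiver JVM also gets a fresh semaphore. Within
one JVM, a compareAndSet guard would at least make the "may I launch?"
decision atomic. A minimal sketch — the class and method names below are
hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical per-JVM launch guard. compareAndSet ensures that even if
// several threads reach tryAcquireLaunch() at once, only one of them is
// told to start the external process. This cannot help across JVMs: there,
// the external check (pidof) would itself need to be atomic, e.g. a lock
// file created exclusively.
final class SingleLaunchGuard {
  private static final AtomicBoolean launched = new AtomicBoolean(false);

  /** Returns true only for the single caller allowed to start the process. */
  static boolean tryAcquireLaunch() {
    return launched.compareAndSet(false, true);
  }

  /** Call once the external process has exited, so it can be relaunched. */
  static void releaseLaunch() {
    launched.set(false);
  }
}
```

With multiple receivers (or relaunched executors) each JVM has its own
guard, so a cluster-wide single instance still needs an external mutex.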

[http://www.cisco.com/web/europe/images/email/signature/logo05.jpg]

Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527

Cisco Systems Limited
595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
VANCOUVER
BRITISH COLUMBIA
V7X 1J1
CA
Cisco.com





Think before you print.

This email may contain confidential and privileged material for the sole use of 
the intended recipient. Any review, use, distribution or disclosure by others 
is strictly prohibited. If you are not the intended recipient (or authorized to 
receive for the recipient), please contact the sender by reply email and delete 
all copies of this message.
For corporate legal information go to:
http://www.cisco.com/web/about/doing_business/legal/cri/index.html

Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. 
Phone: 416-306-7000; Fax: 416-306-7099. 
Preferences - 
Unsubscribe – 
Privacy

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, February 27, 2015 12:39 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Race Condition in Streaming Thread

It wasn't clear from the snippet what's going on. Can you provide the whole
Receiver code?

TD

On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) 
mailto:nave...@ci

How to debug a Hung task

2015-02-27 Thread Manas Kar
Hi,
 I have a Spark application that hangs on just one task (the rest of the
200-300 tasks complete in reasonable time).
I can see in the thread dump which function gets stuck; however, I don't
have a clue as to what value is causing that behaviour.
Also, logging the inputs before the function is executed does not help, as
the actual message gets buried in the logs.

How does one go about debugging such a case?
Also, is there a way I can wrap my function inside some sort of timer-based
environment, so that if it took too long I could throw a stack trace or
something of the sort?

Thanks
Manas
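For the timer-based wrapper asked about above, plain java.util.concurrent is
enough: run the suspect function in a Future, wait with a timeout, and dump
the worker thread's stack trace if the timeout expires. A hedged sketch —
the class and method names are illustrative, not a Spark API:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

// Run a Callable with a deadline; on timeout, print where the worker thread
// is stuck, then cancel (interrupt) it.
final class TimedCall {
  static <T> T runWithTimeout(Callable<T> task, long timeoutMs) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicReference<Thread> worker = new AtomicReference<>();
    Future<T> f = pool.submit(() -> {
      worker.set(Thread.currentThread()); // remember which thread runs the task
      return task.call();
    });
    try {
      return f.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      Thread t = worker.get();
      if (t != null) {                    // show where the task is stuck
        for (StackTraceElement el : t.getStackTrace()) {
          System.err.println("  at " + el);
        }
      }
      f.cancel(true);                     // interrupt the hung task
      throw e;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Inside a Spark map function you would call runWithTimeout around the body,
with the caveat that cancel(true) only helps if the hung code responds to
interrupts.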


RE: Race Condition in Streaming Thread

2015-02-27 Thread Nastooh Avessta (navesta)
I am, as I issue `killall -9 Prog` prior to testing.
Cheers,


From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, February 27, 2015 12:29 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Race Condition in Streaming Thread

Are you sure the multiple invocations are not from previous runs of the program?

TD

On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) 
mailto:nave...@cisco.com>> wrote:
Hi
Under Spark 1.0.0, standalone, client mode, I am trying to invoke a 3rd-party
UDP traffic generator from the streaming thread. The excerpt is as follows:
…
   do{

 try {

 p = Runtime.getRuntime().exec(Prog ");

  socket.receive(packet);

  output.clear();
  kryo.writeObject(output, packet);
  store(output);
…
Program has a test to check for an existing instantiation, e.g. [ "$(pidof Prog)"
] && exit.  This code runs fine, i.e., the 3rd-party application is invoked, data
is received, analyzed on the driver, etc. The problem arises when I test redundancy
and fault-tolerance. Specifically, when I manually terminate Prog, multiple
invocations are observed upon recovery. This could be due to multiple threads
getting through [ "$(pidof Prog)" ] && exit. However, I was hoping to avoid this
problem by adding semaphores, as follows:
…
   do{

 try {
sem.acquire();
  p = 
Runtime.getRuntime().exec("Prog");
 sem.release();
}catch(IOException ioe){
  //ioe.printStackTrace();
  break;
}
  socket.receive(packet);
  //InetAddress returnIPAddress = packet.getAddress(); 
returnPort = packet.getPort();
  output.clear();
  kryo.writeObject(output, packet);
  store(output);
…

However, I am still seeing multiple invocations of Prog, upon recovery. I have 
also experimented with the following configuration parameters, to no avail:
  sparkConf.set("spark.cores.max", args[1]);
  sparkConf.set("spark.task.cpus", args[2]);
 sparkConf.set("spark.default.parallelism",args[2]);
with args={(1,1),(2,1), (1,2),…}
Any thoughts?
Cheers,





Re: Race Condition in Streaming Thread

2015-02-27 Thread Tathagata Das
It wasn't clear from the snippet what's going on. Can you provide the
whole Receiver code?

TD

On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) <
nave...@cisco.com> wrote:

>  I am, as I issue killall -9 Prog, prior to testing.
>
> Cheers,
>
>
>
>
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Friday, February 27, 2015 12:29 PM
> *To:* Nastooh Avessta (navesta)
> *Cc:* user@spark.apache.org
> *Subject:* Re: Race Condition in Streaming Thread
>
>
>
> Are you sure the multiple invocations are not from previous runs of the
> program?
>
>
>
> TD
>
>
>
> On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) <
> nave...@cisco.com> wrote:
>
> Hi
>
> Under Spark 1.0.0, standalone, client mode am trying to invoke a 3rd
> party udp traffic generator, from the streaming thread. The excerpt is as
> follows:
>
> …
>
>do{
>
>
>
>  try {
>
>
>
>
>  p = Runtime.getRuntime().exec(Prog ");
>
>
>
>   socket.receive(packet);
>
>
>
>   output.clear();
>
>   kryo.writeObject(output, packet);
>
>   store(output);
>
> …
>
> Program has a test to check for existing instantiation, e.g. [ "$(pidof
> Prog)" ] && exit.  This code runs fine, i.e., 3rd party application is
> invoked, data is received, analyzed on driver, etc. Problem arises, when I
> test redundancy and fault-tolerance. Specifically, when I manually
> terminate Prog, upon recovery,  multiple invocations are observed. This
> could be due to multiple threads getting through  [ "$(pidof Prog)" ] &&
> exit. However, I was hoping by adding semaphores, as follows, to avoid this
> problem:
>
> …
>
>do{
>
>
>
>  try {
>
>
> sem.acquire();
>
>   p =
> Runtime.getRuntime().exec("Prog");
>
>  sem.release();
>
> }catch(IOException ioe){
>
>
>   //ioe.printStackTrace();
>
>   break;
>
> }
>
>   socket.receive(packet);
>
>   //InetAddress returnIPAddress = packet.getAddress();
> returnPort = packet.getPort();
>
>   output.clear();
>
>   kryo.writeObject(output, packet);
>
>   store(output);
>
> …
>
>
>
> However, I am still seeing multiple invocations of Prog, upon recovery. I
> have also experimented with the following configuration parameters, to no
> avail:
>
>   sparkConf.set("spark.cores.max", args[1]);
>
>   sparkConf.set("spark.task.cpus", args[2]);
>
>  sparkConf.set("spark.default.parallelism",args[2]);
>
> with args={(1,1),(2,1), (1,2),…}
>
> Any thoughts?
>
> Cheers,
>
>
>

Re: Race Condition in Streaming Thread

2015-02-27 Thread Tathagata Das
Are you sure the multiple invocations are not from previous runs of the
program?

TD

On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) <
nave...@cisco.com> wrote:

>  Hi
>
> Under Spark 1.0.0, standalone, client mode am trying to invoke a 3rd
> party udp traffic generator, from the streaming thread. The excerpt is as
> follows:
>
> …
>
>do{
>
>
>
>  try {
>
>
>
>
>  p = Runtime.getRuntime().exec(Prog ");
>
>
>
>   socket.receive(packet);
>
>
>
>   output.clear();
>
>   kryo.writeObject(output, packet);
>
>   store(output);
>
> …
>
> Program has a test to check for existing instantiation, e.g. [ "$(pidof
> Prog)" ] && exit.  This code runs fine, i.e., 3rd party application is
> invoked, data is received, analyzed on driver, etc. Problem arises, when I
> test redundancy and fault-tolerance. Specifically, when I manually
> terminate Prog, upon recovery,  multiple invocations are observed. This
> could be due to multiple threads getting through  [ "$(pidof Prog)" ] &&
> exit. However, I was hoping by adding semaphores, as follows, to avoid this
> problem:
>
> …
>
>do{
>
>
>
>  try {
>
>
> sem.acquire();
>
>   p =
> Runtime.getRuntime().exec("Prog");
>
>  sem.release();
>
> }catch(IOException ioe){
>
>
>   //ioe.printStackTrace();
>
>   break;
>
> }
>
>   socket.receive(packet);
>
>   //InetAddress returnIPAddress = packet.getAddress();
> returnPort = packet.getPort();
>
>   output.clear();
>
>   kryo.writeObject(output, packet);
>
>   store(output);
>
> …
>
>
>
> However, I am still seeing multiple invocations of Prog, upon recovery. I
> have also experimented with the following configuration parameters, to no
> avail:
>
>   sparkConf.set("spark.cores.max", args[1]);
>
>   sparkConf.set("spark.task.cpus", args[2]);
>
>  sparkConf.set("spark.default.parallelism",args[2]);
>
> with args={(1,1),(2,1), (1,2),…}
>
> Any thoughts?
>
> Cheers,
>
>
>
>
>
>


Race Condition in Streaming Thread

2015-02-27 Thread Nastooh Avessta (navesta)
Hi
Under Spark 1.0.0, standalone, client mode, I am trying to invoke a 3rd-party
UDP traffic generator from the streaming thread. The excerpt is as follows:
...
   do{

 try {

 p = Runtime.getRuntime().exec(Prog ");

  socket.receive(packet);

  output.clear();
  kryo.writeObject(output, packet);
  store(output);
...
Program has a test to check for an existing instantiation, e.g. [ "$(pidof Prog)"
] && exit.  This code runs fine, i.e., the 3rd-party application is invoked, data
is received, analyzed on the driver, etc. The problem arises when I test redundancy
and fault-tolerance. Specifically, when I manually terminate Prog, multiple
invocations are observed upon recovery. This could be due to multiple threads
getting through [ "$(pidof Prog)" ] && exit. However, I was hoping to avoid this
problem by adding semaphores, as follows:
...
   do{

 try {
sem.acquire();
  p = 
Runtime.getRuntime().exec("Prog");
 sem.release();
}catch(IOException ioe){
  //ioe.printStackTrace();
  break;
}
  socket.receive(packet);
  //InetAddress returnIPAddress = packet.getAddress(); 
returnPort = packet.getPort();
  output.clear();
  kryo.writeObject(output, packet);
  store(output);
...

However, I am still seeing multiple invocations of Prog, upon recovery. I have 
also experimented with the following configuration parameters, to no avail:
  sparkConf.set("spark.cores.max", args[1]);
  sparkConf.set("spark.task.cpus", args[2]);
 sparkConf.set("spark.default.parallelism",args[2]);
with args={(1,1),(2,1), (1,2),...}
Any thoughts?
Cheers,




Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Marcelo Vanzin
Ah, I see. That makes a lot of sense now.

You might be running into some weird class loader visibility issue.
I've seen some bugs in jira about this in the past, maybe you're
hitting one of them.

Until I have some time to investigate (or if you're curious, feel free
to scavenge jira), a workaround could be to manually copy the guava
jar to your executor nodes, and add them to the executor's class path
manually (spark.executor.extraClassPath). That will place your guava
in the Spark classloader (vs. your app's class loader when using
--jars), and things should work.
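A sketch of that workaround (the paths and Guava version here are
hypothetical — use whatever your app was built against, copied to the same
location on every executor node):

```shell
# Place the jar on each executor host, then put it on Spark's own class path
# so the Spark classloader (not the app classloader) resolves Guava:
spark-submit \
  --conf spark.executor.extraClassPath=/opt/libs/guava-14.0.1.jar \
  --class my.App \
  my-app.jar
```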


On Fri, Feb 27, 2015 at 11:52 AM, Pat Ferrel  wrote:
> I understand that I need to supply Guava to Spark. The HashBiMap is created 
> in the client and broadcast to the workers. So it is needed in both. To 
> achieve this there is a deps.jar with Guava (and Scopt but that is only for 
> the client). Scopt is found so I know the jar is fine for the client.
>
> I pass in the deps.jar to the context creation code. I’ve checked the content 
> of the jar and have verified that it is used at context creation time.
>
> I register the serializer as follows:
>
> class MyKryoRegistrator extends KryoRegistrator {
>
>   override def registerClasses(kryo: Kryo) = {
>
> val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
> //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
> log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName + 
> "\n\n\n") // just to be sure this does indeed get logged
> kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], 
> new JavaSerializer())
>   }
> }
>
> The job proceeds up until the broadcast value, a HashBiMap, is deserialized, 
> which is where I get the following error.
>
> Have I missed a step for deserialization of broadcast values? Odd that 
> serialization happened but deserialization failed. I’m running on a 
> standalone localhost-only cluster.
>
>
> 15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 
> (TID 9, 192.168.0.2): java.io.IOException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
> at 
> my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at 
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
> deserialization.
> at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
> ... 19 more
>
> === root error ===
> Caused by: java.lang.ClassNotFoundException: 
> com.google.common.collect.HashBiMap
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> ...
>
>
>
>
>
>
>
>
> On Feb 25, 2015, at 5:24 PM, Marcelo Vanzin  wrote:
>
> Guava is not in Spark. (Well

Re: Upgrade to Spark 1.2.1 using Guava

2015-02-27 Thread Pat Ferrel
I understand that I need to supply Guava to Spark. The HashBiMap is created in 
the client and broadcast to the workers. So it is needed in both. To achieve 
this there is a deps.jar with Guava (and Scopt but that is only for the 
client). Scopt is found so I know the jar is fine for the client. 

I pass in the deps.jar to the context creation code. I’ve checked the content 
of the jar and have verified that it is used at context creation time. 

I register the serializer as follows:

class MyKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) = {

val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
//kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
log.info("\n\n\nRegister Serializer for " + h.getClass.getCanonicalName + 
"\n\n\n") // just to be sure this does indeed get logged
kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], 
new JavaSerializer())
  }
}

The job proceeds up until the broadcast value, a HashBiMap, is deserialized, 
which is where I get the following error. 

Have I missed a step for deserialization of broadcast values? Odd that 
serialization happened but deserialization failed. I’m running on a standalone 
localhost-only cluster.


15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 
(TID 9, 192.168.0.2): java.io.IOException: 
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
at 
my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
... 19 more

=== root error ===
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.HashBiMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
...








On Feb 25, 2015, at 5:24 PM, Marcelo Vanzin  wrote:

Guava is not in Spark. (Well, long version: it's in Spark but it's
relocated to a different package except for some special classes
leaked through the public API.)

If your app needs Guava, it needs to package Guava with it (e.g. by
using maven-shade-plugin, or using "--jars" if only executors use
Guava).
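For the maven-shade-plugin route, a minimal relocation sketch (assuming a Maven build; the `my.shaded` prefix is just a placeholder, not a required name):

```xml
<!-- Sketch only: relocate Guava into your own namespace so it cannot
     clash with whatever Spark ships. Adjust the shadedPattern prefix. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>com.google.common</pattern>
        <shadedPattern>my.shaded.com.google.common</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```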

On Wed, Feb 25, 2015 at 5:17 PM, Pat Ferrel  wrote:
> The root Spark pom has guava set at a certain version number. It’s very hard
> to read the shading xml. Someone suggested that I try using
> userClassPathFirst but that sounds too heavy-handed since I don’t really
> care which version of guava I get; I'm not picky.
> 
> When I set my project to use the same version as Spark I get a missing
> classdef, which usually means a version conflict.
> 
> At this point I am quite confused about what is actually in Spark as far as
>

What joda-time dependency does spark submit use/need?

2015-02-27 Thread Su She
Hello Everyone,

I'm having some issues launching (non-spark) applications via the
spark-submit commands. The common error I am getting is pasted below. I am
able to submit a spark streaming/kafka spark application, but can't start a
dynamoDB java app. The common error is related to joda-time.

1) I realized spark-submit was pointing to joda-time-1.6 in the
hadoop/lib, so I deleted it, and my error changed from NoSuchMethodFound to
NoClassDefFoundError.

Instead of pointing to the other version of joda-time in the hadoop/lib, it
now pointed to the jars I set a path to in my spark-submit command (I tried
joda-time versions 2.2, 2.3, 2.6, 2.7), but still got the errors

2) My rudimentary theory is that spark-submit uses joda-time < 2.0, but the
applications I'm running need > 2.0.

Thank you for the help!

Exception in thread "main" java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at
com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at
com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at
com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.setEndpoint(AmazonDynamoDBClient.java:2946)
at
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.init(AmazonDynamoDBClient.java:351)
at
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.(AmazonDynamoDBClient.java:273)
at
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.(AmazonDynamoDBClient.java:250)
at AmazonDynamoDBSample.init(AmazonDynamoDBSample.java:81)
at AmazonDynamoDBSample.main(AmazonDynamoDBSample.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
org.joda.time.format.DateTimeFormat
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)


Problem getting program to run on 15TB input

2015-02-27 Thread Arun Luthra
My program in pseudocode looks like this:

val conf = new SparkConf().setAppName("Test")
  .set("spark.storage.memoryFraction","0.2") // default 0.6
  .set("spark.shuffle.memoryFraction","0.12") // default 0.2
  .set("spark.shuffle.manager","SORT") // preferred setting for
optimized joins
  .set("spark.shuffle.consolidateFiles","true") // helpful for "too
many files open"
  .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
errors?
  .set("spark.akka.frameSize","500") // helpful when using
consolidateFiles=true
  .set("spark.akka.askTimeout", "30")
  .set("spark.shuffle.compress","false") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.file.transferTo","false") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.core.connection.ack.wait.timeout","600") //
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
  .set("spark.speculation","true")
  .set("spark.worker.timeout","600") //
http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.akka.timeout","300") //
http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
  .set("spark.storage.blockManagerSlaveTimeoutMs","12")
  .set("spark.driver.maxResultSize","2048") // in response to error:
Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
  .set("spark.kryo.registrationRequired", "true")

val rdd1 = 
sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
-1)...filter(...)

val rdd2 =
sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
-1)...filter(...)

rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
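(A side note on the groupByKey() step above: when the per-key work can be expressed as an aggregation, a reduceByKey-style combine shrinks the shuffle dramatically, because each partition combines locally before anything crosses the network. A plain-Python sketch of the difference — the data and partitioning here are made up for illustration, not from the job above:)

```python
from collections import defaultdict

# Hypothetical records split across two "partitions".
records = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]
partitions = [records[:3], records[3:]]

# groupByKey-style: every record crosses the shuffle boundary.
shuffled_group = sum(len(p) for p in partitions)

# reduceByKey-style: combine within each partition first, so at most
# (#distinct keys x #partitions) records cross the shuffle.
def combine(part):
    acc = defaultdict(int)
    for k, v in part:
        acc[k] += v
    return list(acc.items())

shuffled_reduce = sum(len(combine(p)) for p in partitions)

assert shuffled_group == 5
assert shuffled_reduce == 4
```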


I run the code with:
  --num-executors 500 \
  --driver-memory 20g \
  --executor-memory 20g \
  --executor-cores 32 \


I'm using kryo serialization on everything, including broadcast variables.

Spark creates 145k tasks, and the first stage includes everything before
groupByKey(). It fails before getting to groupByKey. I have tried doubling
and tripling the number of partitions when calling textFile, with no
success.

Very similar code (trivial changes, to accommodate different input) worked
on a smaller input (~8TB)... Not that it was easy to get that working.



Errors vary, here is what I am getting right now:

ERROR SendingConnection: Exception while reading SendingConnection
... java.nio.channels.ClosedChannelException
(^ guessing that is a symptom of something else)

WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
(^ guessing that is a symptom of something else)

ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down
ActorSystem [sparkDriver]
*java.lang.OutOfMemoryError: GC overhead limit exceeded*



Other times I will get messages about "executor lost..." about 1 message
per second, after ~50k tasks complete, until there are almost no executors
left and progress slows to nothing.

I ran with verbose GC info; I do see failing yarn containers that have
multiple (like 30) "Full GC" messages but I don't know how to interpret if
that is the problem. Typical Full GC time taken seems ok: [Times:
user=23.30 sys=0.06, real=1.94 secs]



Suggestions, please?

Huge thanks for useful suggestions,
Arun


Re: job keeps failing with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

2015-02-27 Thread Kelvin Chu
Hi Darin, you might increase spark.yarn.executor.memoryOverhead to see if
it fixes the problem. Please take a look of this report:
https://issues.apache.org/jira/browse/SPARK-4996
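For reference, the setting above (value in MB) can be passed on the command line; the 1024 here is only an example, not a recommendation:

```
spark-submit --conf spark.yarn.executor.memoryOverhead=1024 ...
```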

On Fri, Feb 27, 2015 at 12:38 AM, Arush Kharbanda <
ar...@sigmoidanalytics.com> wrote:

> Can you share what error you are getting when the job fails.
>
> On Thu, Feb 26, 2015 at 4:32 AM, Darin McBeath <
> ddmcbe...@yahoo.com.invalid> wrote:
>
>> I'm using Spark 1.2, stand-alone cluster on ec2 I have a cluster of 8
>> r3.8xlarge machines but limit the job to only 128 cores.  I have also tried
>> other things such as setting 4 workers per r3.8xlarge and 67gb each but
>> this made no difference.
>>
>> The job frequently fails at the end in this step (saveAsHadoopFile). It
>> will sometimes work.
>>
>> finalNewBaselinePairRDD is hashPartitioned with 1024 partitions and a
>> total size around 1TB.  There are about 13.5M records in
>> finalNewBaselinePairRDD.  finalNewBaselinePairRDD is 
>>
>>
>> JavaPairRDD<Text, Text> finalBaselineRDDWritable =
>> finalNewBaselinePairRDD.mapToPair(new
>> ConvertToWritableTypes()).persist(StorageLevel.MEMORY_AND_DISK_SER());
>>
>> // Save to hdfs (gzip)
>> finalBaselineRDDWritable.saveAsHadoopFile("hdfs:///sparksync/",
>> Text.class, Text.class,
>> SequenceFileOutputFormat.class,org.apache.hadoop.io.compress.GzipCodec.class);
>>
>>
>> If anyone has any tips for what I should look into it would be
>> appreciated.
>>
>> Thanks.
>>
>> Darin.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: Running out of space (when there's no shortage)

2015-02-27 Thread Kelvin Chu
Hi Joe, you might increase spark.yarn.executor.memoryOverhead to see if it
fixes the problem. Please take a look of this report:
https://issues.apache.org/jira/browse/SPARK-4996

Hope this helps.

On Tue, Feb 24, 2015 at 2:05 PM, Yiannis Gkoufas 
wrote:

> No problem, Joe. There you go
> https://issues.apache.org/jira/browse/SPARK-5081
> And also there is this one
> https://issues.apache.org/jira/browse/SPARK-5715 which is marked as
> resolved
>
> On 24 February 2015 at 21:51, Joe Wass  wrote:
>
>> Thanks everyone.
>>
>> Yiannis, do you know if there's a bug report for this regression? For
>> some other (possibly connected) reason I upgraded from 1.1.1 to 1.2.1, but
>> I can't remember what the bug was.
>>
>> Joe
>>
>>
>>
>>
>> On 24 February 2015 at 19:26, Yiannis Gkoufas 
>> wrote:
>>
>>> Hi there,
>>>
>>> I assume you are using spark 1.2.1 right?
>>> I faced the exact same issue and switched to 1.1.1 with the same
>>> configuration and it was solved.
>>> On 24 Feb 2015 19:22, "Ted Yu"  wrote:
>>>
 Here is a tool which may give you some clue:
 http://file-leak-detector.kohsuke.org/

 Cheers

 On Tue, Feb 24, 2015 at 11:04 AM, Vladimir Rodionov <
 vrodio...@splicemachine.com> wrote:

> Usually it happens in Linux when application deletes file w/o double
> checking that there are no open FDs (resource leak). In this case, Linux
> holds all space allocated and does not release it until application
> exits (crashes in your case). You check file system and everything is
> normal, you have enough space and you have no idea why does application
> report "no space left on device".
>
> Just a guess.
>
> -Vladimir Rodionov
>
> On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass  wrote:
>
>> I'm running a cluster of 3 Amazon EC2 machines (small number because
>> it's expensive when experiments keep crashing after a day!).
>>
>> Today's crash looks like this (stacktrace at end of message).
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
>> output location for shuffle 0
>>
>> On my three nodes, I have plenty of space and inodes:
>>
>> A $ df -i
>> FilesystemInodes   IUsed   IFree IUse% Mounted on
>> /dev/xvda1524288   97937  426351   19% /
>> tmpfs1909200   1 19091991% /dev/shm
>> /dev/xvdb2457600  54 24575461% /mnt
>> /dev/xvdc2457600  24 24575761% /mnt2
>> /dev/xvds831869296   23844 8318454521% /vol0
>>
>> A $ df -h
>> FilesystemSize  Used Avail Use% Mounted on
>> /dev/xvda17.9G  3.4G  4.5G  44% /
>> tmpfs 7.3G 0  7.3G   0% /dev/shm
>> /dev/xvdb  37G  1.2G   34G   4% /mnt
>> /dev/xvdc  37G  177M   35G   1% /mnt2
>> /dev/xvds1000G  802G  199G  81% /vol0
>>
>> B $ df -i
>> FilesystemInodes   IUsed   IFree IUse% Mounted on
>> /dev/xvda1524288   97947  426341   19% /
>> tmpfs1906639   1 19066381% /dev/shm
>> /dev/xvdb2457600  54 24575461% /mnt
>> /dev/xvdc2457600  24 24575761% /mnt2
>> /dev/xvds816200704   24223 8161764811% /vol0
>>
>> B $ df -h
>> FilesystemSize  Used Avail Use% Mounted on
>> /dev/xvda17.9G  3.6G  4.3G  46% /
>> tmpfs 7.3G 0  7.3G   0% /dev/shm
>> /dev/xvdb  37G  1.2G   34G   4% /mnt
>> /dev/xvdc  37G  177M   35G   1% /mnt2
>> /dev/xvds1000G  805G  195G  81% /vol0
>>
>> C $df -i
>> FilesystemInodes   IUsed   IFree IUse% Mounted on
>> /dev/xvda1524288   97938  426350   19% /
>> tmpfs1906897   1 19068961% /dev/shm
>> /dev/xvdb2457600  54 24575461% /mnt
>> /dev/xvdc2457600  24 24575761% /mnt2
>> /dev/xvds755218352   24024 7551943281% /vol0
>>
>> C $ df -h
>> FilesystemSize  Used Avail Use% Mounted on
>> /dev/xvda17.9G  3.4G  4.5G  44% /
>> tmpfs 7.3G 0  7.3G   0% /dev/shm
>> /dev/xvdb  37G  1.2G   34G   4% /mnt
>> /dev/xvdc  37G  177M   35G   1% /mnt2
>> /dev/xvds1000G  820G  181G  82% /vol0
>>
>> The devices may be ~80% full but that still leaves ~200G free on
>> each. My spark-env.sh has
>>
>> export SPARK_LOCAL_DIRS="/vol0/spark"
>>
>> I have manually verified that on each slave the only temporary files
>> are stored on /vol0, all looking something like this
>>
>>
>> /vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884
>>
>> So it 

Re: Iterating on RDDs

2015-02-27 Thread Vijayasarathy Kannan
As you suggested, I tried to save the grouped RDD and persisted it in
memory before the iterations begin. The performance seems to be much better
now.

My previous comment that the run times doubled was from a wrong observation.

Thanks.


On Fri, Feb 27, 2015 at 10:27 AM, Vijayasarathy Kannan 
wrote:

> Thanks.
>
> I tried persist() on the RDD. The runtimes appear to have doubled now
> (without persist() it was ~7s per iteration and now its ~15s). I am running
> standalone Spark on a 8-core machine.
> Any thoughts on why the increase in runtime?
>
> On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid 
> wrote:
>
>>
>> val grouped = R.groupBy[VertexId](G).persist(StorageLeve.MEMORY_ONLY_SER)
>> // or whatever persistence makes more sense for you ...
>> while(true) {
>>   val res = grouped.flatMap(F)
>>   res.collect.foreach(func)
>>   if(criteria)
>>  break
>> }
>>
>> On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan 
>> wrote:
>>
>>> Hi,
>>>
>>> I have the following use case.
>>>
>>> (1) I have an RDD of edges of a graph (say R).
>>> (2) do a groupBy on R (by say source vertex) and call a function F on
>>> each group.
>>> (3) collect the results from Fs and do some computation
>>> (4) repeat the above steps until some criteria is met
>>>
>>> In (2), the groups are always going to be the same (since R is grouped
>>> by source vertex).
>>>
>>> Question:
>>> Is R distributed every iteration (when in (2)) or is it distributed only
>>> once when it is created?
>>>
>>> A sample code snippet is below.
>>>
>>> while(true) {
>>>   val res = R.groupBy[VertexId](G).flatMap(F)
>>>   res.collect.foreach(func)
>>>   if(criteria)
>>>  break
>>> }
>>>
>>> Since the groups remain the same, what is the best way to go about
>>> implementing the above logic?
>>>
>>
>>
>


Re: Global sequential access of elements in RDD

2015-02-27 Thread Imran Rashid
Why would you want to use spark to sequentially process your entire data
set?  The entire purpose is to let you do distributed processing -- which
means letting partitions get processed simultaneously by different cores /
nodes.

that being said, occasionally in a bigger pipeline with a lot of
distributed operations, you might need to do one segment in a completely
sequential manner.  You have a few options -- just be aware that with all
of them, you are working *around* the idea of an RDD, so make sure you have
a really good reason.

1) rdd.toLocalIterator.  Still pulls all of the data to the driver, just
like rdd.collect(), but it's slightly more scalable since it won't store
*all* of the data in memory on the driver (it does still store all of the
data in one partition in memory, though.)
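(A plain-Python analogy of the memory shape here — no Spark involved; the "partitions" are just lists, so this only illustrates the one-partition-at-a-time behavior, not the real API:)

```python
# Sketch: collect()-style materializes everything at once, while an
# iterator only ever needs one "partition" worth of data in memory.
def partitions():
    for p in range(3):
        yield [p * 10 + i for i in range(4)]  # one partition's records

def to_local_iterator():
    for part in partitions():   # pull one partition...
        for record in part:     # ...stream its records out
            yield record

collected = [x for part in partitions() for x in part]  # all 12 at once
streamed = list(to_local_iterator())                    # same values, lazily

assert streamed == collected
assert len(streamed) == 12
```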

2) write the rdd to some external data storage (eg. hdfs), and then read
the data sequentially off of hdfs on your driver.  Still needs to pull all
of the data to the driver, but you can get it to avoid pulling an entire
partition into memory and make it streaming.

3) create a number of rdds that consist of just one partition of your
original rdd, and then execute actions on them sequentially:

val originalRDD = ... // this should be cached to make sure you don't
// recompute it
(0 until originalRDD.partitions.size).foreach { partitionIdx =>
  val prunedRDD = new PartitionPruningRDD(originalRDD, { x => x == partitionIdx })
  prunedRDD.runSomeActionHere()
}

note that PartitionPruningRDD is a developer api, however.  This will run
your action on one partition at a time, and ideally the tasks will be
scheduled on the same node where the partitions have been cached, so you
don't need to move the data around.  But again, b/c you're turning it into
a sequential program, most of your cluster is sitting idle, and you're not
really leveraging spark ...


imran

On Fri, Feb 27, 2015 at 1:38 AM, Wush Wu  wrote:

> Dear all,
>
> I want to implement some sequential algorithm on RDD.
>
> For example:
>
> val conf = new SparkConf()
>   conf.setMaster("local[2]").
>   setAppName("SequentialSuite")
> val sc = new SparkContext(conf)
> val rdd = sc.
>parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9), 2).
>sortBy(x => x, true)
> rdd.foreach(println)
>
> I want to see the ordered numbers on my screen, but it shows unordered
> integers. The two partitions execute the println simultaneously.
>
> How do I make the RDD execute a function globally sequential?
>
> Best,
> Wush
>


RE: group by order by fails

2015-02-27 Thread Tridib Samanta
Thanks Michael! It worked. Some how my mails are not getting accepted by spark 
user mailing list. :(
 
From: mich...@databricks.com
Date: Thu, 26 Feb 2015 17:49:43 -0800
Subject: Re: group by order by fails
To: tridib.sama...@live.com
CC: ak...@sigmoidanalytics.com; user@spark.apache.org

Assign an alias to the count in the select clause and use that alias in the 
order by clause.
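(The shape of that fix, in runnable form — sqlite3 is used here only so the snippet executes anywhere; per Tridib's follow-up, the same aliasing works in the Spark SQL query from the original mail:)

```python
import sqlite3

# Workaround sketch: alias the aggregate in SELECT and sort by the alias
# instead of repeating count(name) in the ORDER BY clause.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sample (name TEXT)")
conn.executemany("INSERT INTO sample VALUES (?)", [("a",), ("b",), ("a",)])

rows = conn.execute(
    "SELECT name, COUNT(name) AS cnt FROM sample "
    "GROUP BY name ORDER BY cnt DESC"
).fetchall()
assert rows == [("a", 2), ("b", 1)]
```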
On Wed, Feb 25, 2015 at 11:17 PM, Tridib Samanta  
wrote:



Actually I just realized , I am using 1.2.0.
 
Thanks
Tridib
 
Date: Thu, 26 Feb 2015 12:37:06 +0530
Subject: Re: group by order by fails
From: ak...@sigmoidanalytics.com
To: tridib.sama...@live.com
CC: user@spark.apache.org

Which version of spark are you having? It seems there was a similar Jira 
https://issues.apache.org/jira/browse/SPARK-2474

Thanks
Best Regards

On Thu, Feb 26, 2015 at 12:03 PM, tridib  wrote:
Hi,

I need to find top 10 most selling samples. So query looks like:

select  s.name, count(s.name) from sample s group by s.name order by

count(s.name)



This query fails with following error:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:

Sort [COUNT(name#0) ASC], true

 Exchange (RangePartitioning [COUNT(name#0) ASC], 200)

  Aggregate false, [name#0], [name#0 AS

name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0]

   Exchange (HashPartitioning [name#0], 200)

Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L]

 PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at

JavaSQLContext.scala:102



at

org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:206)

at 
org.apache.spark.sql.execution.Project.execute(basicOperators.scala:43)

at

org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)

at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)

at

org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114)

at

com.edifecs.platform.df.analytics.spark.domain.dao.OrderByTest.testGetVisitDistributionByPrimaryDx(OrderByTest.java:48)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at

org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)

at

org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

at

org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)

at

org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)

at

org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)

at

org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)

at org.junit.runners.ParentRunner.run(ParentRunner.java:309)

at org.junit.runner.JUnitCore.run(JUnitCore.java:160)

at

com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)

at

com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)

at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:121)

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:

execute, tree:

Exchange (RangePartitioning [COUNT(name#0) ASC], 200)

 Aggregate false, [name#0], [name#0 AS

name#1,Coalesce(SUM(PartialCount#4L),0) AS count#2L,name#0]

  Exchange (HashPartitioning [name#0], 200)

   Aggregate true, [name#0], [name#0,COUNT(name#0) AS PartialCount#4L]

PhysicalRDD [name#0], MapPartitionsRDD[1] at mapPartitions at

JavaSQLContext.scala:102



at

org.apache.spark.sql.catalyst.

Re: Running spark function on parquet without sql

2015-02-27 Thread tridib
Somehow my posts are not getting accepted, and replies are not visible here.
But I got following reply from Zhan.

From Zhan Zhang's reply, yes, I still get Parquet's advantage.

My next question is: if I operate on a SchemaRDD, will I still get the
advantage of Spark SQL's in-memory columnar store when the table is cached
using cacheTable()?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-function-on-parquet-without-sql-tp21833p21850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: High CPU usage in Driver

2015-02-27 Thread Paweł Szulc
Thanks for coming back to the list with a response!

On Fri, 27 Feb 2015 at 3:16 PM, Himanish Kushary
wrote:

> Hi,
>
> I was able to solve the issue. Putting down the settings that worked for
> me.
>
> 1) It was happening due to the large number of partitions. I coalesce'd
> the RDD as early as possible in my code into far fewer partitions (used .
> coalesce(1) to bring down from 500K to 10k)
>
> 2) Increased the settings for the parameters spark.akka.frameSize (=
> 500), spark.akka.timeout, spark.akka.askTimeout and
> spark.core.connection.ack.wait.timeout
> to get rid of any insufficient frame size and timeout errors
>
> Thanks
> Himanish
>
> On Thu, Feb 26, 2015 at 5:00 PM, Himanish Kushary 
> wrote:
>
>> Hi,
>>
>> I am working with a RDD (PairRDD) with 500K+ partitions. The RDD is
>> loaded into memory , the size is around 18G.
>>
>> Whenever I run a distinct() on the RDD, the driver ( spark-shell in
>> yarn-client mode) host CPU usage rockets up (400+ %) and the distinct()
>> process seems to stall.The spark driver UI also hangs.
>>
>> In ganglia the only node with high load is the driver host. I have tried
>> repartitioning the data into less number of partitions ( using coalesce or
>> repartition) with no luck.
>>
>> I have attached the jstack output which shows few threads in BLOCKED
>> status. Not sure what exactly is going on here.
>>
>> The driver program was started with 15G memory on AWS EMR. Appreciate any
>> thoughts regarding the issue.
>>
>> --
>> Thanks & Regards
>> Himanish
>>
>
>
>
> --
> Thanks & Regards
> Himanish
>


Re: Question about Spark best practice when counting records.

2015-02-27 Thread Darin McBeath
Thanks for your quick reply. Yes, that would be fine. I would rather wait/use
the optimal approach as opposed to hacking some one-off solution.

Darin.



From: Kostas Sakellis 
To: Darin McBeath  
Cc: User  
Sent: Friday, February 27, 2015 12:19 PM
Subject: Re: Question about Spark best practice when counting records.



Hey Darin,

Record count metrics are coming in Spark 1.3. Can you wait until it is 
released? Or do you need a solution in older versions of spark.

Kostas

On Friday, February 27, 2015, Darin McBeath  wrote:



I have a fairly large Spark job where I'm essentially creating quite a few 
RDDs, do several types of joins using these RDDS resulting in a final RDD which 
I write back to S3.
>
>
>Along the way, I would like to capture record counts for some of these RDDs. 
>My initial approach was to use the count action on some of these intermediate  
>RDDS (and cache them since the count would force the materialization of the 
>RDD and the RDD would be needed again later).  This seemed to work 'ok' when 
>my RDDs were fairly small/modest but as they grew in size I started to 
>experience problems.
>
>After watching a recent very good screencast on performance, this doesn't seem 
>the correct approach as I believe I'm really breaking (or hindering) the 
>pipelining concept in Spark.  If I remove all of my  counts, I'm only left 
>with the one job/action (save as Hadoop file at the end).  Spark then seems to 
>run smoother (and quite a bit faster) and I really don't need (or want) to 
>even cache any of my intermediate RDDs.
>
>So, the approach I've been kicking around is to use accumulators instead.  I 
>was already using them to count 'bad' records but why not 'good' records as 
>well? I realize that if I lose a partition that I might over count, but  
>perhaps that is an acceptable trade-off.
>
>I'm guessing that others have run into this before so I would like to learn 
>from the experience of others and how they have addressed this.
>
>Thanks.
>
>Darin.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question about Spark best practice when counting records.

2015-02-27 Thread Paweł Szulc
Currently, if you use accumulators inside actions (like foreach) you have a
guarantee that, even if a partition is recalculated, the values will be
correct. The same does NOT apply to transformations, where you cannot
rely 100% on the values.
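(A plain-Python sketch of the over-count Darin is worried about — no Spark here; the second list comprehension stands in for a partition being recomputed after a task failure:)

```python
# A counter bumped inside a map-style transformation is applied again
# whenever the partition is recomputed, so the total drifts upward.
counter = {"good": 0}

def parse(record):
    counter["good"] += 1
    return record.upper()

partition = ["a", "b", "c"]
first_run = [parse(r) for r in partition]  # task runs once...
retry_run = [parse(r) for r in partition]  # ...then the task is retried

assert first_run == retry_run == ["A", "B", "C"]
assert counter["good"] == 6  # 2x the 3 real records
```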

Pawel Szulc

On Fri, 27 Feb 2015 at 4:54 PM, Darin McBeath
wrote:

> I have a fairly large Spark job where I'm essentially creating quite a few
> RDDs, do several types of joins using these RDDS resulting in a final RDD
> which I write back to S3.
>
>
> Along the way, I would like to capture record counts for some of these
> RDDs. My initial approach was to use the count action on some of these
> intermediate  RDDS (and cache them since the count would force the
> materialization of the RDD and the RDD would be needed again later).  This
> seemed to work 'ok' when my RDDs were fairly small/modest but as they grew
> in size I started to experience problems.
>
> After watching a recent very good screencast on performance, this doesn't
> seem the correct approach as I believe I'm really breaking (or hindering)
> the pipelining concept in Spark.  If I remove all of my  counts, I'm only
> left with the one job/action (save as Hadoop file at the end).  Spark then
> seems to run smoother (and quite a bit faster) and I really don't need (or
> want) to even cache any of my intermediate RDDs.
>
> So, the approach I've been kicking around is to use accumulators instead.
> I was already using them to count 'bad' records but why not 'good' records
> as well? I realize that if I lose a partition that I might over count, but
> perhaps that is an acceptable trade-off.
>
> I'm guessing that others have run into this before so I would like to
> learn from the experience of others and how they have addressed this.
>
> Thanks.
>
> Darin.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Question about Spark best practice when counting records.

2015-02-27 Thread Kostas Sakellis
Hey Darin,

Record count metrics are coming in Spark 1.3. Can you wait until it is
released? Or do you need a solution in older versions of spark.

Kostas

On Friday, February 27, 2015, Darin McBeath 
wrote:

> I have a fairly large Spark job where I'm essentially creating quite a few
> RDDs, do several types of joins using these RDDS resulting in a final RDD
> which I write back to S3.
>
>
> Along the way, I would like to capture record counts for some of these
> RDDs. My initial approach was to use the count action on some of these
> intermediate  RDDS (and cache them since the count would force the
> materialization of the RDD and the RDD would be needed again later).  This
> seemed to work 'ok' when my RDDs were fairly small/modest but as they grew
> in size I started to experience problems.
>
> After watching a recent very good screencast on performance, this doesn't
> seem the correct approach as I believe I'm really breaking (or hindering)
> the pipelining concept in Spark.  If I remove all of my  counts, I'm only
> left with the one job/action (save as Hadoop file at the end).  Spark then
> seems to run smoother (and quite a bit faster) and I really don't need (or
> want) to even cache any of my intermediate RDDs.
>
> So, the approach I've been kicking around is to use accumulators instead.
> I was already using them to count 'bad' records but why not 'good' records
> as well? I realize that if I lose a partition that I might over count, but
> perhaps that is an acceptable trade-off.
>
> I'm guessing that others have run into this before so I would like to
> learn from the experience of others and how they have addressed this.
>
> Thanks.
>
> Darin.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>


Re: Spark excludes "fastutil" dependencies we need

2015-02-27 Thread Jim Kleckner
Yes, I used both.

The discussion on this seems to be at github now:
  https://github.com/apache/spark/pull/4780

I am using additional classes from the same package from which Spark uses
HyperLogLog. So we are both including the jar file, but Spark is excluding
the dependent package that is required.
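For reference, a hedged sketch of the two separate options discussed in this thread (executor-side plus driver-side; option names as of roughly Spark 1.3, so verify against your version's configuration docs, and the jar filename here is hypothetical):

```shell
# Sketch only, not a verified invocation for this build of Spark.
spark-submit \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.driver.userClassPathFirst=true \
  --jars fastutil-6.5.7.jar \
  ...
```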


On Thu, Feb 26, 2015 at 9:54 AM, Marcelo Vanzin  wrote:

> On Wed, Feb 25, 2015 at 8:42 PM, Jim Kleckner 
> wrote:
> > So, should the userClassPathFirst flag work and there is a bug?
>
> Sorry for jumping in the middle of conversation (and probably missing
> some of it), but note that this option applies only to executors. If
> you're trying to use the class in your driver, there's a separate
> option for that.
>
> Also to note is that if you're adding a class that doesn't exist
> inside the Spark jars, which seems to be the case, this option should
> be irrelevant, since the class loaders should all end up finding the
> one copy of the class that you're adding with your app.
>
> --
> Marcelo
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Spark-excludes-fastutil-dependencies-we-need-tp21849.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Errors in spark

2015-02-27 Thread sandeep vura
Hi yana,

I have removed hive-site.xml from the spark/conf directory but am still getting
the same errors. Is there any other way to work around this?

Regards,
Sandeep

On Fri, Feb 27, 2015 at 9:38 PM, Yana Kadiyska 
wrote:

> I think you're mixing two things: the docs say "When* not *configured by
> the hive-site.xml, the context automatically creates metastore_db and
> warehouse in the current directory.". AFAIK if you want a local
> metastore, you don't put hive-site.xml anywhere. You only need the file if
> you're going to point to an external metastore. If you're pointing to an
> external metastore, in my experience I've also had to copy core-site.xml
> into conf in order to specify this property:  fs.defaultFS
>
> On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura 
> wrote:
>
>> Hi Sparkers,
>>
>> I am using hive version 0.13 and copied hive-site.xml into
>> spark/conf, and am using the default Derby local metastore.
>>
>> While creating a table in the spark shell I am getting the following error.
>> Can anyone please take a look and suggest a solution?
>>
>> sqlContext.sql("CREATE TABLE IF NOT EXISTS sandeep (key INT, value
>> STRING)")
>> 15/02/27 23:06:13 ERROR RetryingHMSHandler:
>> MetaException(message:file:/user/hive/warehouse_1/sandeep is not a
>> directory or unable to create one)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:622)
>> at
>> org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
>> at
>> com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:622)
>> at
>> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
>> at com.sun.proxy.$Proxy13.createTable(Unknown Source)
>> at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613)
>> at
>> org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189)
>> at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281)
>> at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
>> at
>> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
>> at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
>> at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
>> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
>> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
>> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
>> at
>> org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
>> at
>> org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
>> at
>> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
>> at
>> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
>> at
>> org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
>> at
>> org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
>> at
>> org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
>> at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
>> at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
>> at $line9.$read$$iwC$$iwC$$iwC$$iwC.(:15)
>> at $line9.$read$$iwC$$iwC$$iwC.(:20)
>> at $line9.$read$$iwC$$iwC.(:22)
>> at $line9.$read$$iwC.(:24)
>> at $line9.$read.(:26)
>> at $line9.$read$.(:30)
>> at $line9.$read$.()
>> at $line9.$eval$.(:7)
>> at $line9.$eval$.()
>> at $line9.$eval.$print()
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAc

Re: Errors in spark

2015-02-27 Thread Yana Kadiyska
I think you're mixing two things: the docs say "When* not *configured by
the hive-site.xml, the context automatically creates metastore_db and
warehouse in the current directory.". AFAIK if you want a local metastore,
you don't put hive-site.xml anywhere. You only need the file if you're
going to point to an external metastore. If you're pointing to an external
metastore, in my experience I've also had to copy core-site.xml into conf
in order to specify this property:  fs.defaultFS
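To make the distinction concrete, here is a minimal hedged sketch of a conf/hive-site.xml for the *external* metastore case only (host and port below are placeholders, not values from this thread); for a local Derby metastore, the point is that you ship no hive-site.xml at all:

```xml
<configuration>
  <!-- Point Spark's HiveContext at an already-running external metastore. -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://metastore-host:9083</value>
  </property>
</configuration>
```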

On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura 
wrote:

> Hi Sparkers,
>
> I am using hive version 0.13 and copied hive-site.xml into spark/conf,
> and am using the default Derby local metastore.
>
> While creating a table in the spark shell I am getting the following error.
> Can anyone please take a look and suggest a solution?
>
> sqlContext.sql("CREATE TABLE IF NOT EXISTS sandeep (key INT, value
> STRING)")
> 15/02/27 23:06:13 ERROR RetryingHMSHandler:
> MetaException(message:file:/user/hive/warehouse_1/sandeep is not a
> directory or unable to create one)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:622)
> at
> org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
> at
> com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:622)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
> at com.sun.proxy.$Proxy13.createTable(Unknown Source)
> at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613)
> at
> org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189)
> at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281)
> at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
> at
> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
> at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
> at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
> at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
> at
> org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
> at
> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
> at
> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
> at
> org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
> at
> org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
> at
> org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
> at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
> at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
> at $line9.$read$$iwC$$iwC$$iwC$$iwC.(:15)
> at $line9.$read$$iwC$$iwC$$iwC.(:20)
> at $line9.$read$$iwC$$iwC.(:22)
> at $line9.$read$$iwC.(:24)
> at $line9.$read.(:26)
> at $line9.$read$.(:30)
> at $line9.$read$.()
> at $line9.$eval$.(:7)
> at $line9.$eval$.()
> at $line9.$eval.$print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:622)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
> at
> org.apache.spark.

Re: Iterating on RDDs

2015-02-27 Thread Vijayasarathy Kannan
Thanks.

I tried persist() on the RDD. The runtimes appear to have doubled now
(without persist() it was ~7s per iteration; now it's ~15s). I am running
standalone Spark on an 8-core machine.
Any thoughts on why the runtime increased?

On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid  wrote:

>
> val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
> // or whatever persistence makes more sense for you ...
> while(true) {
>   val res = grouped.flatMap(F)
>   res.collect.foreach(func)
>   if(criteria)
>  break
> }
>
> On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan 
> wrote:
>
>> Hi,
>>
>> I have the following use case.
>>
>> (1) I have an RDD of edges of a graph (say R).
>> (2) do a groupBy on R (by say source vertex) and call a function F on
>> each group.
>> (3) collect the results from Fs and do some computation
>> (4) repeat the above steps until some criteria is met
>>
>> In (2), the groups are always going to be the same (since R is grouped by
>> source vertex).
>>
>> Question:
>> Is R distributed every iteration (when in (2)) or is it distributed only
>> once when it is created?
>>
>> A sample code snippet is below.
>>
>> while(true) {
>>   val res = R.groupBy[VertexId](G).flatMap(F)
>>   res.collect.foreach(func)
>>   if(criteria)
>>  break
>> }
>>
>> Since the groups remain the same, what is the best way to go about
>> implementing the above logic?
>>
>
>


Question about Spark best practice when counting records.

2015-02-27 Thread Darin McBeath
I have a fairly large Spark job where I'm essentially creating quite a few 
RDDs and doing several types of joins using these RDDs, resulting in a final RDD 
which I write back to S3.


Along the way, I would like to capture record counts for some of these RDDs. My 
initial approach was to use the count action on some of these intermediate 
RDDs (and cache them, since the count would force the materialization of the RDD 
and the RDD would be needed again later).  This seemed to work 'ok' when my 
RDDs were fairly small/modest, but as they grew in size I started to experience 
problems.

After watching a recent very good screencast on performance, this doesn't seem 
the correct approach, as I believe I'm really breaking (or hindering) the 
pipelining concept in Spark.  If I remove all of my counts, I'm only left with 
the one job/action (save as Hadoop file at the end).  Spark then seems to run 
smoother (and quite a bit faster), and I really don't need (or want) to even 
cache any of my intermediate RDDs.

So, the approach I've been kicking around is to use accumulators instead.  I 
was already using them to count 'bad' records, so why not 'good' records as 
well? I realize that if I lose a partition I might over-count, but 
perhaps that is an acceptable trade-off.
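A minimal plain-Python sketch of the accumulator idea (this simulates the pattern; it is not Spark API code, and the Accumulator class and the int-parsing validity rule are stand-ins): count 'good' and 'bad' records as a side effect of the single transforming pass, instead of separate count() actions that force extra materializations.

```python
class Accumulator:
    """Stand-in for a Spark accumulator: an add-only counter."""
    def __init__(self):
        self.value = 0

    def add(self, n=1):
        self.value += n

good, bad = Accumulator(), Accumulator()

def transform(record):
    # Hypothetical validity rule: a record is "good" if it parses as an int.
    try:
        parsed = int(record)
    except ValueError:
        bad.add()
        return None
    good.add()
    return parsed

records = ["1", "2", "oops", "4"]
# One pass does the real work; the counts come along for free.
results = [r for r in map(transform, records) if r is not None]
# results == [1, 2, 4]; good.value == 3; bad.value == 1
```

In real Spark, a retried or recomputed partition can make such side-effect counts over-count, which is exactly the trade-off being weighed here.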

I'm guessing that others have run into this before, so I would like to learn 
from the experience of others and how they have addressed this.

Thanks.

Darin.




Errors in spark

2015-02-27 Thread sandeep vura
Hi Sparkers,

I am using hive version 0.13 and copied hive-site.xml into spark/conf,
and am using the default Derby local metastore.

While creating a table in the spark shell I am getting the following error.
Can anyone please take a look and suggest a solution?

sqlContext.sql("CREATE TABLE IF NOT EXISTS sandeep (key INT, value STRING)")
15/02/27 23:06:13 ERROR RetryingHMSHandler:
MetaException(message:file:/user/hive/warehouse_1/sandeep is not a
directory or unable to create one)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
at com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown
Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy13.createTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613)
at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189)
at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
at
org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at
org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at $line9.$read$$iwC$$iwC$$iwC$$iwC.(:15)
at $line9.$read$$iwC$$iwC$$iwC.(:20)
at $line9.$read$$iwC$$iwC.(:22)
at $line9.$read$$iwC.(:24)
at $line9.$read.(:26)
at $line9.$read$.(:30)
at $line9.$read$.()
at $line9.$eval$.(:7)
at $line9.$eval$.()
at $line9.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkI

java.util.NoSuchElementException: key not found:

2015-02-27 Thread rok
I'm seeing this java.util.NoSuchElementException: key not found: exception
pop up sometimes when I run operations on an RDD from multiple threads in a
python application. It ends up shutting down the SparkContext so I'm
assuming this is a bug -- from what I understand, I should be able to run
operations on the same RDD from multiple threads or is this not recommended? 

I can't reproduce it all the time and I've tried eliminating caching
wherever possible to see if that would have an effect, but it doesn't seem
to. Each thread first splits the base RDD and then runs the
LogisticRegressionWithSGD on the subset.  

Is there a workaround to this exception? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-key-not-found-tp21848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Failed to parse Hive query

2015-02-27 Thread Anusha Shamanur
Hi,

I am trying to do this in spark-shell:

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val listTables = hiveCtx.hql("show tables")

The second line fails to execute with this message:

warning: there were 1 deprecation warning(s); re-run with -deprecation for
details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse:
show tables at
org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at
scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused
by: java.lang.NullPointerException: Conf non-local session path expected to
be non-null at
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at
org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586)
at org.apache.hadoop.hive.ql.Context.(Context.java:129) at
org.apache.hadoop.hive.ql.Context.(Context.java:116) at
org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at
org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more


Any help would be appreciated.



-- 
Sent from Gmail mobile


Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-27 Thread Mukesh Jha
Also my job is map only so there is no shuffle/reduce phase.

On Fri, Feb 27, 2015 at 7:10 PM, Mukesh Jha  wrote:

> I'm streaming data from a Kafka topic using KafkaUtils, doing some
> computation, and writing records to HBase.
>
> Storage level is memory-and-disk-ser
> On 27 Feb 2015 16:20, "Akhil Das"  wrote:
>
>> You could be hitting this issue
>> https://issues.apache.org/jira/browse/SPARK-4516
>> Apart from that, a little more information about your job would be helpful.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha 
>> wrote:
>>
>>> Hi Experts,
>>>
>>> My Spark Job is failing with below error.
>>>
>>> From the logs I can see that input-3-1424842351600 was added at 5:32:32
>>> and was never purged out of memory. Also the available free memory for the
>>> executor is *2.1G*.
>>>
>>> Please help me figure out why executors cannot fetch this input.
>>>
>>> Txz for any help, Cheers.
>>>
>>>
>>> *Logs*
>>> 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
>>> input-3-1424842351600 in memory on
>>> chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
>>> .
>>> .
>>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>>> input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
>>> (size: 232.3 KB, free: 2.1 GB)
>>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>>> input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
>>> (size: 291.4 KB, free: 2.1 GB)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
>>> stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
>>> stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
>>> stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
>>> stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
>>> stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
>>> stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
>>> stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
>>> stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
>>> stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
>>> stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
>>> 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>>> Could not compute split, block input-3-1424842351600 not found
>>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>> at
>>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
>>> 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>>> Could not compute split, block input-3-1424842355600 not found
>>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> *Mukesh Jha *
>>>
>>
>>


-- 


Thanks & Regards,

*Mukesh Jha *


Re: High CPU usage in Driver

2015-02-27 Thread Himanish Kushary
Hi,

I was able to solve the issue. Below are the settings that worked for me.

1) It was happening due to the large number of partitions. I *coalesce*'d
the RDD as early as possible in my code into far fewer partitions (used
.coalesce(10000) to bring them down from 500K to 10K)

2) Increased the settings for the parameters *spark.akka.frameSize* (= 500),
*spark.akka.timeout*, *spark.akka.askTimeout*, and
*spark.core.connection.ack.wait.timeout*
to get rid of any insufficient frame size and timeout errors
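In spark-defaults.conf form, the settings described above would look roughly like this (the frame size is the value quoted above; the three timeout values are hypothetical placeholders, since the originals were not stated):

```properties
spark.akka.frameSize                     500
spark.akka.timeout                       300
spark.akka.askTimeout                    300
spark.core.connection.ack.wait.timeout   600
```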

Thanks
Himanish

On Thu, Feb 26, 2015 at 5:00 PM, Himanish Kushary 
wrote:

> Hi,
>
> I am working with an RDD (PairRDD) with 500K+ partitions. The RDD is loaded
> into memory; the size is around 18G.
>
> Whenever I run a distinct() on the RDD, the driver (spark-shell in
> yarn-client mode) host CPU usage rockets up (400+%) and the distinct()
> process seems to stall. The Spark driver UI also hangs.
>
> In ganglia the only node with high load is the driver host. I have tried
> repartitioning the data into a smaller number of partitions (using coalesce
> or repartition) with no luck.
>
> I have attached the jstack output which shows few threads in BLOCKED
> status. Not sure what exactly is going on here.
>
> The driver program was started with 15G memory on AWS EMR. Appreciate any
> thoughts regarding the issue.
>
> --
> Thanks & Regards
> Himanish
>



-- 
Thanks & Regards
Himanish


Re: Speed Benchmark

2015-02-27 Thread Jason Bell

How many machines are in the cluster?
And what is the configuration of those machines (cores/RAM)?

"Small cluster" is a very subjective statement.


Guillaume Guy wrote:

Dear Spark users:

I want to see if anyone has an idea of the performance for a small 
cluster.







Re: Speed Benchmark

2015-02-27 Thread Sean Owen
That's very slow, and there are a lot of possible explanations. The
first one that comes to mind is: I assume your YARN and HDFS are on
the same machines, but are you running executors on all HDFS nodes
when you run this? if not, a lot of these reads could be remote.

You have 6 executor slots, but your data exists in 96 blocks on HDFS.
You could read with up to 96-way parallelism. You say you're CPU-bound
though, but normally I'd wonder if this was simply a case of
under-using parallelism.

I also wonder if the bottleneck is something to do with pyspark in
this case; might be good to just try it in the spark-shell to check.
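A hedged sketch of what "more parallelism" could look like for this job (the flag values are illustrative, not a recommendation for this specific cluster, and the script name is hypothetical):

```shell
# More executor slots, so reads of the 96 HDFS blocks can proceed in parallel.
spark-submit \
  --master yarn-client \
  --num-executors 6 \
  --executor-cores 4 \
  --executor-memory 2g \
  count_job.py
```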

On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy
 wrote:
> Dear Spark users:
>
> I want to see if anyone has an idea of the performance for a small cluster.
>
> Reading from HDFS, what should be the performance of a count() operation on
> a 10GB RDD with 100M rows using pyspark? I looked at the CPU usage; all 6
> cores are at 100%.
>
> Details:
>
> master yarn-client
> num-executors 3
> executor-cores 2
> driver-memory 5g
> executor-memory 2g
> Distribution: Cloudera
>
> I also attached the screenshot.
>
> Right now, I'm at 17 minutes, which seems quite slow. Any idea what decent
> performance with a similar configuration would look like?
>
> If it's way off, I would appreciate any pointers as to ways to improve
> performance.
>
> Thanks.
>
> Best,
>
> Guillaume
>
>



Speed Benchmark

2015-02-27 Thread Guillaume Guy
Dear Spark users:

I want to see if anyone has an idea of the performance for a small cluster.

Reading from HDFS, what should be the performance of a count() operation
on a 10GB RDD with 100M rows using pyspark? I looked at the CPU usage;
all 6 cores are at 100%.

Details:

   - master yarn-client
   - num-executors 3
   - executor-cores 2
   - driver-memory 5g
   - executor-memory 2g
   - Distribution: Cloudera

I also attached the screenshot.

Right now, I'm at 17 minutes, which seems quite slow. Any idea what decent
performance with a similar configuration would look like?

If it's way off, I would appreciate any pointers as to ways to improve
performance.

Thanks.

Best,

Guillaume


Spark partial data in memory/and partial in disk

2015-02-27 Thread Siddharth Ubale
Hi,

How do we manage putting partial data into memory and partial onto disk when the 
data resides in a Hive table?
We have tried using the available documentation but are unable to go ahead with 
the above approach; we are only able to cache the entire table or uncache it.

Thanks,
Siddharth Ubale,
Synchronized Communications
#43, Velankani Tech Park, Block No. II,
3rd Floor, Electronic City Phase I,
Bangalore – 560 100
Tel : +91 80 3202 4060
Web: www.syncoms.com
London|Bangalore|Orlando

we innovate, plan, execute, and transform the business​



Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-27 Thread Mukesh Jha
I'm streaming data from a Kafka topic using KafkaUtils, doing some
computation, and writing records to HBase.

Storage level is memory-and-disk-ser
On 27 Feb 2015 16:20, "Akhil Das"  wrote:

> You could be hitting this issue
> https://issues.apache.org/jira/browse/SPARK-4516
> Apart from that, a little more information about your job would be helpful.
>
> Thanks
> Best Regards
>
> On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha 
> wrote:
>
>> Hi Experts,
>>
>> My Spark Job is failing with below error.
>>
>> From the logs I can see that input-3-1424842351600 was added at 5:32:32
>> and was never purged out of memory. Also the available free memory for the
>> executor is *2.1G*.
>>
>> Please help me figure out why executors cannot fetch this input.
>>
>> Txz for any help, Cheers.
>>
>>
>> *Logs*
>> 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
>> input-3-1424842351600 in memory on
>> chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
>> .
>> .
>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>> input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
>> (size: 232.3 KB, free: 2.1 GB)
>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>> input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
>> (size: 291.4 KB, free: 2.1 GB)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
>> stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
>> stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
>> stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
>> stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
>> stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
>> stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
>> stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
>> stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
>> stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
>> stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
>> 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>> Could not compute split, block input-3-1424842351600 not found
>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
>> 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>> Could not compute split, block input-3-1424842355600 not found
>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>
>> --
>> Thanks & Regards,
>>
>> *Mukesh Jha *
>>
>
>


Re: Unable to run hive queries inside spark

2015-02-27 Thread sandeep vura
Hi Kundan,

Sorry, I am also facing a similar issue today. How did you resolve it?

Regards,
Sandeep.v

On Thu, Feb 26, 2015 at 2:25 AM, Michael Armbrust 
wrote:

> It looks like that is getting interpreted as a local path.  Are you
> missing a core-site.xml file to configure hdfs?
>
> On Tue, Feb 24, 2015 at 10:40 PM, kundan kumar 
> wrote:
>
>> Hi Denny,
>>
>> yes the user has all the rights to HDFS. I am running all the spark
>> operations with this user.
>>
>> and my hive-site.xml looks like this
>>
>>  
>> hive.metastore.warehouse.dir
>> /user/hive/warehouse
>> location of default database for the
>> warehouse
>>   
>>
>> Do I need to do anything explicitly other than placing hive-site.xml in
>> the spark/conf directory?
>>
>> Thanks !!
>>
>>
>>
>> On Wed, Feb 25, 2015 at 11:42 AM, Denny Lee 
>> wrote:
>>
>>> The error message you have is:
>>>
>>> FAILED: Execution Error, return code 1 from 
>>> org.apache.hadoop.hive.ql.exec.DDLTask.
>>> MetaException(message:file:/user/hive/warehouse/src is not a directory
>>> or unable to create one)
>>>
>>> Could you verify that you (the user you are running under) has the
>>> rights to create the necessary folders within HDFS?
>>>
>>>
>>> On Tue, Feb 24, 2015 at 9:06 PM kundan kumar 
>>> wrote:
>>>
 Hi ,

 I have placed my hive-site.xml inside spark/conf and i am trying to
 execute some hive queries given in the documentation.

 Can you please suggest what wrong am I doing here.



 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 hiveContext: org.apache.spark.sql.hive.HiveContext =
 org.apache.spark.sql.hive.HiveContext@3340a4b8

 scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value
 STRING)")
 warning: there were 1 deprecation warning(s); re-run with -deprecation
 for details
 15/02/25 10:30:59 INFO ParseDriver: Parsing command: CREATE TABLE IF
 NOT EXISTS src (key INT, value STRING)
 15/02/25 10:30:59 INFO ParseDriver: Parse Completed
 15/02/25 10:30:59 INFO HiveMetaStore: 0: Opening raw store with
 implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
 15/02/25 10:30:59 INFO ObjectStore: ObjectStore, initialize called
 15/02/25 10:30:59 INFO Persistence: Property datanucleus.cache.level2
 unknown - will be ignored
 15/02/25 10:30:59 INFO Persistence: Property
 hive.metastore.integral.jdo.pushdown unknown - will be ignored
 15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in
 CLASSPATH (or one of dependencies)
 15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in
 CLASSPATH (or one of dependencies)
 15/02/25 10:31:08 INFO ObjectStore: Setting MetaStore object pin
 classes with
 hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
 15/02/25 10:31:08 INFO MetaStoreDirectSql: MySQL check failed, assuming
 we are not on mysql: Lexical error at line 1, column 5.  Encountered: "@"
 (64), after : "".
 15/02/25 10:31:09 INFO Datastore: The class
 "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
 "embedded-only" so does not have its own datastore table.
 15/02/25 10:31:09 INFO Datastore: The class
 "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
 "embedded-only" so does not have its own datastore table.
 15/02/25 10:31:15 INFO Datastore: The class
 "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
 "embedded-only" so does not have its own datastore table.
 15/02/25 10:31:15 INFO Datastore: The class
 "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
 "embedded-only" so does not have its own datastore table.
 15/02/25 10:31:17 INFO ObjectStore: Initialized ObjectStore
 15/02/25 10:31:17 WARN ObjectStore: Version information not found in
 metastore. hive.metastore.schema.verification is not enabled so recording
 the schema version 0.13.1aa
 15/02/25 10:31:18 INFO HiveMetaStore: Added admin role in metastore
 15/02/25 10:31:18 INFO HiveMetaStore: Added public role in metastore
 15/02/25 10:31:18 INFO HiveMetaStore: No user is added in admin role,
 since config is empty
 15/02/25 10:31:18 INFO SessionState: No Tez session required at this
 point. hive.execution.engine=mr.
 15/02/25 10:31:18 INFO PerfLogger: >>> from=org.apache.hadoop.hive.ql.Driver>
 15/02/25 10:31:18 INFO PerfLogger: >>> from=org.apache.hadoop.hive.ql.Driver>
 15/02/25 10:31:18 INFO Driver: Concurrency mode is disabled, not
 creating a lock manager
 15/02/25 10:31:18 INFO PerfLogger: >>> from=org.apache.hadoop.hive.ql.Driver>
 15/02/25 10:31:18 INFO PerfLogger: >>> from=org.apache.hadoop.hive.ql.Driver>
 15/02/25 10:31:18 INFO ParseDriver: Parsing command: CREATE TABLE IF
 NOT EXISTS src (key INT, value STRING)
>>>
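Michael's suggestion above amounts to ensuring that a Hadoop configuration on Spark's classpath points the default filesystem at HDFS, so that `/user/hive/warehouse` resolves to an HDFS path rather than a local one. A minimal core-site.xml sketch (the namenode host and port are placeholders; older Hadoop versions use the `fs.default.name` property instead):

```xml
<!-- Minimal core-site.xml sketch; namenode-host:8020 is a placeholder -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode-host:8020</value>
  </property>
</configuration>
```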

Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread Ritesh Kumar Singh
Try using Breeze (a Scala linear algebra library).

On Fri, Feb 27, 2015 at 5:56 PM, shahab  wrote:

> Thanks a lot Vijay, let me see how it performs.
>
> Best
> Shahab
>
>
> On Friday, February 27, 2015, Vijay Saraswat  wrote:
>
>> Available in GML --
>>
>> http://x10-lang.org/x10-community/applications/global-matrix-library.html
>>
>> We are exploring how to make it available within Spark. Any ideas would
>> be much appreciated.
>>
>> On 2/27/15 7:01 AM, shahab wrote:
>>
>>> Hi,
>>>
>>> I just wonder if there is any Sparse Matrix implementation available  in
>>> Spark, so it can be used in spark application?
>>>
>>> best,
>>> /Shahab
>>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread shahab
Thanks a lot Vijay, let me see how it performs.

Best
Shahab

On Friday, February 27, 2015, Vijay Saraswat  wrote:

> Available in GML --
>
> http://x10-lang.org/x10-community/applications/global-matrix-library.html
>
> We are exploring how to make it available within Spark. Any ideas would be
> much appreciated.
>
> On 2/27/15 7:01 AM, shahab wrote:
>
>> Hi,
>>
>> I just wonder if there is any Sparse Matrix implementation available  in
>> Spark, so it can be used in spark application?
>>
>> best,
>> /Shahab
>>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread Vijay Saraswat

Available in GML --

http://x10-lang.org/x10-community/applications/global-matrix-library.html

We are exploring how to make it available within Spark. Any ideas would 
be much appreciated.


On 2/27/15 7:01 AM, shahab wrote:

Hi,

I just wonder if there is any Sparse Matrix implementation available 
 in Spark, so it can be used in spark application?


best,
/Shahab



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread shahab
Thanks,

But do you know whether access to CoordinateMatrix elements is almost as fast
as for a normal matrix, or whether the access time is similar to an RDD
(relatively slow)? I am looking for a sparse matrix data structure with fast
access.



On Friday, February 27, 2015, Peter Rudenko  wrote:

>  Yes, it's called CoordinateMatrix (
> http://spark.apache.org/docs/latest/mllib-data-types.html#coordinatematrix);
> you need to fill it with elements of type MatrixEntry((Long, Long,
> Double)).
>
>
> Thanks,
> Peter Rudenko
> On 2015-02-27 14:01, shahab wrote:
>
> Hi,
>
>  I just wonder if there is any Sparse Matrix implementation available  in
> Spark, so it can be used in spark application?
>
>  best,
> /Shahab
>
>
>
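For purely local lookups, a hash-keyed COO ("coordinate") layout is one common option. A minimal plain-Python sketch (hypothetical code, not Spark's CoordinateMatrix, which is RDD-backed and not designed for random access):

```python
# Minimal local sparse-matrix sketch: entries are stored as
# (row, col) -> value in a dict, mirroring MLlib's
# MatrixEntry(Long, Long, Double) triples but giving O(1) random
# access, which an RDD-backed CoordinateMatrix does not.

class CooMatrix:
    def __init__(self, num_rows, num_cols):
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.entries = {}  # (row, col) -> value; absent keys are 0.0

    def put(self, row, col, value):
        if value == 0.0:
            self.entries.pop((row, col), None)  # keep the matrix sparse
        else:
            self.entries[(row, col)] = value

    def get(self, row, col):
        return self.entries.get((row, col), 0.0)

m = CooMatrix(1000, 1000)
m.put(3, 7, 2.5)
print(m.get(3, 7))   # 2.5
print(m.get(0, 0))   # 0.0 (implicit zero)
```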


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread Peter Rudenko
Yes, it's called CoordinateMatrix
(http://spark.apache.org/docs/latest/mllib-data-types.html#coordinatematrix);
you need to fill it with elements of type MatrixEntry((Long, Long, Double)).



Thanks,
Peter Rudenko
On 2015-02-27 14:01, shahab wrote:

Hi,

I just wonder if there is any Sparse Matrix implementation available 
 in Spark, so it can be used in spark application?


best,
/Shahab




Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread shahab
Hi,

I just wonder if there is any Sparse Matrix implementation available  in
Spark, so it can be used in spark application?

best,
/Shahab


Re: how to map and filter in one step?

2015-02-27 Thread Jeffrey Jedele
Hi,
we are using RDD#mapPartitions() to achieve the same.

Are there advantages/disadvantages of using one method over the other?

Regards,
Jeff
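One concrete difference: mapPartitions lets per-task setup work be done once per partition rather than once per element. A plain-Python sketch of the pattern (illustrative only; in Spark this function would be passed to RDD#mapPartitions):

```python
# Sketch of why mapPartitions can beat per-element map/flatMap: any
# per-record setup cost (here, compiling a regex; in real jobs, opening
# a DB connection) is paid once per partition instead of once per element.
import re

def process_partition(lines):
    pattern = re.compile(r"^(\w+),(\d+)$")   # built once per partition
    for line in lines:
        m = pattern.match(line)
        if m:                                 # filter and map in one pass
            yield (m.group(1), int(m.group(2)))

partition = ["a,1", "junk", "b,2"]
print(list(process_partition(partition)))  # [('a', 1), ('b', 2)]
```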

2015-02-26 20:02 GMT+01:00 Mark Hamstra :

> rdd.map(foo).filter(bar) and rdd.filter(bar).map(foo) will each already be
> pipelined into a single stage, so there generally isn't any need to
> complect the map and filter into a single function.
>
> Additionally, there is RDD#collect[U](f: PartialFunction[T, U])(implicit
> arg0: ClassTag[U]): RDD[U], which only applies the partial function to
> those elements of the RDD for which f is defined.
>
> On Thu, Feb 26, 2015 at 10:49 AM, Crystal Xing 
> wrote:
>
>> I see.
>> The reason we can use flatMap to drop elements, but cannot do so with map,
>> is that flatMap supports mapping to zero or more elements while map only
>> supports a 1-to-1 mapping?
>>
>> It seems flatMap is more equivalent to Hadoop's map.
>>
>>
>> Thanks,
>>
>> Zheng zhen
>>
>> On Thu, Feb 26, 2015 at 10:44 AM, Sean Owen  wrote:
>>
>>> You can flatMap:
>>>
>>> rdd.flatMap { in =>
>>>   if (condition(in)) {
>>> Some(transformation(in))
>>>   } else {
>>> None
>>>   }
>>> }
>>>
>>> On Thu, Feb 26, 2015 at 6:39 PM, Crystal Xing 
>>> wrote:
>>> > Hi,
>>> > I have a text file input and I want to parse line by line and map each
>>> line
>>> > to another format. But at the same time, I want to filter out some
>>> lines I
>>> > do not need.
>>> >
>>> > I wonder if there is a way to filter out those lines in the map
>>> function.
>>> >
>>> > Do I have to do two steps filter and map?  In that way, I have to scan
>>> and
>>> > parse the lines twice in order to filter and map.
>>> >
>>> > If I map those unwanted line to null and filter out null, will that
>>> work?
>>> > never tried yet.
>>> >
>>> > Thanks,
>>> >
>>> > Zheng zheng
>>>
>>
>>
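Sean's flatMap-with-Option pattern quoted above has a direct analogue in plain Python: a generator that yields zero or one results per input parses and filters in a single pass (illustrative code; in PySpark, the same function could be passed to RDD.flatMap):

```python
# One-pass "map and filter": each input line yields zero or one outputs,
# so parsing and filtering happen in a single scan, with no null
# sentinels and no second pass over the data.

def parse_lines(lines):
    for line in lines:
        fields = line.split(",")
        if len(fields) == 2 and fields[1].isdigit():  # keep only valid lines
            yield (fields[0], int(fields[1]))          # ...already transformed

raw = ["a,1", "bad line", "b,22", "c,x"]
print(list(parse_lines(raw)))  # [('a', 1), ('b', 22)]
```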
>


Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query

2015-02-27 Thread anu
I have three tables with the following schema:

case class *date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp,
DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR:
Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String,
MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int)



case class *interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int,
MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int,
VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int,
SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME:
java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp,
INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME:
java.sql.Timestamp)



class *sdp_d*( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID
:Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT
:Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT
:Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK
:Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String],
IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO
:Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String],
ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE
:Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String],
BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String],
LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String],
INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME
:Option[java.sql.Timestamp]) extends Product{

@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int) = n match
{
case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 =>
ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID;
case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 =>
PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 =>
IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO; case 16 =>
ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 =>
TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case
23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS;
case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 =>
LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString())
}

override def productArity: Int = 29; override def canEqual(that: Any):
Boolean = that.isInstanceOf[sdp_d]
}



Non-join queries work fine:

*val q1 = sqlContext.sql("""SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID),
COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR,
DAY_OF_YEAR""")*

res4: Array[org.apache.spark.sql.Row] =
Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1],
[2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1],
[2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1],
[2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1],
[2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1],
[2014,315,20141111,20141111,1], [2014,316,20141112,20141112,1],
[2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1],
[2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1],
[2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1],
[2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1],
[2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1],
[2014,327,20141123,20141123,1], [2014,328,20141...



*But the join queries throw this error:
java.lang.ArrayIndexOutOfBoundsException*

*scala> val q = sqlContext.sql("""select * from date_d dd join interval_f
intf on intf.DATE_WID = dd.WID Where intf.DATE_WID >= 20141101 AND
intf.DATE_WID <= 20141110""")*

q: org.apache.spark.sql.SchemaRDD =
SchemaRDD[38] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project
[WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29]
 ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight
  Exchange (HashPartitioning [WID#0], 200)
   InMemoryColumnarTableScan
[WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA...


*scala> q.take(5).foreach(println)*

15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at
basicOperators.scala:136
15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46 (mapPartitions at
Exchange.scala:48)
15/02/27 15:50:26 INFO FileInputFormat: Total input paths to process : 1
15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 42 (mapPartitions at
Exchange.scala:48)
15

Re: Augment more data to existing MatrixFactorization Model?

2015-02-27 Thread Jeffrey Jedele
Hey Anish,
machine learning models that are updated with incoming data are commonly
known as "online learning systems". Matrix factorization is one way to
implement recommender systems, but not the only one. There are papers about
how to do online matrix factorization, but you will likely have to
implement this on your own.

Have a look at:
http://en.wikipedia.org/wiki/Recommender_system
www0.cs.ucl.ac.uk/staff/l.capra/publications/seams11-vale.pdf

Regards,
Jeff

2015-02-26 19:40 GMT+01:00 anishm :

> I am a beginner to the world of Machine Learning and the usage of Apache
> Spark.
> I have followed the tutorial at
>
> https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html#augmenting-matrix-factors
> <
> https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html#augmenting-matrix-factors
> >
> , and was successfully able to develop the application. Now, as today's web
> applications need to be powered by real-time recommendations, I would like
> my model to be ready for new data that keeps coming to the server.
> The site has quoted:
> *
> A better way to get the recommendations for you is training a matrix
> factorization model first and then augmenting the model using your
> ratings.*
>
> How do I do that? I am using Python to develop my application. Also, please
> tell me how do I persist the model to use it again, or an idea how do I
> interface this with a web service.
>
> Thanking you,
> Anish Mashankar
> A Data Science Enthusiast
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Augment-more-data-to-existing-MatrixFactorization-Model-tp21830.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
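The "augmenting the model" step the tutorial mentions is often implemented as a fold-in: hold the trained item factors fixed and fit only the new user's factor vector against that user's ratings, so no full retraining is needed. A minimal plain-Python sketch of the idea (hypothetical code, not MLlib's API; all names and values are illustrative):

```python
# Fold-in sketch: item factors stay fixed; a new user's factor vector is
# fitted to that user's ratings alone by simple gradient steps on the
# squared prediction error.

def fold_in_user(item_factors, ratings, rank=2, lr=0.05, steps=200):
    """item_factors: item id -> factor list; ratings: item id -> rating."""
    user = [0.1] * rank  # small initial user vector
    for _ in range(steps):
        for item_id, r in ratings.items():
            q = item_factors[item_id]
            err = r - sum(u * qi for u, qi in zip(user, q))
            user = [u + lr * err * qi for u, qi in zip(user, q)]
    return user

items = {"m1": [1.0, 0.0], "m2": [0.0, 1.0]}
new_user = fold_in_user(items, {"m1": 4.0, "m2": 1.0})
pred_m1 = sum(u * q for u, q in zip(new_user, items["m1"]))
print(round(pred_m1, 2))  # ~4.0: the fitted vector reproduces the rating
```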


Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query

2015-02-27 Thread anamika gupta
I have three tables with the following schema:

case class *date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp,
DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR:
Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String,
MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int)



case class *interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int,
MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int,
VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int,
SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME:
java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp,
INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME:
java.sql.Timestamp)



class *sdp_d*( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID
:Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT
:Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String],
GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK
:Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String],
IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO
:Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String],
ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE
:Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String],
BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String],
LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String],
INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME
:Option[java.sql.Timestamp]) extends Product{

@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int) = n match
{
case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 =>
ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID;
case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 =>
PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 =>
IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO; case 16 =>
ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 =>
TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case
23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS;
case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 =>
LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString())
}

override def productArity: Int = 29; override def canEqual(that: Any):
Boolean = that.isInstanceOf[sdp_d]
}



Non-join queries work fine:

*val q1 = sqlContext.sql("""SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID),
COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR,
DAY_OF_YEAR""")*

res4: Array[org.apache.spark.sql.Row] =
Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1],
[2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1],
[2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1],
[2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1],
[2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1],
[2014,315,20141111,20141111,1], [2014,316,20141112,20141112,1],
[2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1],
[2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1],
[2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1],
[2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1],
[2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1],
[2014,327,20141123,20141123,1], [2014,328,20141...



*But the join queries throw this error:
java.lang.ArrayIndexOutOfBoundsException*

*scala> val q = sqlContext.sql("""select * from date_d dd join interval_f
intf on intf.DATE_WID = dd.WID Where intf.DATE_WID >= 20141101 AND
intf.DATE_WID <= 20141110""")*

q: org.apache.spark.sql.SchemaRDD =
SchemaRDD[38] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project
[WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29]
 ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight
  Exchange (HashPartitioning [WID#0], 200)
   InMemoryColumnarTableScan
[WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA...


*scala> q.take(5).foreach(println)*

15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at
basicOperators.scala:136
15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46 (mapPartitions at
Exchange.scala:48)
15/02/27 15:50:26 INFO FileInputFormat: Total input paths to process : 1
15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 42 (mapPartitions at
Exchange.scala:48)
15/

Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-27 Thread Akhil Das
You could be hitting this issue
https://issues.apache.org/jira/browse/SPARK-4516
Apart from that, a little more information about your job would be helpful.

Thanks
Best Regards

On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha 
wrote:

> Hi Experts,
>
> My Spark Job is failing with below error.
>
> From the logs I can see that input-3-1424842351600 was added at 5:32:32
> and was never purged out of memory. Also the available free memory for the
> executor is *2.1G*.
>
> Please help me figure out why executors cannot fetch this input.
>
> Txz for any help, Cheers.
>
>
> *Logs*
> 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
> input-3-1424842351600 in memory on
> chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
> .
> .
> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
> input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
> (size: 232.3 KB, free: 2.1 GB)
> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
> input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
> (size: 291.4 KB, free: 2.1 GB)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
> stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
> stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
> stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
> stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
> stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
> stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
> stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
> stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
> stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
> stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
> bytes)
> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
> 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
> Could not compute split, block input-3-1424842351600 not found
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
> 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
> Could not compute split, block input-3-1424842355600 not found
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>
> --
> Thanks & Regards,
>
> *Mukesh Jha *
>


Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-27 Thread Jeffrey Jedele
I don't have an idea, but perhaps a little more context would be helpful.

What is the source of your streaming data? What's the storage level you're
using?
What are you doing? Some kind of window operations?

Regards,
Jeff

2015-02-26 18:59 GMT+01:00 Mukesh Jha :

>
> On Wed, Feb 25, 2015 at 8:09 PM, Mukesh Jha 
> wrote:
>
>> My application runs fine for ~3/4 hours and then hits this issue.
>>
>> On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha 
>> wrote:
>>
>>> Hi Experts,
>>>
>>> My Spark Job is failing with below error.
>>>
>>> From the logs I can see that input-3-1424842351600 was added at 5:32:32
>>> and was never purged out of memory. Also the available free memory for the
>>> executor is *2.1G*.
>>>
>>> Please help me figure out why executors cannot fetch this input.
>>>
>>> Txz for any help, Cheers.
>>>
>>>
>>> *Logs*
>>> 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
>>> input-3-1424842351600 in memory on
>>> chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
>>> .
>>> .
>>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>>> input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
>>> (size: 232.3 KB, free: 2.1 GB)
>>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>>> input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
>>> (size: 291.4 KB, free: 2.1 GB)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
>>> stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
>>> stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
>>> stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
>>> stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
>>> stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
>>> stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
>>> stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
>>> stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
>>> stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
>>> stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>>> bytes)
>>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
>>> 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>>> Could not compute split, block input-3-1424842351600 not found
>>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>> at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
>>> 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>>> Could not compute split, block input-3-1424842355600 not found
>>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> *Mukesh Jha *
>>>
>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> *Mukesh Jha *
>>
>
>
>
> --
>
>
> Thanks & Regards,
>
> *Mukesh Jha *
>


Re: One of the executor not getting StopExecutor message

2015-02-27 Thread Akhil Das
Most likely that particular executor is stuck in a GC pause. What operation
are you performing? You can try increasing the parallelism if you see only
one executor doing the work.

Thanks
Best Regards

On Fri, Feb 27, 2015 at 11:39 AM, twinkle sachdeva <
twinkle.sachd...@gmail.com> wrote:

> Hi,
>
> I am running a spark application on Yarn in cluster mode.
> One of my executors appears to be in a hung state for a long time, and
> finally gets killed by the driver.
>
> Unlike the other executors, it has not received a StopExecutor message
> from the driver.
>
> Here are the logs at the end of this container (C_1):
>
> 
> 15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Done removing
> broadcast 36, response is 2
> 15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Sent response: 2
> to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$aB]
> 15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to
> TMO-GCR70/192.168.162.70:9000 from admin: closed
> 15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to
> TMO-GCR70/192.168.162.70:9000 from admin: stopped, remaining connections 0
> 15/02/26 18:17:32 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for []
> with renew id 1 executed
> 15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for []
> with renew id 1 expired
> 15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for []
> with renew id 1 exited
> 15/02/26 20:33:13 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
> SIGNAL 15: SIGTERM
>
> NOTE that it has no logs for more than 2hrs.
>
> Here are the logs at the end of normal container ( C_2):
>
> 
> 15/02/26 20:33:09 DEBUG storage.BlockManagerSlaveActor: Sent response: 2
> to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$D+b]
> 15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor]
> received message StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73
> :37906/user/CoarseGrainedScheduler#160899257]
> 15/02/26 20:33:10 INFO executor.CoarseGrainedExecutorBackend: Driver
> commanded a shutdown
> 15/02/26 20:33:10 INFO storage.MemoryStore: MemoryStore cleared
> 15/02/26 20:33:10 INFO storage.BlockManager: BlockManager stopped
> 15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] 
> *handled
> message (181.499835 ms) StopExecutor* from
> Actor[akka.tcp://sparkDriver@TMO-DN73
> :37906/user/CoarseGrainedScheduler#160899257]
> 15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
> 15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remote daemon shut down; proceeding with flushing remote transports.
> 15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
> 15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache:
> org.apache.hadoop.ipc.Client@76a68bd4
> 15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache:
> org.apache.hadoop.ipc.Client@76a68bd4
> 15/02/26 20:33:10 DEBUG ipc.Client: removing client from cache:
> org.apache.hadoop.ipc.Client@76a68bd4
> 15/02/26 20:33:10 DEBUG ipc.Client: stopping actual client because no more
> references remain: org.apache.hadoop.ipc.Client@76a68bd4
> 15/02/26 20:33:10 DEBUG ipc.Client: Stopping client
> 15/02/26 20:33:10 DEBUG storage.DiskBlockManager: Shutdown hook called
> 15/02/26 20:33:10 DEBUG util.Utils: Shutdown hook called
>
> At the driver side, i can see the logs related to heartbeat messages from
> C_1 till 20:05:00
>
> --
> 15/02/26 20:05:00 DEBUG spark.HeartbeatReceiver: [actor] received message
> Heartbeat(7,[Lscala.Tuple2;@151e5ce6,BlockManagerId(7, TMO-DN73, 34106))
> from Actor[akka.tcp://sparkExecutor@TMO-DN73:43671/temp/$fn]
>
> After this, it continues to receive heartbeats from the other executors
> except this one, and here follows the message responsible for its SIGTERM:
>
>
> 
>
> 15/02/26 20:06:20 WARN storage.BlockManagerMasterActor: Removing
> BlockManager BlockManagerId(7, TMO-DN73, 34106) with no recent heart beats:
> 80515ms exceeds 45000ms
>
>
> I am using spark 1.2.1.
>
> Any pointer(s) ?
>
>
> Thanks,
>
> Twinkle
>


NoSuchElementException: None.get

2015-02-27 Thread patcharee

Hi,

I got a NoSuchElementException when I tried to iterate through a Map which
contains some elements (not null, not empty). When I debugged my code
(below), it seemed that the first part of the code, which fills the Map, was
executed after the second part, which iterates over the Map. Both parts
belong to the same method of a case class, so shouldn't they be executed
sequentially? Any ideas?


Best,
Patcharee

---
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at no.uni.computing.etl.CalculateHeightClass$$anonfun$calculateHeightForEachZ$2.apply(LoadWrfIntoHive.scala:161)
at no.uni.computing.etl.CalculateHeightClass$$anonfun$calculateHeightForEachZ$2.apply(LoadWrfIntoHive.scala:156)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
at no.uni.computing.etl.CalculateHeightClass.calculateHeightForEachZ(LoadWrfIntoHive.scala:156)
at no.uni.computing.etl.LoadWrfIntoHive$$anonfun$6.apply(LoadWrfIntoHive.scala:74)
at no.uni.computing.etl.LoadWrfIntoHive$$anonfun$6.apply(LoadWrfIntoHive.scala:74)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


---
object LoadWrfIntoHive {
  def main(args: Array[String]) {
    ..
    val listDataHeightRDD =
      groupDataHeightRDD.map(CalculateHeightClass().calculateHeightForEachZ)
    ..
  }
}

case class CalculateHeightClass() {
  def calculateHeightForEachZ(
      x: (String, Iterable[RawDataForHeight])): (String, Map[Integer, Float]) = {

    var result: Map[Integer, Float] = Map()
    var valMap: Map[Integer, scala.collection.mutable.MutableList[Double]] = Map()

    val it = x._2.iterator
    while (it.hasNext) {
      // Adding element into valMap
    }
    for (currZ <- valMap.keySet) { // <-- ERROR THROWN here

    }
    (x._1, result)
  }
}

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming, batchinterval,windowinterval and window sliding interval difference

2015-02-27 Thread Jeffrey Jedele
If you read the streaming programming guide, you'll notice that Spark does
not do "real" streaming but "emulates" it with a so-called mini-batching
approach. Let's say you want to work with a continuous stream of incoming
events from a computing centre:

Batch interval:
That's the basic "heartbeat" of your streaming application. If you set this
to 1 second, Spark will create an RDD every second containing the events of
that second. That's your "mini-batch" of data.

Windowing:
That's a way to do aggregations on your streaming data. Let's say you want
to have a summary of how many warnings your system produced in the last
hour. Then you would use a windowed reduce with a window size of 1h.

Sliding:
This tells Spark how often to perform your windowed operation. If you set
this to 1h as well, you would aggregate your data stream into consecutive
1h windows with no overlap. You could also tell Spark to create your 1h
aggregation only twice a day by setting the sliding interval to 12h, or to
create a 1h aggregation every 30 min, in which case each data window
overlaps with the previous one.

I recommend reading the programming guide carefully - it explains these
concepts pretty well.
https://spark.apache.org/docs/latest/streaming-programming-guide.html

Regards,
Jeff

2015-02-26 18:51 GMT+01:00 Hafiz Mujadid :

> Can somebody explain the difference between
> batchinterval,windowinterval and window sliding interval with example.
> If there is any real time use case of using these parameters?
>
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-batchinterval-windowinterval-and-window-sliding-interval-difference-tp21829.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Global sequential access of elements in RDD

2015-02-27 Thread Wush Wu
Thanks for your reply.

But your code snippet uses `collect`, which is not feasible for me.
My algorithm involves a large amount of data, and I do not want to
transfer it all to the driver.
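One driver-side alternative to collect is RDD.toLocalIterator, which
fetches one partition at a time. A pure-Python sketch (no Spark required;
the data layout is illustrative) of why partition-by-partition iteration
over a sortBy result restores global order:

```python
# After sortBy, each partition is internally sorted and partition i holds
# keys <= those of partition i+1 (range partitioning). foreach runs on the
# partitions concurrently, so printed side effects interleave; consuming
# the partitions in index order - roughly what RDD.toLocalIterator does -
# yields the globally sorted sequence.
data = [1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9]

# Simulate sortBy's range partitioning into 2 sorted partitions.
s = sorted(data)
partitions = [s[:len(s) // 2], s[len(s) // 2:]]

# Sequential, one-partition-at-a-time consumption on the driver.
ordered = [x for part in partitions for x in part]
print(ordered)  # [1, 1, 1, 2, 2, 3, 4, 5, 7, 8, 9]
```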

Wush

2015-02-27 16:27 GMT+08:00 Yanbo Liang :

> Actually, sortBy will return an ordered RDD.
> Your unordered output may be due to foreach.
>
> You can reference the following code snippet, it will return ordered
> integers [1,1,1,2,2,3,4,5,7,8,9]
>
> val rdd = sc.parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9), 2).sortBy(x 
> => x, true)
> println(rdd.collect().mkString(","))
>
>
>
> 2015-02-27 15:38 GMT+08:00 Wush Wu :
>
>> Dear all,
>>
>> I want to implement some sequential algorithm on RDD.
>>
>> For example:
>>
>> val conf = new SparkConf()
>>   conf.setMaster("local[2]").
>>   setAppName("SequentialSuite")
>> val sc = new SparkContext(conf)
>> val rdd = sc.
>>parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9), 2).
>>sortBy(x => x, true)
>> rdd.foreach(println)
>>
>> I want to see the ordered number on my screen, but it shows unordered
>> integers. The two partitions execute the println simultaneously.
>>
>> How do I make the RDD execute a function globally sequential?
>>
>> Best,
>> Wush
>>
>
>


Re: How to get yarn logs to display in the spark or yarn history-server?

2015-02-27 Thread Christophe Préaud
Yes, spark.yarn.historyServer.address is used to access the Spark history
server from YARN; it is not needed if you use only the YARN history server.
It may be possible to have both history servers running, but I have not
tried that yet.

Besides, as far as I have understood, yarn and spark history servers have two 
different purposes:
- yarn history server is for looking at your application logs after it has 
finished
- spark history server is for looking at your application in the spark web ui 
(the one with the "Stages", "Storage", "Environment" and "Executors") after it 
has finished

Regards,
Christophe.

On 26/02/2015 20:30, Colin Kincaid Williams wrote:
 Right now I have set spark.yarn.historyServer.address in my spark configs to 
have yarn point to the spark-history server. Then from your mail it sounds like 
I should try another setting, or remove it completely. I also noticed that the 
aggregated log files appear in a directory in hdfs under application/spark vs. 
application/yarn or similar. I will review my configurations and see if I can 
get this working.

Thanks,

Colin Williams


On Thu, Feb 26, 2015 at 9:11 AM, Christophe Préaud 
mailto:christophe.pre...@kelkoo.com>> wrote:
You can see this information in the yarn web UI using the configuration I 
provided in my former mail (click on the application id, then on logs; you will 
then be automatically redirected to the yarn history server UI).


On 24/02/2015 19:49, Colin Kincaid Williams wrote:
So back to my original question.

I can see the spark logs using the example above:

yarn logs -applicationId application_1424740955620_0009

This shows yarn log aggregation working. I can see the std out and std error in 
that container information above. Then how can I get this information in a 
web-ui ? Is this not currently supported?

On Tue, Feb 24, 2015 at 10:44 AM, Imran Rashid 
mailto:iras...@cloudera.com>> wrote:
the spark history server and the yarn history server are totally independent.  
Spark knows nothing about yarn logs, and vice versa, so unfortunately there 
isn't any way to get all the info in one place.

On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams 
mailto:disc...@uw.edu>> wrote:
Looks like in my tired state, I didn't mention spark the whole time. However, 
it might be implied by the application log above. Spark log aggregation appears 
to be working, since I can run the yarn command above. I do have yarn logging 
setup for the yarn history server. I was trying to use the spark 
history-server, but maybe I should try setting

spark.yarn.historyServer.address

to the yarn history-server, instead of the spark history-server? I tried this 
configuration when I started, but didn't have much luck.

Do your Spark apps, run in yarn-client or yarn-cluster mode, show up in your
yarn history server? If so, can you share any spark settings?

On Tue, Feb 24, 2015 at 8:48 AM, Christophe Préaud 
mailto:christophe.pre...@kelkoo.com>> wrote:
Hi Colin,

Here is how I have configured my hadoop cluster to have yarn logs available 
through both the yarn CLI and the _yarn_ history server (with gzip compression 
and 10 days retention):

1. Add the following properties in the yarn-site.xml on each node managers and 
on the resource manager:
  
yarn.log-aggregation-enable
true
  
  
yarn.log-aggregation.retain-seconds
864000
  
  
yarn.log.server.url

http://dc1-kdp-dev-hadoop-03.dev.dc1.kelkoo.net:19888/jobhistory/logs
  
  
yarn.nodemanager.log-aggregation.compression-type
gz
  

2. Restart yarn and then start the yarn history server on the server defined in 
the yarn.log.server.url property above:

/opt/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver # should fail if 
historyserver is not yet started
/opt/hadoop/sbin/stop-yarn.sh
/opt/hadoop/sbin/start-yarn.sh
/opt/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver


It may be slightly different for you if the resource manager and the history 
server are not on the same machine.

Hope it will work for you as well!
Christophe.

On 24/02/2015 06:31, Colin Kincaid Williams wrote:
> Hi,
>
> I have been trying to get my yarn logs to display in the spark history-server 
> or yarn history-server. I can see the log information
>
>
> yarn logs -applicationId application_1424740955620_0009
> 15/02/23 22:15:14 INFO client.ConfiguredRMFailoverProxyProvider: Failing over 
> to us3sm2hbqa04r07-comp-prod-local
>
>
> Container: container_1424740955620_0009_01_02 on 
> us3sm2hbqa07r07.comp.prod.local_8041
> ===
> LogType: stderr
> LogLength: 0
> Log Contents:
>
> LogType: stdout
> LogLength: 897
> Log Contents:
> [GC [PSYoungGen: 262656K->23808K(306176K)] 262656K->23880K(1005568K), 
> 0.0283450 secs] [Times: user=0.14 sys=0.03, real=0.03 secs]
> Heap
>  PSYoungGen  total 306176K, used 111279K [0xeaa8, 
> 0x0001, 0x0001)
>   eden spac

spark.default.parallelism

2015-02-27 Thread Deep Pradhan
Hi,
I have four single-core machines as slaves in my cluster. I set
spark.default.parallelism to 4 and ran the SparkTC example. It took
around 26 sec.
Then I increased spark.default.parallelism to 8, and performance
deteriorated: the same application now takes 32 sec.
I have read that the best performance is usually obtained when the
parallelism is set to 2X the number of available cores. I do not quite
understand this. Could anyone please explain?


Thank You
Regards,
Deep


Re: job keeps failing with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

2015-02-27 Thread Arush Kharbanda
Can you share the error you are getting when the job fails?

On Thu, Feb 26, 2015 at 4:32 AM, Darin McBeath 
wrote:

> I'm using Spark 1.2, stand-alone cluster on ec2 I have a cluster of 8
> r3.8xlarge machines but limit the job to only 128 cores.  I have also tried
> other things such as setting 4 workers per r3.8xlarge and 67gb each but
> this made no difference.
>
> The job frequently fails at the end in this step (saveasHadoopFile).   It
> will sometimes work.
>
> finalNewBaselinePairRDD is hashPartitioned with 1024 partitions and a
> total size around 1TB.  There are about 13.5M records in
> finalNewBaselinePairRDD.  finalNewBaselinePairRDD is 
>
>
> JavaPairRDD<Text, Text> finalBaselineRDDWritable =
>     finalNewBaselinePairRDD.mapToPair(new ConvertToWritableTypes())
>         .persist(StorageLevel.MEMORY_AND_DISK_SER());
>
> // Save to hdfs (gzip)
> finalBaselineRDDWritable.saveAsHadoopFile("hdfs:///sparksync/",
> Text.class, Text.class,
> SequenceFileOutputFormat.class,org.apache.hadoop.io.compress.GzipCodec.class);
>
>
> If anyone has any tips for what I should look into it would be appreciated.
>
> Thanks.
>
> Darin.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: How to pass a org.apache.spark.rdd.RDD in a recursive function

2015-02-27 Thread Arush Kharbanda
Passing RDDs around is not a good idea. RDDs are immutable and can't be
changed inside functions. Have you considered taking a different approach?

On Thu, Feb 26, 2015 at 3:42 AM, dritanbleco  wrote:

> Hello
>
> i am trying to pass as a parameter a org.apache.spark.rdd.RDD table to a
> recursive function. This table should be changed in any step of the
> recursion and could not be just a global var
>
> need help :)
>
> Thank you
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-a-org-apache-spark-rdd-RDD-in-a-recursive-function-tp21805.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Global sequential access of elements in RDD

2015-02-27 Thread Yanbo Liang
Actually, sortBy will return an ordered RDD.
Your unordered output may be due to foreach.

You can reference the following code snippet, it will return ordered
integers [1,1,1,2,2,3,4,5,7,8,9]

val rdd = sc.parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9),
2).sortBy(x => x, true)
println(rdd.collect().mkString(","))



2015-02-27 15:38 GMT+08:00 Wush Wu :

> Dear all,
>
> I want to implement some sequential algorithm on RDD.
>
> For example:
>
> val conf = new SparkConf()
>   conf.setMaster("local[2]").
>   setAppName("SequentialSuite")
> val sc = new SparkContext(conf)
> val rdd = sc.
>parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9), 2).
>sortBy(x => x, true)
> rdd.foreach(println)
>
> I want to see the ordered number on my screen, but it shows unordered
> integers. The two partitions execute the println simultaneously.
>
> How do I make the RDD execute a function globally sequential?
>
> Best,
> Wush
>


Re: Is SPARK_CLASSPATH really deprecated?

2015-02-27 Thread Patrick Wendell
I think we just need to update the docs; they are a bit unclear right
now. At the time, we worded them fairly sternly because we really
wanted people to use --jars when we deprecated SPARK_CLASSPATH. But
there are other types of deployments where there is a legitimate need
to augment the classpath of every executor.

I think it should probably say something more like

"Extra classpath entries to append to the classpath of executors. This
is sometimes used in deployment environments where dependencies of
Spark are present in a specific place on all nodes".

Kannan - if you want to submit a patch, I can help review it.
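In the meantime, since spark-defaults.conf does not interpolate variables,
one workaround for the use case below is to resolve the jars in a submit
wrapper and pass them via --conf. A hedged sketch - get_hbase_jars_for_cp
and /opt/hbase/lib are stand-ins for illustration, not real commands:

```python
# Sketch only: resolve the locally installed jars at submit time and
# build the spark-submit arguments, instead of relying on variable
# expansion inside spark-defaults.conf (which Spark does not do).
import glob

def get_hbase_jars_for_cp(hbase_lib="/opt/hbase/lib"):
    """Return a ':'-joined classpath of the jars found in hbase_lib."""
    return ":".join(sorted(glob.glob(hbase_lib + "/*.jar")))

def submit_args(extra_cp):
    args = ["spark-submit"]
    if extra_cp:
        args += ["--conf", "spark.executor.extraClassPath=" + extra_cp]
    return args

print(submit_args(get_hbase_jars_for_cp("/nonexistent")))  # ['spark-submit']
```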

On Thu, Feb 26, 2015 at 8:24 PM, Kannan Rajah  wrote:
> Thanks Marcelo. Do you think it would be useful to have
> spark.executor.extraClassPath pick up an environment variable that can be
> set from spark-env.sh? Here is an example.
>
> spark-env.sh
> --
> executor_extra_cp=$(get_hbase_jars_for_cp)
> export executor_extra_cp
>
> spark-defaults.conf
> -
> spark.executor.extraClassPath = ${executor_extra_cp}
>
> This will let us add logic inside the get_hbase_jars_for_cp function to
> pick the right version of the hbase jars. There could be multiple versions
> installed on the node.
>
>
>
> --
> Kannan
>
> On Thu, Feb 26, 2015 at 6:08 PM, Marcelo Vanzin  wrote:
>>
>> On Thu, Feb 26, 2015 at 5:12 PM, Kannan Rajah  wrote:
>> > Also, I would like to know if there is a localization overhead when we
>> > use
>> > spark.executor.extraClassPath. Again, in the case of hbase, these jars
>> > would
>> > be typically available on all nodes. So there is no need to localize
>> > them
>> > from the node where job was submitted. I am wondering if we use the
>> > SPARK_CLASSPATH approach, then it would not do localization. That would
>> > be
>> > an added benefit.
>> > Please clarify.
>>
>> spark.executor.extraClassPath doesn't localize anything. It just
>> prepends those classpath entries to the usual classpath used to launch
>> the executor. There's no copying of files or anything, so they're
>> expected to exist on the nodes.
>>
>> It's basically exactly the same as SPARK_CLASSPATH, but broken down to
>> two options (one for the executors, and one for the driver).
>>
>> --
>> Marcelo
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org