Re: graph.reverse & Pregel API

2014-04-16 Thread Bogdan Ghidireac
Yes, the patch works fine. Thank you!


On Thu, Apr 17, 2014 at 12:08 AM, Ankur Dave  wrote:

> Hi Bogdan,
>
> This is a bug -- thanks for reporting it! I just fixed it in
> https://github.com/apache/spark/pull/431. Does it help if you apply that
> patch?
>
> Ankur 
>
>
> On Wed, Apr 16, 2014 at 7:51 AM, Bogdan Ghidireac wrote:
>
>> I am using Pregel API with Spark (1.0 branch compiled on Apr 16th) and I
>> run into some problems when my graph has the edges reversed.
>> If the edges of my graph are reversed, the sendMsg function no longer
>> receives the attribute for the source vertex (it is null). This does
>> not happen with the original graph.
>>
>


Re: sbt assembly error

2014-04-16 Thread Azuryy Yu
It is only a network issue. You have some limited network access in China.


On Thu, Apr 17, 2014 at 2:27 PM, Sean Owen  wrote:

> The error is "Connection timed out", not 404. The build references
> many repos, and only one will contain any given artifact. You are
> seeing it fail through trying many different repos, many of which
> don't even have the artifact either, but that's not the underlying
> cause.
>
> FWIW I can build the assembly just fine and I suspect there is no
> noise on this since others can too.
>
> wget does not have the same proxy settings that SBT / Maven would use,
> and the issue may be HTTPS connections.
>
> That said it could be a transient network problem. I was going to
> suggest that maybe something in the SBT build changed recently -- it's
> a little weird that it's trying snapshot repos? -- but you are looking
> at a very recent 0.9.1.
>
> Try HEAD to confirm?
>
>
> On Wed, Apr 16, 2014 at 11:36 PM, Yiou Li  wrote:
> > Hi Sean,
> >
> > It's true that sbt is trying different links, but ALL of them have
> > connection issues (actually a 404 File Not Found error) and the build
> > process takes forever connecting to different links.
> >
> > I don't think it's a proxy issue because my other projects (other than
> > Spark) build well and I can wget things like www.yahoo.com.
> >
> > I am surprised that there is not much noise raised on this issue --- at
> > least on the most recent spark release.
> >
> > Best,
> > Leo
> >
>


Re: sbt assembly error

2014-04-16 Thread Sean Owen
The error is "Connection timed out", not 404. The build references
many repos, and only one will contain any given artifact. You are
seeing it fail through trying many different repos, many of which
don't even have the artifact either, but that's not the underlying
cause.

FWIW I can build the assembly just fine and I suspect there is no
noise on this since others can too.

wget does not have the same proxy settings that SBT / Maven would use,
and the issue may be HTTPS connections.

That said it could be a transient network problem. I was going to
suggest that maybe something in the SBT build changed recently -- it's
a little weird that it's trying snapshot repos? -- but you are looking
at a very recent 0.9.1.

Try HEAD to confirm?


On Wed, Apr 16, 2014 at 11:36 PM, Yiou Li  wrote:
> Hi Sean,
>
> It's true that sbt is trying different links, but ALL of them have
> connection issues (actually a 404 File Not Found error) and the build
> process takes forever connecting to different links.
>
> I don't think it's a proxy issue because my other projects (other than
> Spark) build well and I can wget things like www.yahoo.com.
>
> I am surprised that there is not much noise raised on this issue --- at
> least on the most recent spark release.
>
> Best,
> Leo
>


Re: PySpark still reading only text?

2014-04-16 Thread Matei Zaharia
Yes, this JIRA would enable that. The Hive support also handles HDFS.

Matei

On Apr 16, 2014, at 9:55 PM, Jesvin Jose  wrote:

> When this is implemented, can you load/save an RDD of pickled objects to HDFS?
> 
> 
> On Thu, Apr 17, 2014 at 1:51 AM, Matei Zaharia  
> wrote:
> Hi Bertrand,
> 
> We should probably add a SparkContext.pickleFile and RDD.saveAsPickleFile 
> that will allow saving pickled objects. Unfortunately this is not in yet, but 
> there is an issue up to track it: 
> https://issues.apache.org/jira/browse/SPARK-1161.
> 
> In 1.0, one feature we do have now is the ability to load binary data from 
> Hive using Spark SQL’s Python API. Later we will also be able to save to Hive.
> 
> Matei
> 
> On Apr 16, 2014, at 4:27 AM, Bertrand Dechoux  wrote:
> 
> > Hi,
> >
> > I have browsed the online documentation and it is stated that PySpark only 
>> reads text files as sources. Is this still the case?
> >
> > From what I understand, the RDD can after this first step be any serialized 
> > python structure if the class definitions are well distributed.
> >
> > Is it not possible to read back those RDDs? That is create a flow to parse 
> > everything and then, e.g. the next week, start from the binary, structured 
> > data?
> >
> > Technically, what is the difficulty? I would assume the code reading a 
> > binary python RDD or a binary python file to be quite similar. Where can I 
> > know more about this subject?
> >
> > Thanks in advance
> >
> > Bertrand
> 
> 
> 
> 
> -- 
> We dont beat the reaper by living longer. We beat the reaper by living well 
> and living fully. The reaper will come for all of us. Question is, what do we 
> do between the time we are born and the time he shows up? -Randy Pausch
> 



Re: PySpark still reading only text?

2014-04-16 Thread Jesvin Jose
When this is implemented, can you load/save an RDD of pickled objects to
HDFS?


On Thu, Apr 17, 2014 at 1:51 AM, Matei Zaharia wrote:

> Hi Bertrand,
>
> We should probably add a SparkContext.pickleFile and RDD.saveAsPickleFile
> that will allow saving pickled objects. Unfortunately this is not in yet,
> but there is an issue up to track it:
> https://issues.apache.org/jira/browse/SPARK-1161.
>
> In 1.0, one feature we do have now is the ability to load binary data from
> Hive using Spark SQL’s Python API. Later we will also be able to save to
> Hive.
>
> Matei
>
> On Apr 16, 2014, at 4:27 AM, Bertrand Dechoux  wrote:
>
> > Hi,
> >
> > I have browsed the online documentation and it is stated that PySpark
> only reads text files as sources. Is this still the case?
> >
> > From what I understand, the RDD can after this first step be any
> serialized python structure if the class definitions are well distributed.
> >
> > Is it not possible to read back those RDDs? That is create a flow to
> parse everything and then, e.g. the next week, start from the binary,
> structured data?
> >
> > Technically, what is the difficulty? I would assume the code reading a
> binary python RDD or a binary python file to be quite similar. Where can I
> know more about this subject?
> >
> > Thanks in advance
> >
> > Bertrand
>
>


-- 
We dont beat the reaper by living longer. We beat the reaper by living well
and living fully. The reaper will come for all of us. Question is, what do
we do between the time we are born and the time he shows up? -Randy Pausch


Re: choose the number of partition according to the number of nodes

2014-04-16 Thread Joe L
Thank you Nicholas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/choose-the-number-of-partition-according-to-the-number-of-nodes-tp4362p4364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: choose the number of partition according to the number of nodes

2014-04-16 Thread Nicholas Chammas
From the Spark tuning guide:

In general, we recommend 2-3 tasks per CPU core in your cluster.


I think you can only get one task per partition to run concurrently for a
given RDD. So if your RDD has 10 partitions, then 10 tasks at most can
operate on it concurrently (given you have 10 cores available). At the same
time, you want each task to run quickly on a small slice of data. The
smaller the slice of data, the more likely the task working on it will
complete successfully. So it's good to have more, smaller partitions.

For example, if you have 10 cores and an RDD with 20 partitions, you should
see 2 waves of 10 tasks operate on the RDD concurrently.

Hence, I interpret that line from the tuning guide to mean it's best to
have your RDD partitioned into (numCores * 2) or (numCores * 3) partitions.
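
As a minimal sketch of that rule of thumb (the core count and input path below
are placeholders, not taken from this thread):

// Assuming the cluster gives this application roughly 10 cores:
val numCores = 10

// Aim for 2-3 partitions (and hence tasks) per core, per the tuning guide.
val lines = sc.textFile("hdfs:///data/input")
val tuned = lines.repartition(numCores * 3)

tuned.count()   // 30 tasks, running in waves of up to 10 at a time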

Nick


On Wed, Apr 16, 2014 at 7:50 PM, Joe L  wrote:

> Is it true that it is better to choose the number of partitions according to
> the number of nodes in the cluster?
>
> partitionBy(numNodes)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/choose-the-number-of-partition-according-to-the-number-of-nodes-tp4362.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


choose the number of partition according to the number of nodes

2014-04-16 Thread Joe L
Is it true that it is better to choose the number of partitions according to
the number of nodes in the cluster?

partitionBy(numNodes)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/choose-the-number-of-partition-according-to-the-number-of-nodes-tp4362.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: sbt assembly error

2014-04-16 Thread Yiou Li
Hi Sean,

It's true that sbt is trying different links, but ALL of them have
connection issues (actually a 404 File Not Found error) and the
build process takes forever connecting to different links.

I don't think it's a proxy issue because my other projects (other than
Spark) build well and I can wget things like www.yahoo.com.

I am surprised that there is not much noise raised on this issue --- at
least on the most recent spark release.

Best,
Leo



On Wed, Apr 16, 2014 at 12:46 PM, Sean Owen  wrote:

> This is just a red herring. You are seeing the build fail to contact
> many repos it knows about, including ones that do not have a given
> artifact.
>
> This is almost always a symptom of network connectivity problem, like
> perhaps a proxy in between, esp. one that breaks HTTPS connections.
> You may need to fix your proxy settings for Maven locally.
> --
> Sean Owen | Director, Data Science | London
>
>
> On Wed, Apr 16, 2014 at 8:32 PM, Arpit Tak 
> wrote:
> > It's because there is no slf4j directory there; maybe they are
> > updating it.
> > https://oss.sonatype.org/content/repositories/snapshots/org/
> >
> > Hard luck. Try again after some time...
> >
> > Regards,
> > Arpit
> >
> >
> > On Thu, Apr 17, 2014 at 12:33 AM, Yiou Li  wrote:
> >>
> >> Hi all,
> >>
> >> I am trying to build spark assembly using sbt and got connection error
> >> when resolving dependencies:
> >>
> >> I tried web browser and wget some of the dependency links in the error
> and
> >> also got 404 error too.
> >>
> >> This happened to the following branches:
> >> spark-0.8.1-incubating
> >> spark-0.9.1
> >> spark-0.9.1-bin-hadoop2
> >>
> >> Can somebody please kindly advise?
> >>
> >> Best,
> >> Leo
> >>
> >>
> >> Launching sbt from sbt/sbt-launch-0.12.4.jar
> >> [info] Loading project definition from
> >>
> /home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project/project
> >> [info] Loading project definition from
> >> /home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project
> >> [info] Set current project to root (in build
> >> file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/)
> >> [info] Updating
> >>
> {file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/}core...
> >> [info] Resolving org.slf4j#slf4j-log4j12;1.7.2 ...
> >> [error] Server access Error: Connection timed out
> >> url=
> https://oss.sonatype.org/content/repositories/snapshots/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
> >> [error] Server access Error: Connection timed out
> >> url=
> https://oss.sonatype.org/service/local/staging/deploy/maven2/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
> >> [info] Resolving commons-daemon#commons-daemon;1.0.10 ...
> >> [error] Server access Error: Connection timed out
> >> url=
> https://oss.sonatype.org/content/repositories/snapshots/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
> >> [error] Server access Error: Connection timed out
> >> url=
> https://oss.sonatype.org/service/local/staging/deploy/maven2/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
> >> [error] Server access Error: Connection timed out
> >> url=
> https://repository.cloudera.com/artifactory/cloudera-repos/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
> >> [info] Resolving org.apache.commons#commons-parent;23 ...
> >> [error] Server access Error: Connection timed out
> >> url=
> https://oss.sonatype.org/content/repositories/snapshots/org/apache/commons/commons-parent/23/commons-parent-23.pom
> >>
> >> (truncated)
> >
> >
>


Re: GC overhead limit exceeded

2014-04-16 Thread Nicholas Chammas
But wait, does Spark know to unpersist() RDDs that are not referenced
anywhere? That would’ve taken care of the RDDs that I kept creating and
then orphaning as part of my job testing/profiling.

Is that what SPARK-1103
is about, btw?

(Sorry to keep digging up this thread.)
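
For reference, the explicit-unpersist pattern discussed below, as a rough
sketch (the file path and parsing step are illustrative, not from the
original job):

// Each profiling run caches an RDD; unpersist it before orphaning it,
// otherwise the executors keep the old blocks around until LRU evicts them.
val cached = sc.textFile("hdfs:///data/events").map(_.split(",")).cache()
cached.count()          // materialize the cache and profile the job

// ... inspect timings in the web UI, tweak the job ...

cached.unpersist()      // eagerly frees the cached blocks on the executors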


On Wed, Apr 16, 2014 at 5:55 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Never mind. I'll take it from both Andrew and Syed's comments that the
> answer is yes. Dunno why I thought otherwise.
>
>
> On Wed, Apr 16, 2014 at 5:43 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> I’m running into a similar issue as the OP. I’m running the same job over
>> and over (with minor tweaks) in the same cluster to profile it. It just
>> recently started throwing java.lang.OutOfMemoryError: Java heap space.
>>
>>> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
>>> ones that you're not using.
>>
>>  I thought that Spark automatically ejects RDDs from the cache using
>> LRU.
>>
>> Do I need to explicitly unpersist() RDDs that are cached with the
>> default storage level?
>>
>> Nick
>>
>>
>> On Thu, Mar 27, 2014 at 1:46 PM, Andrew Or  wrote:
>>
>> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
>>> ones that you're not using. Also, if you're on 0.9, make sure
>>> spark.shuffle.spill is enabled (which it is by default). This allows your
>>> application to spill in-memory content to disk if necessary.
>>>
>>> How much memory are you giving to your executors? The default,
>>> spark.executor.memory is 512m, which is quite low. Consider raising this.
>>> Checking the web UI is a good way to figure out your runtime memory usage.
>>>
>>>
>>> On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
>>> og...@plainvanillagames.com> wrote:
>>>
  Look at the tuning guide on Spark's webpage for strategies to cope
 with this.
 I have run into quite a few memory issues like these, some are resolved
 by changing the StorageLevel strategy and employing things like Kryo, some
 are solved by specifying the number of tasks to break down a given
 operation into etc.

 Ognen


 On 3/27/14, 10:21 AM, Sai Prasanna wrote:

 "java.lang.OutOfMemoryError: GC overhead limit exceeded"

  What is the problem? The same code, when I run it, completes in 8 seconds
 one time and takes a really long time the next, say 300-500 seconds...
 In the logs I see a lot of "GC overhead limit exceeded" messages. What should
 be done?

  Can someone please shed some light on this?



  --
  *Sai Prasanna. AN*
 *II M.Tech (CS), SSSIHL*


 * Entire water in the ocean can never sink a ship, Unless it gets
 inside. All the pressures of life can never hurt you, Unless you let them
 in.*



>>>
>


Re: GC overhead limit exceeded

2014-04-16 Thread Nicholas Chammas
Never mind. I'll take it from both Andrew and Syed's comments that the
answer is yes. Dunno why I thought otherwise.


On Wed, Apr 16, 2014 at 5:43 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> I’m running into a similar issue as the OP. I’m running the same job over
> and over (with minor tweaks) in the same cluster to profile it. It just
> recently started throwing java.lang.OutOfMemoryError: Java heap space.
>
>> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
>> ones that you're not using.
>
>  I thought that Spark automatically ejects RDDs from the cache using LRU.
>
> Do I need to explicitly unpersist() RDDs that are cached with the default
> storage level?
>
> Nick
>
>
> On Thu, Mar 27, 2014 at 1:46 PM, Andrew Or  wrote:
>
> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
>> ones that you're not using. Also, if you're on 0.9, make sure
>> spark.shuffle.spill is enabled (which it is by default). This allows your
>> application to spill in-memory content to disk if necessary.
>>
>> How much memory are you giving to your executors? The default,
>> spark.executor.memory is 512m, which is quite low. Consider raising this.
>> Checking the web UI is a good way to figure out your runtime memory usage.
>>
>>
>> On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
>> og...@plainvanillagames.com> wrote:
>>
>>>  Look at the tuning guide on Spark's webpage for strategies to cope with
>>> this.
>>> I have run into quite a few memory issues like these, some are resolved
>>> by changing the StorageLevel strategy and employing things like Kryo, some
>>> are solved by specifying the number of tasks to break down a given
>>> operation into etc.
>>>
>>> Ognen
>>>
>>>
>>> On 3/27/14, 10:21 AM, Sai Prasanna wrote:
>>>
>>> "java.lang.OutOfMemoryError: GC overhead limit exceeded"
>>>
>>>  What is the problem? The same code, when I run it, completes in 8 seconds
>>> one time and takes a really long time the next, say 300-500 seconds...
>>> In the logs I see a lot of "GC overhead limit exceeded" messages. What should
>>> be done?
>>>
>>>  Can someone please shed some light on this?
>>>
>>>
>>>
>>>  --
>>>  *Sai Prasanna. AN*
>>> *II M.Tech (CS), SSSIHL*
>>>
>>>
>>> * Entire water in the ocean can never sink a ship, Unless it gets
>>> inside. All the pressures of life can never hurt you, Unless you let them
>>> in.*
>>>
>>>
>>>
>>


Re: GC overhead limit exceeded

2014-04-16 Thread Nicholas Chammas
I’m running into a similar issue as the OP. I’m running the same job over
and over (with minor tweaks) in the same cluster to profile it. It just
recently started throwing java.lang.OutOfMemoryError: Java heap space.

> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
> ones that you're not using.

 I thought that Spark automatically ejects RDDs from the cache using LRU.

Do I need to explicitly unpersist() RDDs that are cached with the default
storage level?

Nick


On Thu, Mar 27, 2014 at 1:46 PM, Andrew Or  wrote:

Are you caching a lot of RDD's? If so, maybe you should unpersist() the
> ones that you're not using. Also, if you're on 0.9, make sure
> spark.shuffle.spill is enabled (which it is by default). This allows your
> application to spill in-memory content to disk if necessary.
>
> How much memory are you giving to your executors? The default,
> spark.executor.memory is 512m, which is quite low. Consider raising this.
> Checking the web UI is a good way to figure out your runtime memory usage.
>
>
> On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
> og...@plainvanillagames.com> wrote:
>
>>  Look at the tuning guide on Spark's webpage for strategies to cope with
>> this.
>> I have run into quite a few memory issues like these, some are resolved
>> by changing the StorageLevel strategy and employing things like Kryo, some
>> are solved by specifying the number of tasks to break down a given
>> operation into etc.
>>
>> Ognen
>>
>>
>> On 3/27/14, 10:21 AM, Sai Prasanna wrote:
>>
>> "java.lang.OutOfMemoryError: GC overhead limit exceeded"
>>
>>  What is the problem? The same code, when I run it, completes in 8 seconds
>> one time and takes a really long time the next, say 300-500 seconds...
>> In the logs I see a lot of "GC overhead limit exceeded" messages. What should
>> be done?
>>
>>  Can someone please shed some light on this?
>>
>>
>>
>>  --
>>  *Sai Prasanna. AN*
>> *II M.Tech (CS), SSSIHL*
>>
>>
>> * Entire water in the ocean can never sink a ship, Unless it gets inside.
>> All the pressures of life can never hurt you, Unless you let them in.*
>>
>>
>>
>


Regarding Partitioner

2014-04-16 Thread yh18190
Hi,

I have a large dataset of elements [RDD] and I want to divide it into two
exactly equal-sized partitions while maintaining the order of the elements.
I tried using RangePartitioner, like var data = partitionedFile.partitionBy(new
RangePartitioner(2, partitionedFile)).
This doesn't give satisfactory results because it divides the data only
roughly, not into exactly equal sizes while maintaining the order of elements.
For example, if there are 64 elements and we use RangePartitioner, it divides
them into 31 elements and 33 elements.

I need a partitioner such that I get exactly the first 32 elements in one half
and the second set of 32 elements in the other half.
Could anyone help me by suggesting how to write a custom partitioner so that I
get two equally sized halves while maintaining the order of elements?

Please help me...
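
A rough sketch of one way to do this, assuming a Spark version that has
RDD.zipWithIndex and that one extra count() pass over the data is acceptable
(the re-sort step also assumes each half fits in a single task's memory):

import org.apache.spark.Partitioner

// One extra pass to learn the total number of elements.
val n = partitionedFile.count()

// Indices 0 .. ceil(n/2)-1 go to partition 0, the rest to partition 1.
class HalfPartitioner(total: Long) extends Partitioner {
  def numPartitions: Int = 2
  def getPartition(key: Any): Int =
    if (key.asInstanceOf[Long] < (total + 1) / 2) 0 else 1
}

val halves = partitionedFile
  .zipWithIndex()                       // (element, index), order-preserving
  .map(_.swap)                          // key each element by its index
  .partitionBy(new HalfPartitioner(n))
  .mapPartitions(                       // the shuffle does not guarantee order,
    iter => iter.toSeq.sortBy(_._1).map(_._2).iterator,   // so re-sort by index
    preservesPartitioning = true)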




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Regarding-Partitioner-tp4356.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: graph.reverse & Pregel API

2014-04-16 Thread Ankur Dave
Hi Bogdan,

This is a bug -- thanks for reporting it! I just fixed it in
https://github.com/apache/spark/pull/431. Does it help if you apply that
patch?

Ankur 


On Wed, Apr 16, 2014 at 7:51 AM, Bogdan Ghidireac wrote:

> I am using Pregel API with Spark (1.0 branch compiled on Apr 16th) and I
> run into some problems when my graph has the edges reversed.
> If the edges of my graph are reversed, the sendMsg function no longer
> receives the attribute for the source vertex (it is null). This does not
> happen with the original graph.
>


Re: PySpark still reading only text?

2014-04-16 Thread Matei Zaharia
Hi Bertrand,

We should probably add a SparkContext.pickleFile and RDD.saveAsPickleFile that 
will allow saving pickled objects. Unfortunately this is not in yet, but there 
is an issue up to track it: https://issues.apache.org/jira/browse/SPARK-1161.

In 1.0, one feature we do have now is the ability to load binary data from Hive 
using Spark SQL’s Python API. Later we will also be able to save to Hive.

Matei

On Apr 16, 2014, at 4:27 AM, Bertrand Dechoux  wrote:

> Hi,
> 
> I have browsed the online documentation and it is stated that PySpark only 
> > reads text files as sources. Is this still the case?
> 
> From what I understand, the RDD can after this first step be any serialized 
> python structure if the class definitions are well distributed.
> 
> Is it not possible to read back those RDDs? That is create a flow to parse 
> everything and then, e.g. the next week, start from the binary, structured 
> data?
> 
> Technically, what is the difficulty? I would assume the code reading a binary 
> python RDD or a binary python file to be quite similar. Where can I know more 
> about this subject?
> 
> Thanks in advance
> 
> Bertrand



Re: How to cogroup/join pair RDDs with different key types?

2014-04-16 Thread Roger Hoover
Thanks for following up.  I hope to get some free time this afternoon to
get it working.  Will let you know.


On Wed, Apr 16, 2014 at 12:43 PM, Andrew Ash  wrote:

> Glad to hear you're making progress!  Do you have a working version of the
> join?  Is there anything else you need help with?
>
>
> On Wed, Apr 16, 2014 at 7:11 PM, Roger Hoover wrote:
>
>> Ah, in case this helps others, looks like RDD.zipPartitions will
>> accomplish step 4.
>>
>>
>> On Tue, Apr 15, 2014 at 10:44 AM, Roger Hoover wrote:
>>
>>> Andrew,
>>>
>>> Thank you very much for your feedback.  Unfortunately, the ranges are
>>> not of predictable size but you gave me an idea of how to handle it.
>>>  Here's what I'm thinking:
>>>
>>> 1. Choose number of partitions, n, over IP space
>>> 2. Preprocess the IPRanges, splitting any of them that cross partition
>>> boundaries
>>> 3. Partition ipToUrl and the new ipRangeToZip according to the
>>> partitioning scheme from step 1
>>> 4. Join matching partitions of these two RDDs
>>>
>>> I still don't know how to do step 4 though.  I see that RDDs have a
>>> mapPartitions() operation to let you do whatever you want with a partition.
>>>  What I need is a way to get my hands on two partitions at once, each from
>>> different RDDs.
>>>
>>> Any ideas?
>>>
>>> Thanks,
>>>
>>> Roger
>>>
>>>
>>> On Mon, Apr 14, 2014 at 5:45 PM, Andrew Ash wrote:
>>>
 Are your IPRanges all on nice, even CIDR-format ranges?  E.g.
 192.168.0.0/16 or 10.0.0.0/8?

 If the range is always an even subnet mask and not split across
 subnets, I'd recommend flatMapping the ipToUrl RDD to (IPRange, String) and
 then joining the two RDDs.  The expansion would be at most 32x if all your
 ranges can be expressed in CIDR notation, and in practice would be much
 smaller than that (typically you don't need things bigger than a /8 and
 often not smaller than a /24)

 Hopefully you can use your knowledge of the ip ranges to make this
 feasible.

 Otherwise, you could additionally flatmap the ipRangeToZip out to a
 list of CIDR notations and do the join then, but you're starting to have
 the cartesian product work against you on scale at that point.

 Andrew


 On Tue, Apr 15, 2014 at 1:07 AM, Roger Hoover 
 wrote:

> Hi,
>
> I'm trying to figure out how to join two RDDs with different key types
> and appreciate any suggestions.
>
> Say I have two RDDS:
> ipToUrl of type (IP, String)
> ipRangeToZip of type (IPRange, String)
>
> How can I join/cogroup these two RDDs together to produce a new RDD of
> type (IP, (String, String)) where IP is the key and the values are the 
> urls
> and zipcodes?
>
> Say I have a method on the IPRange class called matches(ip: IP), I
> want the joined records to match when ipRange.matches(ip).
>
> Thanks,
>
> Roger
>
>

>>>
>>
>


Re: sbt assembly error

2014-04-16 Thread Sean Owen
This is just a red herring. You are seeing the build fail to contact
many repos it knows about, including ones that do not have a given
artifact.

This is almost always a symptom of a network connectivity problem, like
perhaps a proxy in between, esp. one that breaks HTTPS connections.
You may need to fix your proxy settings for Maven locally.
--
Sean Owen | Director, Data Science | London


On Wed, Apr 16, 2014 at 8:32 PM, Arpit Tak  wrote:
> It's because there is no slf4j directory there; maybe they are
> updating it.
> https://oss.sonatype.org/content/repositories/snapshots/org/
>
> Hard luck. Try again after some time...
>
> Regards,
> Arpit
>
>
> On Thu, Apr 17, 2014 at 12:33 AM, Yiou Li  wrote:
>>
>> Hi all,
>>
>> I am trying to build spark assembly using sbt and got connection error
>> when resolving dependencies:
>>
>> I tried web browser and wget some of the dependency links in the error and
>> also got 404 error too.
>>
>> This happened to the following branches:
>> spark-0.8.1-incubating
>> spark-0.9.1
>> spark-0.9.1-bin-hadoop2
>>
>> Can somebody please kindly advise?
>>
>> Best,
>> Leo
>>
>>
>> Launching sbt from sbt/sbt-launch-0.12.4.jar
>> [info] Loading project definition from
>> /home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project/project
>> [info] Loading project definition from
>> /home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project
>> [info] Set current project to root (in build
>> file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/)
>> [info] Updating
>> {file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/}core...
>> [info] Resolving org.slf4j#slf4j-log4j12;1.7.2 ...
>> [error] Server access Error: Connection timed out
>> url=https://oss.sonatype.org/content/repositories/snapshots/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
>> [error] Server access Error: Connection timed out
>> url=https://oss.sonatype.org/service/local/staging/deploy/maven2/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
>> [info] Resolving commons-daemon#commons-daemon;1.0.10 ...
>> [error] Server access Error: Connection timed out
>> url=https://oss.sonatype.org/content/repositories/snapshots/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
>> [error] Server access Error: Connection timed out
>> url=https://oss.sonatype.org/service/local/staging/deploy/maven2/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
>> [error] Server access Error: Connection timed out
>> url=https://repository.cloudera.com/artifactory/cloudera-repos/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
>> [info] Resolving org.apache.commons#commons-parent;23 ...
>> [error] Server access Error: Connection timed out
>> url=https://oss.sonatype.org/content/repositories/snapshots/org/apache/commons/commons-parent/23/commons-parent-23.pom
>>
>> (truncated)
>
>


Re: How to cogroup/join pair RDDs with different key types?

2014-04-16 Thread Andrew Ash
Glad to hear you're making progress!  Do you have a working version of the
join?  Is there anything else you need help with?


On Wed, Apr 16, 2014 at 7:11 PM, Roger Hoover wrote:

> Ah, in case this helps others, looks like RDD.zipPartitions will
> accomplish step 4.
>
>
> On Tue, Apr 15, 2014 at 10:44 AM, Roger Hoover wrote:
>
>> Andrew,
>>
>> Thank you very much for your feedback.  Unfortunately, the ranges are not
>> of predictable size but you gave me an idea of how to handle it.  Here's
>> what I'm thinking:
>>
>> 1. Choose number of partitions, n, over IP space
>> 2. Preprocess the IPRanges, splitting any of them that cross partition
>> boundaries
>> 3. Partition ipToUrl and the new ipRangeToZip according to the
>> partitioning scheme from step 1
>> 4. Join matching partitions of these two RDDs
>>
>> I still don't know how to do step 4 though.  I see that RDDs have a
>> mapPartitions() operation to let you do whatever you want with a partition.
>>  What I need is a way to get my hands on two partitions at once, each from
>> different RDDs.
>>
>> Any ideas?
>>
>> Thanks,
>>
>> Roger
>>
>>
>> On Mon, Apr 14, 2014 at 5:45 PM, Andrew Ash  wrote:
>>
>>> Are your IPRanges all on nice, even CIDR-format ranges?  E.g.
>>> 192.168.0.0/16 or 10.0.0.0/8?
>>>
>>> If the range is always an even subnet mask and not split across subnets,
>>> I'd recommend flatMapping the ipToUrl RDD to (IPRange, String) and then
>>> joining the two RDDs.  The expansion would be at most 32x if all your
>>> ranges can be expressed in CIDR notation, and in practice would be much
>>> smaller than that (typically you don't need things bigger than a /8 and
>>> often not smaller than a /24)
>>>
>>> Hopefully you can use your knowledge of the ip ranges to make this
>>> feasible.
>>>
>>> Otherwise, you could additionally flatmap the ipRangeToZip out to a list
>>> of CIDR notations and do the join then, but you're starting to have the
>>> cartesian product work against you on scale at that point.
>>>
>>> Andrew
>>>
>>>
>>> On Tue, Apr 15, 2014 at 1:07 AM, Roger Hoover wrote:
>>>
 Hi,

 I'm trying to figure out how to join two RDDs with different key types
 and appreciate any suggestions.

 Say I have two RDDS:
 ipToUrl of type (IP, String)
 ipRangeToZip of type (IPRange, String)

 How can I join/cogroup these two RDDs together to produce a new RDD of
 type (IP, (String, String)) where IP is the key and the values are the urls
 and zipcodes?

 Say I have a method on the IPRange class called matches(ip: IP), I want
 the joined records to match when ipRange.matches(ip).

 Thanks,

 Roger


>>>
>>
>


Re: sbt assembly error

2014-04-16 Thread Arpit Tak
It's because there is no slf4j directory there; maybe they are
updating it.
https://oss.sonatype.org/content/repositories/snapshots/org/

Hard luck. Try again after some time...

Regards,
Arpit


On Thu, Apr 17, 2014 at 12:33 AM, Yiou Li  wrote:

> Hi all,
>
> I am trying to build spark assembly using sbt and got connection error
> when resolving dependencies:
>
> I tried web browser and wget some of the dependency links in the error and
> also got 404 error too.
>
> This happened to the following branches:
> spark-0.8.1-incubating
> spark-0.9.1
> spark-0.9.1-bin-hadoop2
>
> Can somebody please kindly advise?
>
> Best,
> Leo
>
>
> Launching sbt from sbt/sbt-launch-0.12.4.jar
> [info] Loading project definition from
> /home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project/project
> [info] Loading project definition from
> /home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project
> [info] Set current project to root (in build
> file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/)
> [info] Updating
> {file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/}core...
> [info] Resolving org.slf4j#slf4j-log4j12;1.7.2 ...
> [error] Server access Error: Connection timed out url=
> https://oss.sonatype.org/content/repositories/snapshots/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
> [error] Server access Error: Connection timed out url=
> https://oss.sonatype.org/service/local/staging/deploy/maven2/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
> [info] Resolving commons-daemon#commons-daemon;1.0.10 ...
> [error] Server access Error: Connection timed out url=
> https://oss.sonatype.org/content/repositories/snapshots/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
> [error] Server access Error: Connection timed out url=
> https://oss.sonatype.org/service/local/staging/deploy/maven2/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
> [error] Server access Error: Connection timed out url=
> https://repository.cloudera.com/artifactory/cloudera-repos/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
> [info] Resolving org.apache.commons#commons-parent;23 ...
> [error] Server access Error: Connection timed out url=
> https://oss.sonatype.org/content/repositories/snapshots/org/apache/commons/commons-parent/23/commons-parent-23.pom
>
> (truncated)
>


Re: Spark packaging

2014-04-16 Thread Arpit Tak
Also try this ...
http://docs.sigmoidanalytics.com/index.php/How_to_Install_Spark_on_Ubuntu-12.04

http://docs.sigmoidanalytics.com/index.php/How_to_Install_Spark_on_HortonWorks_VM

Regards,
arpit


On Thu, Apr 10, 2014 at 3:04 AM, Pradeep baji
wrote:

> Thanks Prabeesh.
>
>
> On Wed, Apr 9, 2014 at 12:37 AM, prabeesh k  wrote:
>
>> Please refer
>>
>> http://prabstechblog.blogspot.in/2014/04/creating-single-jar-for-spark-project.html
>>
>> Regards,
>> prabeesh
>>
>>
>> On Wed, Apr 9, 2014 at 1:04 PM, Pradeep baji > > wrote:
>>
>>> Hi all,
>>>
>>> I am new to spark and trying to learn it. Is there any document which
>>> describes how spark is packaged. ( like dependencies needed to build spark,
>>> which jar contains what after build etc)
>>>
>>> Thanks for the help.
>>>
>>> Regards,
>>> Pradeep
>>>
>>>
>>
>


sbt assembly error

2014-04-16 Thread Yiou Li
Hi all,

I am trying to build the Spark assembly using sbt and I get connection errors
when resolving dependencies.

I tried a web browser and wget on some of the dependency links from the error
output and got 404 errors there too.

This happened to the following branches:
spark-0.8.1-incubating
spark-0.9.1
spark-0.9.1-bin-hadoop2

Can somebody please kindly advise?

Best,
Leo


Launching sbt from sbt/sbt-launch-0.12.4.jar
[info] Loading project definition from
/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project/project
[info] Loading project definition from
/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/project
[info] Set current project to root (in build
file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/)
[info] Updating
{file:/home/leo/workspace_spark/spark-core/spark-0.9.1-bin-hadoop2/}core...
[info] Resolving org.slf4j#slf4j-log4j12;1.7.2 ...
[error] Server access Error: Connection timed out url=
https://oss.sonatype.org/content/repositories/snapshots/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
[error] Server access Error: Connection timed out url=
https://oss.sonatype.org/service/local/staging/deploy/maven2/org/slf4j/slf4j-log4j12/1.7.2/slf4j-log4j12-1.7.2.pom
[info] Resolving commons-daemon#commons-daemon;1.0.10 ...
[error] Server access Error: Connection timed out url=
https://oss.sonatype.org/content/repositories/snapshots/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
[error] Server access Error: Connection timed out url=
https://oss.sonatype.org/service/local/staging/deploy/maven2/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
[error] Server access Error: Connection timed out url=
https://repository.cloudera.com/artifactory/cloudera-repos/commons-daemon/commons-daemon/1.0.10/commons-daemon-1.0.10.pom
[info] Resolving org.apache.commons#commons-parent;23 ...
[error] Server access Error: Connection timed out url=
https://oss.sonatype.org/content/repositories/snapshots/org/apache/commons/commons-parent/23/commons-parent-23.pom

(truncated)


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-04-16 Thread Arpit Tak
I am also stuck on the same issue, but with Shark (0.9 with Spark 0.9) on
hadoop-2.2.0.

On the other Hadoop versions it works perfectly.

Regards,
Arpit Tak


On Wed, Apr 16, 2014 at 11:18 PM, Aureliano Buendia wrote:

> Is this resolved in spark 0.9.1?
>
>
> On Tue, Apr 15, 2014 at 6:55 PM, anant  wrote:
>
>> I've received the same error with Spark built using Maven. It turns out
>> that
>> mesos-0.13.0 depends on protobuf-2.4.1 which is causing the clash at
>> runtime. Protobuf included by Akka is shaded and doesn't cause any
>> problems.
>>
>> The solution is to update the mesos dependency to 0.18.0 in spark's
>> pom.xml.
>> Rebuilding the JAR with this configuration solves the issue.
>>
>> -Anant
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p4286.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: How to cogroup/join pair RDDs with different key types?

2014-04-16 Thread Roger Hoover
Ah, in case this helps others, looks like RDD.zipPartitions will accomplish
step 4.
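
For anyone reading this in the archive, a rough sketch of what step 4 could
look like with zipPartitions, assuming ipToUrl and ipRangeToZip have already
been partitioned with the same custom partitioner over IP space (steps 1-3),
and reusing the matches() method and the IP/IPRange types from the original
post:

// Partition i of ipToUrl only needs to see partition i of ipRangeToZip,
// so the two iterators can be combined without another shuffle.
val joined = ipToUrl.zipPartitions(ipRangeToZip) { (ips, ranges) =>
  val rangeList = ranges.toList          // buffer this partition's ranges
  for {
    (ip, url)    <- ips
    (range, zip) <- rangeList.iterator   // fresh iterator for each ip
    if range.matches(ip)
  } yield (ip, (url, zip))               // (IP, (url, zipcode))
}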


On Tue, Apr 15, 2014 at 10:44 AM, Roger Hoover wrote:

> Andrew,
>
> Thank you very much for your feedback.  Unfortunately, the ranges are not
> of predictable size but you gave me an idea of how to handle it.  Here's
> what I'm thinking:
>
> 1. Choose number of partitions, n, over IP space
> 2. Preprocess the IPRanges, splitting any of them that cross partition
> boundaries
> 3. Partition ipToUrl and the new ipRangeToZip according to the
> partitioning scheme from step 1
> 4. Join matching partitions of these two RDDs
>
> I still don't know how to do step 4 though.  I see that RDDs have a
> mapPartitions() operation to let you do whatever you want with a partition.
>  What I need is a way to get my hands on two partitions at once, each from
> different RDDs.
>
> Any ideas?
>
> Thanks,
>
> Roger
>
>
> On Mon, Apr 14, 2014 at 5:45 PM, Andrew Ash  wrote:
>
>> Are your IPRanges all on nice, even CIDR-format ranges?  E.g.
>> 192.168.0.0/16 or 10.0.0.0/8?
>>
>> If the range is always an even subnet mask and not split across subnets,
>> I'd recommend flatMapping the ipToUrl RDD to (IPRange, String) and then
>> joining the two RDDs.  The expansion would be at most 32x if all your
>> ranges can be expressed in CIDR notation, and in practice would be much
>> smaller than that (typically you don't need things bigger than a /8 and
>> often not smaller than a /24)
>>
>> Hopefully you can use your knowledge of the ip ranges to make this
>> feasible.
>>
>> Otherwise, you could additionally flatmap the ipRangeToZip out to a list
>> of CIDR notations and do the join then, but you're starting to have the
>> cartesian product work against you on scale at that point.
>>
>> Andrew
>>
>>
>> On Tue, Apr 15, 2014 at 1:07 AM, Roger Hoover wrote:
>>
>>> Hi,
>>>
>>> I'm trying to figure out how to join two RDDs with different key types
>>> and appreciate any suggestions.
>>>
>>> Say I have two RDDS:
>>> ipToUrl of type (IP, String)
>>> ipRangeToZip of type (IPRange, String)
>>>
>>> How can I join/cogroup these two RDDs together to produce a new RDD of
>>> type (IP, (String, String)) where IP is the key and the values are the urls
>>> and zipcodes?
>>>
>>> Say I have a method on the IPRange class called matches(ip: IP), I want
>>> the joined records to match when ipRange.matches(ip).
>>>
>>> Thanks,
>>>
>>> Roger
>>>
>>>
>>
>


Re: Shark: class java.io.IOException: Cannot run program "/bin/java"

2014-04-16 Thread Arpit Tak
Just set your Java environment (JAVA_HOME) properly:

export JAVA_HOME=/usr/lib/jvm/java-7-. (something like this, whatever
version you are using)

It will work.

Regards,
Arpit


On Wed, Apr 16, 2014 at 1:24 AM, ge ko  wrote:

> Hi,
>
>
>
> after starting the shark-shell
> via /opt/shark/shark-0.9.1/bin/shark-withinfo -skipRddReload I receive lots
> of output, including the exception that /bin/java cannot be executed. But
> it is linked to /usr/bin/java ?!?!
>
>
>
> root#>ls -al /bin/java
>
> lrwxrwxrwx 1 root root 13 15. Apr 21:45 /bin/java -> /usr/bin/java
>
> root#>/bin/java -version
>
> java version "1.7.0_51"
> OpenJDK Runtime Environment (rhel-2.4.4.1.el6_5-x86_64 u51-b02)
> OpenJDK 64-Bit Server VM (build 24.45-b08, mixed mode)
>
>
>
> Starting the shark shell:
>
>
>
> [root@hadoop-pg-5 bin]# /opt/shark/shark-0.9.1/bin/shark-withinfo
> -skipRddReload
> -hiveconf hive.root.logger=INFO,console -skipRddReload
> Starting the Shark Command Line Client
> 14/04/15 21:45:57 WARN conf.HiveConf: DEPRECATED: Configuration property
> hive.metastore.local no longer has any effect. Make sure to provide a valid
> value for hive.metastore.uris if you are connecting to a remote metastore.
> 14/04/15 21:45:58 WARN conf.HiveConf: DEPRECATED: Configuration property
> hive.metastore.local no longer has any effect. Make sure to provide a valid
> value for hive.metastore.uris if you are connecting to a remote metastore.
>
> Logging initialized using configuration in
> jar:file:/opt/shark/shark-0.9.1/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
> 14/04/15 21:45:58 INFO SessionState:
> Logging initialized using configuration in
> jar:file:/opt/shark/shark-0.9.1/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
> Hive history
> file=/tmp/root/hive_job_log_root_22574@hadoop-pg-5.cluster_201404152145_159664609.txt
> 14/04/15 21:45:58 INFO exec.HiveHistory: Hive history
> file=/tmp/root/hive_job_log_root_22574@hadoop-pg-5.cluster_201404152145_159664609.txt
> 14/04/15 21:45:58 WARN conf.HiveConf: DEPRECATED: Configuration property
> hive.metastore.local no longer has any effect. Make sure to provide a valid
> value for hive.metastore.uris if you are connecting to a remote metastore.
> 14/04/15 21:45:59 WARN conf.HiveConf: DEPRECATED: Configuration property
> hive.metastore.local no longer has any effect. Make sure to provide a valid
> value for hive.metastore.uris if you are connecting to a remote metastore.
> 14/04/15 21:46:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 14/04/15 21:46:00 INFO Remoting: Starting remoting
> 14/04/15 21:46:00 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://spark@hadoop-pg-5.cluster:38835]
> 14/04/15 21:46:00 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://spark@hadoop-pg-5.cluster:38835]
> 14/04/15 21:46:00 INFO spark.SparkEnv: Registering BlockManagerMaster
> 5,108: [GC 262656K->26899K(1005568K), 0,0409080 secs]
> 14/04/15 21:46:00 INFO storage.DiskBlockManager: Created local directory
> at /tmp/spark-local-20140415214600-9537
> 14/04/15 21:46:00 INFO storage.MemoryStore: MemoryStore started with
> capacity 589.2 MB.
> 14/04/15 21:46:00 INFO network.ConnectionManager: Bound socket to port
> 51889 with id = ConnectionManagerId(hadoop-pg-5.cluster,51889)
> 14/04/15 21:46:00 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
> 14/04/15 21:46:00 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Registering block manager hadoop-pg-5.cluster:51889 with 589.2 MB RAM
> 14/04/15 21:46:00 INFO storage.BlockManagerMaster: Registered BlockManager
> 14/04/15 21:46:00 INFO spark.HttpServer: Starting HTTP Server
> 14/04/15 21:46:00 INFO server.Server: jetty-7.6.8.v20121106
> 14/04/15 21:46:00 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:59414
> 14/04/15 21:46:00 INFO broadcast.HttpBroadcast: Broadcast server started
> at http://10.147.210.5:59414
> 14/04/15 21:46:01 INFO spark.SparkEnv: Registering MapOutputTracker
> 14/04/15 21:46:01 INFO spark.HttpFileServer: HTTP File server directory is
> /tmp/spark-cf56ada9-d950-4abc-a1c3-76fecdc4faa3
> 14/04/15 21:46:01 INFO spark.HttpServer: Starting HTTP Server
> 14/04/15 21:46:01 INFO server.Server: jetty-7.6.8.v20121106
> 14/04/15 21:46:01 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:45689
> 14/04/15 21:46:01 INFO server.Server: jetty-7.6.8.v20121106
> 14/04/15 21:46:01 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/storage/rdd,null}
> 14/04/15 21:46:01 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/storage,null}
> 14/04/15 21:46:01 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/stages/stage,null}
> 14/04/15 21:46:01 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/stages/pool,null}
> 14/04/15 21:46:01 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/stages,null}
> 14/0

Using google cloud storage for spark big data

2014-04-16 Thread Aureliano Buendia
Hi,

Google has published a new connector for Hadoop: Google Cloud Storage,
which is an equivalent of Amazon S3:

googlecloudplatform.blogspot.com/2014/04/google-bigquery-and-datastore-connectors-for-hadoop.html

How can spark be configured to use this connector?
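
A rough sketch of the general shape (the class and property names follow the
connector's documentation and may differ between versions; the project id and
bucket are placeholders):

// The connector jar must be on the Spark classpath (e.g. SPARK_CLASSPATH or
// ADD_JARS). It registers a Hadoop FileSystem for the gs:// scheme.
sc.hadoopConfiguration.set("fs.gs.impl",
  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc.hadoopConfiguration.set("fs.gs.project.id", "my-gcp-project")

// After that, gs:// paths behave like any other Hadoop filesystem path:
val lines = sc.textFile("gs://my-bucket/path/to/data")
lines.count()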


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-04-16 Thread Aureliano Buendia
Is this resolved in spark 0.9.1?


On Tue, Apr 15, 2014 at 6:55 PM, anant  wrote:

> I've received the same error with Spark built using Maven. It turns out
> that
> mesos-0.13.0 depends on protobuf-2.4.1 which is causing the clash at
> runtime. Protobuf included by Akka is shaded and doesn't cause any
> problems.
>
> The solution is to update the mesos dependency to 0.18.0 in spark's
> pom.xml.
> Rebuilding the JAR with this configuration solves the issue.
>
> -Anant
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-reading-HDFS-file-using-spark-0-9-0-hadoop-2-2-0-incompatible-protobuf-2-5-and-2-4-1-tp2158p4286.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: using saveAsNewAPIHadoopFile with OrcOutputFormat

2014-04-16 Thread Kostiantyn Kudriavtsev
I'd prefer to find a good example of using saveAsNewAPIHadoopFile with different
OutputFormat implementations (not only Orc, but EsOutputFormat, etc.). Is there
any common example?
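
For the general shape of saveAsNewAPIHadoopFile, here is a small sketch using
the stock TextOutputFormat; Orc or Elasticsearch output formats would plug in
the same way, provided the key/value classes match what that OutputFormat
expects (the output path is a placeholder):

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Any pair RDD whose key/value types the chosen OutputFormat accepts will do.
val records = sc.parallelize(Seq("alpha", "beta", "gamma"))
  .map(s => (NullWritable.get(), new Text(s)))

records.saveAsNewAPIHadoopFile(
  "hdfs:///tmp/new-api-output",
  classOf[NullWritable],
  classOf[Text],
  classOf[TextOutputFormat[NullWritable, Text]])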

On Apr 16, 2014, at 4:51 PM, Brock Bose  wrote:

> Howdy all, 
> I recently saw that the OrcInputFormat/OutputFormat's have been exposed 
> to be usable outside of hive 
> (https://issues.apache.org/jira/browse/HIVE-5728).   Does anyone know how one 
> could use this with saveAsNewAPIHadoopFile to write records in orc format?
> In particular, I would like to use a Spark Streaming process to read avro
> records off of Kafka and then write them directly to HDFS in orc
> format where they could be used with Shark.
> 
> Thanks,
> Brock



SPARK_YARN_APP_JAR, SPARK_CLASSPATH and ADD_JARS in a spark-shell on YARN

2014-04-16 Thread Christophe Préaud

Hi,

I am running Spark 0.9.1 on a YARN cluster, and I am wondering which is the
correct way to add external jars when running a spark shell on a YARN cluster.

Packaging all these dependencies in an assembly whose path is then set in
SPARK_YARN_APP_JAR (as written in the doc:
http://spark.apache.org/docs/latest/running-on-yarn.html) does not work in my
case: it pushes the jar to HDFS in .sparkStaging/application_XXX, but the
spark-shell is still unable to find it (unless ADD_JARS and/or SPARK_CLASSPATH
is defined).

Defining all the dependencies (either in an assembly, or separately) in ADD_JARS
or SPARK_CLASSPATH works (even if SPARK_YARN_APP_JAR is set to /dev/null), but
defining some dependencies in ADD_JARS and the rest in SPARK_CLASSPATH does not!

Hence I'm still wondering what the differences between ADD_JARS and
SPARK_CLASSPATH are, and what the purpose of SPARK_YARN_APP_JAR is.

Thanks for any insights!
Christophe.





graph.reverse & Pregel API

2014-04-16 Thread Bogdan Ghidireac
Hello,

I am using Pregel API with Spark (1.0 branch compiled on Apr 16th) and I
run into some problems when my graph has the edges reversed.
If the edges of my graph are reversed, the sendMsg function no longer
receives the attribute for the source vertex (it is null). This does not
happen with the original graph.

I find this behavior inconsistent and the sendMsg becomes unusable if
the EdgeTriplet has only the vertexIDs and no other information.

I attached the code to reproduce the behavior.
http://pastebin.com/Xbb5wDNi

If you run the program, it will throw an NPE at line 44 because edge.srcAttr
is null

If you remove ".reverse" from line 25 and run it again, edge.srcAttr will
have a value and the program will complete.

Am I using graph.reverse incorrectly?

Thank you for your help.
Bogdan


Create cache fails on first time

2014-04-16 Thread Arpit Tak
I am loading some data (25GB) into Shark from HDFS (Spark and Shark, both 0.9).
It often happens that caching a table fails the very first time we cache the
data. The second time it runs successfully...

Is anybody else facing the same issue?

*Shark Client Log:*
> create table access_cached as select * from access;
[Hive Error]: Query returned non-zero code: 9, cause: FAILED: Execution
Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask
Time taken (including network latency): 68.016 seconds

*SharkServer Log:*
Moving data to: hdfs://
ec2-xyz.compute-1.amazonaws.com:9000/user/hive/warehouse/access_cached
Failed with exception Unable to rename: hdfs://
ec2-xyz.compute-1.amazonaws.com:9000/tmp/hive-root/hive_2014-04-16_10-52-10_487_3421016764043167178/-ext-10004to:
hdfs://
ec2-xyz.compute-1.amazonaws.com:9000/user/hive/warehouse/access_cached
FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.MoveTask


Regards,
Arpit Tak


using saveAsNewAPIHadoopFile with OrcOutputFormat

2014-04-16 Thread Brock Bose
Howdy all,
I recently saw that the OrcInputFormat/OutputFormat classes have been exposed
to be usable outside of Hive (
https://issues.apache.org/jira/browse/HIVE-5728).   Does anyone know how
one could use this with saveAsNewAPIHadoopFile to write records in orc
format?
   In particular, I would like to use a Spark Streaming process to read
avro records off of Kafka and then write them directly to HDFS in
orc format where they could be used with Shark.

Thanks,
Brock


PySpark still reading only text?

2014-04-16 Thread Bertrand Dechoux
Hi,

I have browsed the online documentation and it is stated that PySpark only
reads text files as sources. Is this still the case?

From what I understand, after this first step the RDD can be any serialized
Python structure, provided the class definitions are well distributed.

Is it not possible to read back those RDDs? That is create a flow to parse
everything and then, e.g. the next week, start from the binary, structured
data?

Technically, what is the difficulty? I would assume the code reading a
binary python RDD or a binary python file to be quite similar. Where can I
know more about this subject?

Thanks in advance

Bertrand


Re: Spark program thows OutOfMemoryError

2014-04-16 Thread Andre Bois-Crettez

It seems you do not have enough memory on the Spark driver. Hints below:

On 2014-04-15 12:10, Qin Wei wrote:

 val resourcesRDD = jsonRDD.map(arg =>
arg.get("rid").toString.toLong).distinct

 // the program crashes at this line of code
 val bcResources = sc.broadcast(resourcesRDD.collect.toList)

What is returned by resourcesRDD.count()?


The data file “/home/deployer/uris.dat” is 2G  with lines like this : {
"id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : "10550869" }

And here is my spark-env.sh
 export SCALA_HOME=/usr/local/scala-2.10.3
 export SPARK_MASTER_IP=192.168.2.184
 export SPARK_MASTER_PORT=7077
 export SPARK_LOCAL_IP=192.168.2.182
 export SPARK_WORKER_MEMORY=20g
 export SPARK_MEM=10g
 export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g
-XX:-UseGCOverheadLimit"

Try setting SPARK_DRIVER_MEMORY to a bigger value, as the default 512m is
probably too small for the resourcesRDD.collect().
By the way, are you really sure you need to collect all of that?
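
As a rough sketch of the alternative (jsonRDD is from the earlier post; the
second RDD here is a hypothetical stand-in for whatever the broadcast list was
matched against): if the collected ids are only needed for lookups, a join
keeps everything distributed and the driver never has to hold the result:

// Instead of sc.broadcast(resourcesRDD.collect.toList), keep the ids as an RDD.
val resourceIds = jsonRDD
  .map(arg => arg.get("rid").toString.toLong)
  .distinct()
  .map(id => (id, ()))                   // key the ids for the join

// Hypothetical pair RDD keyed by the same resource id.
val otherById = sc.parallelize(Seq((5487628L, "some-record"), (42L, "other")))

// Only records whose id appears in resourceIds survive; nothing is collected.
val matching = otherById.join(resourceIds).mapValues { case (record, _) => record }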

André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/



Java heap space and spark.akka.frameSize Inbox x

2014-04-16 Thread Chieh-Yen
Dear all,

I developed an application where the communication message size is sometimes
greater than 10 MB.
For smaller datasets it works fine, but it fails for larger datasets.
Please see the error message below.

I searched online and lots of people said the problem can be solved by
modifying the spark.akka.frameSize and spark.reducer.maxMbInFlight
properties. It may look like this:

val conf = new SparkConf()
  .setMaster(master)
  .setAppName("SparkLR")
  .setSparkHome("/home/user/spark-0.9.0-incubating-bin-hadoop2")
  .setJars(List(jarPath))
  .set("spark.akka.frameSize", "100")
  .set("spark.reducer.maxMbInFlight", "100")
val sc = new SparkContext(conf)

However, the task still fails with the same error message.
The communication message consists of the weight vectors of each sub-problem;
it may be larger than 10 MB for a higher-dimensional dataset.

Can anybody help me?
Thanks a lot.


[error] (run-main) org.apache.spark.SparkException: Job aborted: Exception
while deserializing and fetching task: java.lang.OutOfMemoryError: Java
heap space
org.apache.spark.SparkException: Job aborted: Exception while deserializing
and fetching task: java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.


Chieh-Yen


Re: Proper caching method

2014-04-16 Thread Arpit Tak
Thanks Cheng , that was helpful..


On Wed, Apr 16, 2014 at 1:29 PM, Cheng Lian  wrote:

> You can remove cached rdd1 from the cache manager by calling
> rdd1.unpersist(). But here come some subtleties: RDD.cache() is *lazy* while
> RDD.unpersist() is *eager*. When .cache() is called, it just tells the Spark
> runtime to cache the RDD *later*, when a corresponding job that uses this
> RDD is submitted; when .unpersist() is called, the cached RDD is removed
> immediately. So you may want to do something like this to avoid rdd1 taking
> too much memory:
>
> val rdd1 = sc.textFile(path).cache()
> val rdd2 = rdd1.filter(...).cache()
> val rdd3 = rdd1.filter(...).cache()
> // Trigger a job to materialize and cache rdd1, rdd2 & rdd3
> (rdd2 ++ rdd3).count()
> // Remove rdd1
> rdd1.unpersist()
> // Use rdd2 & rdd3 for later logic.
>
> In this way, an additional job is required so that you have a chance to
> evict rdd1 as early as possible.
>
>
> On Wed, Apr 16, 2014 at 2:43 PM, Arpit Tak wrote:
>
>> Hi Cheng,
>>
>> Is it possible to delete or replicate an RDD?
>>
>>
>> > rdd1 = textFile("hdfs...").cache()
>> >
>> > rdd2 = rdd1.filter(userDefinedFunc1).cache()
>> > rdd3 = rdd1.filter(userDefinedFunc2).cache()
>>
>> To reframe the question above: rdd1 is around 50G, and after filtering it
>> comes down to around 4G. To improve performance we just cache rdd1, while
>> rdd2 and rdd3 stay on disk (uncached). Will that give better performance
>> than filtering from disk and then caching rdd2 and rdd3?
>>
>> Also, can we remove a particular RDD from the cache, say rdd1 (if cached),
>> after the filter operations, since it is no longer required, so that we
>> save memory?
>>
>> Regards,
>> Arpit
>>
>>
>> On Tue, Apr 15, 2014 at 7:14 AM, Cheng Lian wrote:
>>
>>> Hi Joe,
>>>
>>> You need to make sure which RDD is used most frequently. In your case,
>>> rdd2 & rdd3 are filtered results of rdd1, so usually they are relatively
>>> smaller than rdd1, and it would be more reasonable to cache rdd2 and/or
>>> rdd3 if rdd1 is not referenced elsewhere.
>>>
>>> Say rdd1 takes 10G, rdd2 takes 1G after filtering, if you cache both of
>>> them, you end up with 11G memory consumption, which might not be what you
>>> want.
>>>
>>> Regards
>>> Cheng
>>>
>>>
>>> On Mon, Apr 14, 2014 at 8:32 PM, Joe L  wrote:
>>>
 Hi, I am trying to cache 2 GB of data and implement the following
 procedure.
 In order to cache it I did as follows. Is it necessary to cache rdd2
 since rdd1 is already cached?

 rdd1 = textFile("hdfs...").cache()

 rdd2 = rdd1.filter(userDefinedFunc1).cache()
 rdd3 = rdd1.filter(userDefinedFunc2).cache()






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Proper-caching-method-tp4206.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

>>>
>>>
>>
>


Re: Proper caching method

2014-04-16 Thread Cheng Lian
You can remove cached rdd1 from the cache manager by calling
rdd1.unpersist(). But here come some subtleties: RDD.cache() is *lazy* while
RDD.unpersist() is *eager*. When .cache() is called, it just tells the Spark
runtime to cache the RDD *later*, when a corresponding job that uses this RDD
is submitted; when .unpersist() is called, the cached RDD is removed
immediately. So you may want to do something like this to avoid rdd1 taking
too much memory:

val rdd1 = sc.textFile(path).cache()
val rdd2 = rdd1.filter(...).cache()
val rdd3 = rdd1.filter(...).cache()
// Trigger a job to materialize and cache rdd1, rdd2 & rdd3
(rdd2 ++ rdd3).count()
// Remove rdd1
rdd1.unpersist()
// Use rdd2 & rdd3 for later logic.

In this way, an additional job is required so that you have a chance to evict
rdd1 as early as possible.


On Wed, Apr 16, 2014 at 2:43 PM, Arpit Tak wrote:

> Hi Cheng,
>
> Is it possible to delete or replicate an RDD?
>
>
> > rdd1 = textFile("hdfs...").cache()
> >
> > rdd2 = rdd1.filter(userDefinedFunc1).cache()
> > rdd3 = rdd1.filter(userDefinedFunc2).cache()
>
> To reframe the question above: rdd1 is around 50G, and after filtering it
> comes down to around 4G. To improve performance we just cache rdd1, while
> rdd2 and rdd3 stay on disk (uncached). Will that give better performance
> than filtering from disk and then caching rdd2 and rdd3?
>
> Also, can we remove a particular RDD from the cache, say rdd1 (if cached),
> after the filter operations, since it is no longer required, so that we
> save memory?
>
> Regards,
> Arpit
>
>
> On Tue, Apr 15, 2014 at 7:14 AM, Cheng Lian  wrote:
>
>> Hi Joe,
>>
>> You need to make sure which RDD is used most frequently. In your case,
>> rdd2 & rdd3 are filtered results of rdd1, so usually they are relatively
>> smaller than rdd1, and it would be more reasonable to cache rdd2 and/or
>> rdd3 if rdd1 is not referenced elsewhere.
>>
>> Say rdd1 takes 10G, rdd2 takes 1G after filtering, if you cache both of
>> them, you end up with 11G memory consumption, which might not be what you
>> want.
>>
>> Regards
>> Cheng
>>
>>
>> On Mon, Apr 14, 2014 at 8:32 PM, Joe L  wrote:
>>
>>> Hi, I am trying to cache 2 GB of data and implement the following
>>> procedure.
>>> In order to cache it I did as follows. Is it necessary to cache rdd2
>>> since rdd1 is already cached?
>>>
>>> rdd1 = textFile("hdfs...").cache()
>>>
>>> rdd2 = rdd1.filter(userDefinedFunc1).cache()
>>> rdd3 = rdd1.filter(userDefinedFunc2).cache()
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Proper-caching-method-tp4206.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>


Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-16 Thread Yanzhe Chen
Hi Eugen,  

Sorry if I haven’t caught your point. In the second example,

val result = data.mapPartitions(points => skyline(points.toArray).iterator)
  .reduce { case (left, right) =>
    skyline(left ++ right)
  }

In my understanding, if data is of type RDD[Point], then both left and right
in the reduce case are of type Point, so why would skyline run on two
partitions rather than on two single points?

Besides, the definition of my skyline function is skyline(points: Array[Point]) 
: Array[Point].

Thanks,

Yanzhe

On Wednesday, April 16, 2014 at 2:43 PM, Eugen Cepoi wrote:  
> Yes, the second example does that. It transforms all the points of a
> partition into a single element, the skyline, so reduce will run on the
> skylines of two partitions and not on single points.
> On 16 Apr 2014 06:47, "Yanzhe Chen"  wrote:
> > Eugen,
> >  
> > Thanks for your tip and I do want to merge the result of a partition with 
> > another one but I am still not quite clear how to do it.  
> >  
> > Say the original data RDD has 32 partitions. Since mapPartitions won’t
> > change the number of partitions, there will still be 32 partitions, each
> > containing the partial skyline of the points in that partition. Now I want
> > to merge those 32 partitions to generate a new skyline. It would be better
> > if I could use reduce to merge them pairwise (rather than just collecting
> > them into one), but I think simply calling the reduce method on the RDD
> > won’t work because it reduces the data at the granularity of points rather
> > than partition results (which are collections of points). So is there a
> > way to reduce the data at the granularity of partitions?
> >  
> > Thanks,
> >  
> > Yanzhe  
> >  
> > On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:
> >  
> > > It depends on your algorithm but I guess that you probably should use 
> > > reduce (the code probably doesn't compile but it shows you the idea).
> > >  
> > > val result = data.reduce { case (left, right) =>
> > >   skyline(left ++ right)
> > > }
> > >  
> > > Or in the case you want to merge the result of a partition with another 
> > > one you could do:
> > >  
> > > val result = data.mapPartitions { points =>
> > >   // transforms the whole partition into a single element, but this may
> > >   // incur some other problems, especially if you use Kryo serialization...
> > >   Seq(skyline(points.toArray))
> > > }.reduce { case (left, right) =>
> > >   skyline(left ++ right)
> > > }
> > >  
> > >  
> > >  
> > >  
> > > 2014-04-15 19:37 GMT+02:00 Cheng Lian :
> > > >  
> > > > Your Spark solution first reduces partial results into a single 
> > > > partition, computes the final result, and then collects to the driver 
> > > > side. This involves a shuffle and two waves of network traffic. 
> > > > Instead, you can directly collect partial results to the driver and
> > > > then compute the final results on the driver side:
> > > >  
> > > > val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
> > > > val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
> > > > val results = skyline(partialResults)
> > > >  
> > > > On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen  wrote:
> > > >  
> > > >  
> > > >  
> > > >  
> > > >  
> > > > > Hi all,  
> > > > >  
> > > > > As a previous thread, I am asking how to implement a 
> > > > > divide-and-conquer algorithm (skyline) in Spark.
> > > > > Here is my current solution:
> > > > >  
> > > > > val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))
> > > > >
> > > > > val result = data.mapPartitions(points => skyline(points.toArray).iterator)
> > > > >   .coalesce(1, true)
> > > > >   .mapPartitions(points => skyline(points.toArray).iterator)
> > > > >   .collect()
> > > > >  
> > > > > where skyline is a local algorithm to compute the results:  
> > > > >  
> > > > > def skyline(points: Array[Point]) : Array[Point]
> > > > >  
> > > > > Basically, I find this implementation runs slower than the corresponding
> > > > > Hadoop version (the identity map phase plus local skyline for both
> > > > > combine and reduce phases).
> > > > >
> > > > > Below are my questions:
> > > > >
> > > > > 1. Why is this implementation much slower than the Hadoop one?
> > > > >
> > > > > I can see two possible reasons: one is the shuffle overhead in coalesce,
> > > > > the other is calling toArray and iterator repeatedly when invoking the
> > > > > local skyline algorithm. But I am not sure which one is the real cause.
> > > >  
> > > >  
> > > >  
> > > >  
> > > >  
> > > >  
> > > >  
> > > > I haven’t seen your Hadoop version. But if this assumption is right,
> > > > the above version should help.

Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-16 Thread Cheng Lian
Hmm… The good part of reduce is that it performs local combining within a
single partition automatically, but since you turned each partition into a
single-element one, local combining is not applicable, and reduce simply
degrades to collecting all the partial results and then performing a skyline
over them on the driver side.
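
For illustration, one alternative that keeps local combining is RDD.aggregate,
which takes both a within-partition fold and a cross-partition merge. This is
only a sketch, assuming data is an RDD[Point] and the
skyline(points: Array[Point]): Array[Point] helper from this thread; folding
point by point may itself be slow for large partitions:

val result = data.aggregate(Array.empty[Point])(
  (partial, p) => skyline(partial :+ p),    // fold each point into this partition's partial skyline
  (left, right) => skyline(left ++ right)   // merge partial skylines across partitions
)

Collecting the per-partition skylines to the driver, as suggested earlier in
the thread, is usually the simpler option.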


On Wed, Apr 16, 2014 at 2:43 PM, Eugen Cepoi  wrote:

> Yes, the second example does that. It transforms all the points of a
> partition into a single element, the skyline, so reduce will run on the
> skylines of two partitions and not on single points.
> On 16 Apr 2014 06:47, "Yanzhe Chen"  wrote:
>
> Eugen,
>>
>> Thanks for your tip and I do want to merge the result of a partition with
>> another one but I am still not quite clear how to do it.
>>
>> Say the original data RDD has 32 partitions. Since mapPartitions won’t
>> change the number of partitions, there will still be 32 partitions, each
>> containing the partial skyline of the points in that partition. Now I want to
>> merge those 32 partitions to generate a new skyline. It would be better if I
>> could use reduce to merge them pairwise (rather than just collecting them into
>> one), but I think simply calling the reduce method on the RDD won’t work
>> because it reduces the data at the granularity of points rather than
>> partition results (which are collections of points). So is there a way to
>> reduce the data at the granularity of partitions?
>>
>> Thanks,
>>
>> Yanzhe
>>
>> On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:
>>
>> It depends on your algorithm but I guess that you probably should use
>> reduce (the code probably doesn't compile but it shows you the idea).
>>
>> val result = data.reduce { case (left, right) =>
>>   skyline(left ++ right)
>> }
>>
>> Or in the case you want to merge the result of a partition with another
>> one you could do:
>>
>> val result = data.mapPartitions { points =>
>>   // transforms the whole partition into a single element, but this may
>>   // incur some other problems, especially if you use Kryo serialization...
>>   Seq(skyline(points.toArray))
>> }.reduce { case (left, right) =>
>>   skyline(left ++ right)
>> }
>>
>>
>>
>>
>> 2014-04-15 19:37 GMT+02:00 Cheng Lian :
>>
>> Your Spark solution first reduces partial results into a single
>> partition, computes the final result, and then collects to the driver side.
>> This involves a shuffle and two waves of network traffic. Instead, you can
>> directly collect partial results to the driver and then compute the final
>> results on the driver side:
>>
>> val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
>> val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
>> val results = skyline(partialResults)
>>
>> On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen  wrote:
>>
>>  Hi all,
>>
>> As a previous thread, I am asking how to implement a divide-and-conquer
>> algorithm (skyline) in Spark.
>> Here is my current solution:
>>
>> val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))
>>
>> val result = data.mapPartitions(points => skyline(points.toArray).iterator)
>>   .coalesce(1, true)
>>   .mapPartitions(points => skyline(points.toArray).iterator)
>>   .collect()
>>
>> where skyline is a local algorithm to compute the results:
>>
>> def skyline(points: Array[Point]): Array[Point]
>>
>> Basically, I find this implementation runs slower than the corresponding
>> Hadoop version (the identity map phase plus local skyline for both combine
>> and reduce phases).
>>
>> Below are my questions:
>>
>> 1. Why is this implementation much slower than the Hadoop one?
>>
>> I can see two possible reasons: one is the shuffle overhead in coalesce,
>> the other is calling toArray and iterator repeatedly when invoking the
>> local skyline algorithm. But I am not sure which one is the real cause.
>>
>> I haven’t seen your Hadoop version. But if this assumption is right, the
>> above version should help.
>>
>>
>> 2. One observation is that while the Hadoop version almost used up all the
>> CPU resources during execution, the CPU does not seem that busy with Spark.
>> Is that a clue that the shuffling might be the real bottleneck?
>>
>> How many parallel tasks are there when running your Spark code? I suspect
>> the tasks are queued and run sequentially.
>>
>>
>> 3. Is there any difference between coalesce(1, true) and repartition? It
>> seems that both operations need to shuffle data. What are the proper
>> situations for using the coalesce method?
>>
>> repartition(n) is just an alias of coalesce(n, true), so yes, they both
>> involve data shuffling. coalesce can be used to shrink the partition number
>> when the dataset size shrinks dramatically after operations like filter. Say
>> you have an RDD containing 1TB of data in 100 partitions; if only 20GB of
>> data is left after a .filter(...) call, then you may want to coalesce it
>> down to fewer partitions.
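
To illustrate that pattern, a minimal sketch (the path, filter predicate, and
partition counts below are made up for the example; sc is an existing
SparkContext):

val big = sc.textFile("hdfs:///data/events")   // say ~1TB in 100 partitions
val small = big
  .filter(_.contains("ERROR"))                 // far less data after filtering
  .coalesce(20)                                // shrink to 20 partitions without a full shuffle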

what is a partition? how it works?

2014-04-16 Thread Joe L
I would like to know the following:

What is a partition? How does it work?
How is it different from a Hadoop partition?

For example:
>>> sc.parallelize([1,2,3,4]).map(lambda x: (x,x)).partitionBy(2).glom().collect()
[[(2,2), (4,4)], [(1,1), (3,3)]]

From this we get 2 partitions, but what does that mean? How do they
reside in memory across the cluster?

I am sorry for such a simple question but I couldn't find any specific
information about what happens underneath partitioning. 

Thank you, Joe



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-a-partition-how-it-works-tp4325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: StackOverflow Error when run ALS with 100 iterations

2014-04-16 Thread Nick Pentreath
I'd also say that running for 100 iterations is a waste of resources, as
ALS will typically converge pretty quickly, as in within 10-20 iterations.
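
For reference, a minimal training sketch with a modest iteration count (the
input path, rank, and lambda values are illustrative only; sc is an existing
SparkContext):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = sc.textFile("hdfs:///ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(',')
  Rating(user.toInt, product.toInt, rating.toDouble)
}
// rank = 10, iterations = 15, lambda = 0.01; 10-20 iterations is usually enough
val model = ALS.train(ratings, 10, 15, 0.01)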


On Wed, Apr 16, 2014 at 3:54 AM, Xiaoli Li  wrote:

> Thanks a lot for your information. It really helps me.
>
>
> On Tue, Apr 15, 2014 at 7:57 PM, Cheng Lian  wrote:
>
>> Probably this JIRA issue solves your problem. When running with a large
>> iteration number, the lineage DAG of ALS becomes very deep, and both the
>> DAGScheduler and the Java serializer may overflow because they are
>> implemented in a recursive way. You may resort to checkpointing as a
>> workaround.
>>
>>
>> On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li wrote:
>>
>>> Hi,
>>>
>>> I am testing ALS on 7 nodes. Each node has 4 cores and 8G memory. The
>>> ALS program cannot run even with a very small training set (about
>>> 91 lines) due to a StackOverflow error when I set the number of iterations
>>> to 100. I think the problem may be caused by the updateFeatures method,
>>> which updates the products RDD iteratively by joining with the previous products RDD.
>>>
>>>
>>> I am writing a program which has an update process similar to ALS.
>>> This problem also appeared when I iterated too many times (more than 80).
>>>
>>> The iterative part of my code is as following:
>>>
>>> solution = outlinks.join(solution).map {
>>>   ...
>>> }
>>>
>>>
>>> Has anyone had a similar problem?  Thanks.
>>>
>>>
>>> Xiaoli
>>>
>>
>>
>
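
For anyone hitting this later, a minimal sketch of the checkpointing workaround
Cheng mentions, applied to an iterative join/update loop like the one above.
The toy data, the update body, and the checkpoint interval of 10 are
illustrative only; sc is an existing SparkContext:

import org.apache.spark.SparkContext._   // (K, V) join implicits on older Spark versions

sc.setCheckpointDir("hdfs:///tmp/checkpoints")            // any reliable storage
val outlinks = sc.parallelize(Seq((1L, 2L), (2L, 1L)))    // toy (src, dst) pairs
var solution = sc.parallelize(Seq((1L, 1.0), (2L, 1.0)))
for (i <- 1 to 100) {
  solution = outlinks.join(solution)
    .map { case (_, (dst, value)) => (dst, value) }       // stand-in for the real update
  if (i % 10 == 0) {
    solution.checkpoint()   // truncate the lineage so the recursive DAG/serialization stays shallow
    solution.count()        // run an action so the checkpoint is actually written
  }
}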