Re: Broadcast failure with variable size of ~ 500mb with key already cancelled ?

2014-11-11 Thread Davies Liu
There is an open PR [1] to support broadcasts larger than 2G; could you try it?

[1] https://github.com/apache/spark/pull/2659

On Tue, Nov 11, 2014 at 6:39 AM, Tom Seddon mr.tom.sed...@gmail.com wrote:
 Hi,

 Just wondering if anyone has any advice about this issue, as I am
 experiencing the same thing.  I'm working with multiple broadcast variables
 in PySpark, most of which are small, but one is around 4.5GB, using 10
 workers at 31GB memory each and a driver with the same spec.  It's not running
 out of memory as far as I can see, but it definitely only happens when I add
 the large broadcast.  I would be most grateful for advice.

 I tried playing around with the last 3 conf settings below, but no luck:

 SparkConf().set("spark.master.memory", "26")
 .set("spark.executor.memory", "26")
 .set("spark.worker.memory", "26")
 .set("spark.driver.memory", "26")
 .set("spark.storage.memoryFraction", "1")
 .set("spark.core.connection.ack.wait.timeout", "6000")
 .set("spark.akka.frameSize", "50")

There are some invalid configs here: spark.master.memory and
spark.worker.memory are not valid settings.

spark.storage.memoryFraction is too large; with 1 you will have no
memory left for general use (such as shuffle).

The Python jobs run in separate processes, so you should leave some
memory for them on the slaves. For example, if a node has 8 CPUs, each Python
process will need at least 8G (for the 4.5G broadcast plus object overhead
in Python), so you can run only about 3 such processes at the same time; limit
that with spark.cores.max=2 or spark.task.cpus=4. Also, you could set
spark.executor.memory to only 10G, leaving 16G for Python.

Also, you could set spark.python.worker.memory=8g for better
shuffle performance in Python (it's not strictly necessary).
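
For example, a rough sketch of such a config (the numbers are only an
illustration for the 31G workers described above, adjust them to your cluster):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "10g")         # leave the rest of the node for the Python workers
        .set("spark.task.cpus", "4")                  # fewer concurrent Python processes per executor
        .set("spark.python.worker.memory", "8g"))     # optional, for Python-side shuffle
sc = SparkContext(conf=conf)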

So, for a large broadcast, maybe you should use Scala: it uses
multiple threads, so the broadcast will be shared by multiple tasks in the
same executor.

 Thanks,

 Tom


 On 24 October 2014 12:31, htailor hemant.tai...@live.co.uk wrote:

 Hi All,

 I am relatively new to spark and currently having troubles with broadcasting
 large variables ~500mb in size. The broadcast fails with an error shown below
 and the memory usage on the hosts also blows up.

 Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb
 (workers))
 and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.

 We have managed to replicate the error using the test script shown below. I
 would be interested to know if anyone has seen this before with broadcasting
 or knows of a fix.

 === ERROR ==

 14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
 14/10/24 08:20:08 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 java.nio.channels.CancelledKeyException
 at

 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 java.nio.channels.CancelledKeyException
 at

 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 

Re: Pyspark Error when broadcast numpy array

2014-11-11 Thread Davies Liu
This PR fixes the problem: https://github.com/apache/spark/pull/2659

cc @josh

Davies

On Tue, Nov 11, 2014 at 7:47 PM, bliuab bli...@cse.ust.hk wrote:
 In spark-1.0.2, I have come across an error when I try to broadcast a fairly
 large numpy array (35M dimensions). The error information, the
 java.lang.NegativeArraySizeException and its details, is listed below.
 Moreover, when broadcasting a relatively smaller numpy array (30M dimensions),
 everything works fine. A 30M-dimension numpy array takes 230M of memory,
 which, in my opinion, is not very large.
 As far as I can tell, it seems related to py4j. However, I have no
 idea how to fix this. I would appreciate any hint.
 
 py4j.protocol.Py4JError: An error occurred while calling o23.broadcast.
 Trace:
 java.lang.NegativeArraySizeException
 at py4j.Base64.decode(Base64.java:292)
 at py4j.Protocol.getBytes(Protocol.java:167)
 at py4j.Protocol.getObject(Protocol.java:276)
 at
 py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
 at py4j.commands.CallCommand.execute(CallCommand.java:77)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 -
 And the test code is as follows:
 conf = SparkConf().setAppName('brodyliu_LR').setMaster('spark://10.231.131.87:5051')
 conf.set('spark.executor.memory', '4000m')
 conf.set('spark.akka.timeout', '10')
 conf.set('spark.ui.port','8081')
 conf.set('spark.cores.max','150')
 #conf.set('spark.rdd.compress', 'True')
 conf.set('spark.default.parallelism', '300')
 #configure the spark environment
 sc = SparkContext(conf=conf, batchSize=1)

 vec = np.random.rand(3500)
 a = sc.broadcast(vec)






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Error-when-broadcast-numpy-array-tp18662.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pyspark Error when broadcast numpy array

2014-11-11 Thread Davies Liu
Yes, your broadcast should be about 300M, much smaller than 2G; I
didn't read your post carefully.

Broadcast in Python has been improved a lot since 1.1. I think it
will work in 1.1 or the upcoming 1.2 release; could you upgrade to 1.1?

Davies

On Tue, Nov 11, 2014 at 8:37 PM, bliuab bli...@cse.ust.hk wrote:
 Dear Liu:

 Thank you very much for your help. I will apply that patch. By the way, as
 I have succeeded in broadcasting an array of size 30M, and the log said that
 such an array takes around 230MB of memory, I think the numpy array that
 leads to the error is much smaller than 2G.

 On Wed, Nov 12, 2014 at 12:29 PM, Davies Liu-2 [via Apache Spark User List]
 [hidden email] wrote:

 This PR fixes the problem: https://github.com/apache/spark/pull/2659

 cc @josh

 Davies





 --
 My Homepage: www.cse.ust.hk/~bliuab
 MPhil student in Hong Kong University of Science and Technology.
 Clear Water Bay, Kowloon, Hong Kong.
 Profile at LinkedIn.

 
 View this message in context: Re: Pyspark Error when broadcast numpy array

 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: error when importing HiveContext

2014-11-07 Thread Davies Liu
bin/pyspark will set up the PYTHONPATH for py4j for you; otherwise you need to
set it up yourself:

export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip
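
If you are launching a plain Python interpreter instead of bin/pyspark, a rough
equivalent done from Python itself (assuming SPARK_HOME is set in your
environment) would be:

import glob
import os
import sys

spark_home = os.environ["SPARK_HOME"]
sys.path.insert(0, os.path.join(spark_home, "python"))
# pick up the bundled py4j zip, whatever its exact version is
sys.path.insert(0, glob.glob(os.path.join(spark_home, "python/lib/py4j-*-src.zip"))[0])

from pyspark.sql import HiveContext   # should now import without the py4j error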

On Fri, Nov 7, 2014 at 8:15 AM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 I'm getting this error when importing hive context



 from pyspark.sql import HiveContext

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "/path/spark-1.1.0/python/pyspark/__init__.py", line 63, in <module>
     from pyspark.context import SparkContext
   File "/path/spark-1.1.0/python/pyspark/context.py", line 30, in <module>
     from pyspark.java_gateway import launch_gateway
   File "/path/spark-1.1.0/python/pyspark/java_gateway.py", line 26, in <module>
     from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 ImportError: No module named py4j.java_gateway



 I cannot find py4j on my system. Where is it?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark issue with sortByKey: IndexError: list index out of range

2014-11-07 Thread Davies Liu
Could you tell us how large the data set is? It will help us to debug this issue.

On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote:
 I don't have any insight into this bug, but on Spark version 1.0.0 I ran into
 the same bug running the 'sort.py' example. On a smaller data set, it worked
 fine. On a larger data set I got this error:

 Traceback (most recent call last):
   File "/home/skane/spark/examples/src/main/python/sort.py", line 30, in <module>
     .sortByKey(lambda x: x)
   File "/usr/lib/spark/python/pyspark/rdd.py", line 480, in sortByKey
     bounds.append(samples[index])
 IndexError: list index out of range



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-issue-with-sortByKey-IndexError-list-index-out-of-range-tp16445p18288.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark issue with sortByKey: IndexError: list index out of range

2014-11-06 Thread Davies Liu
It should be fixed in 1.1+.

Do you have a script to reproduce it?

On Thu, Nov 6, 2014 at 10:39 AM, skane sk...@websense.com wrote:
 I don't have any insight into this bug, but on Spark version 1.0.0 I ran into
 the same bug running the 'sort.py' example. On a smaller data set, it worked
 fine. On a larger data set I got this error:

 Traceback (most recent call last):
   File "/home/skane/spark/examples/src/main/python/sort.py", line 30, in <module>
     .sortByKey(lambda x: x)
   File "/usr/lib/spark/python/pyspark/rdd.py", line 480, in sortByKey
     bounds.append(samples[index])
 IndexError: list index out of range



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-issue-with-sortByKey-IndexError-list-index-out-of-range-tp16445p18288.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext._lock Error

2014-11-05 Thread Davies Liu
What's the version of Python? 2.4? (The `with` statement used there requires
Python 2.6 or later, so an old Python would explain the SyntaxError.)

Davies

On Wed, Nov 5, 2014 at 4:21 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 I'm using this system



 Hadoop 1.0.4

 Scala 2.9.3

 Hive 0.9.0





 With spark 1.1.0. When importing pyspark, I'm getting this error:



 from pyspark.sql import *

 Traceback (most recent call last):
   File "<stdin>", line 1, in ?
   File "/path/spark-1.1.0/python/pyspark/__init__.py", line 63, in ?
     from pyspark.context import SparkContext
   File "/path/spark-1.1.0/python/pyspark/context.py", line 209
     with SparkContext._lock:
                     ^
 SyntaxError: invalid syntax



 How do I fix it?



 Thank you,

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet files are only 6-20MB in size?

2014-11-03 Thread Davies Liu
Before saveAsParquetFile(), you can call coalesce(N); then you will
have N files, and it will keep the order as before (repartition() will not).
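
For example, a minimal sketch following the advice above (the variable name is
hypothetical, use whatever SchemaRDD you built from the csv):

# one output file, order preserved
schema_rdd.coalesce(1).saveAsParquetFile("output.parquet")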


On Mon, Nov 3, 2014 at 1:16 AM, ag007 agre...@mac.com wrote:
 Thanks Akhil,

 Am I right in saying that the repartition will spread the data randomly, so I
 lose the chronological order?

 I really just want the csv -> parquet conversion in the same order it came in.
 If I set repartition to 1, will this not be random?

 cheers,
 Ag



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-are-only-6-20MB-in-size-tp17935p17941.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on Yarn probably trying to load all the data to RAM

2014-11-03 Thread Davies Liu
On Sun, Nov 2, 2014 at 1:35 AM,  jan.zi...@centrum.cz wrote:
 Hi,

 I am using Spark on Yarn, particularly Spark in Python. I am trying to run:

 myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")

How many files do you have, and what is the average size of each file?

 myrdd.getNumPartitions()

 Unfortunately it seems that Spark tries to load everything into RAM, or at
 least after a while of running this everything slows down and then I am
 getting errors with the log below. Everything works fine for datasets smaller
 than RAM, but I would expect Spark to do this without storing everything in
 RAM. So I would like to ask whether I'm missing some settings in Spark on
 Yarn?


 Thank you in advance for any help.


 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
 thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down
 ActorSystem [sparkDriver]

 java.lang.OutOfMemoryError: GC overhead limit exceeded

 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
 thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
 ActorSystem [sparkDriver]

 java.lang.OutOfMemoryError: GC overhead limit exceeded

 11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs]

 11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs]

 11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs]

 11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs]

 11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs]

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "/home/hadoop/spark/python/pyspark/rdd.py", line 391, in getNumPartitions
     return self._jrdd.partitions().size()
   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value

 py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler:
 Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2

 : An error occurred while calling o112.partitions.

 : java.lang.OutOfMemoryError: GC overhead limit exceeded



 11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs]

 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
 Shutting down remote daemon.

 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
 thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
 ActorSystem [sparkDriver]

 java.lang.OutOfMemoryError: GC overhead limit exceeded

 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
 thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down
 ActorSystem [sparkDriver]

 java.lang.OutOfMemoryError: GC overhead limit exceeded

 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
 Remote daemon shut down; proceeding with flushing remote transports.

 14/11/01 22:07:09 INFO Remoting: Remoting shut down

 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
 Remoting shut down.

 14/11/01 22:07:09 INFO network.ConnectionManager: Removing
 ReceivingConnection to
 ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
 to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
 to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

 14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@5ca1c790

 14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@5ca1c790

 java.nio.channels.CancelledKeyException

 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)

 at
 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
 to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)

 14/11/01 22:07:09 INFO network.ConnectionManager: Removing
 ReceivingConnection to
 ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)

 14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding
 SendingConnection to
 ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not
 found

 14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application
 already ended: FINISHED

 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
 o.e.j.s.ServletContextHandler{/metrics/json,null}

 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
 o.e.j.s.ServletContextHandler{/stages/stage/kill,null}

 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
 o.e.j.s.ServletContextHandler{/,null}

 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
 o.e.j.s.ServletContextHandler{/static,null}

 

Re: pySpark - convert log/txt files into sequenceFile

2014-10-29 Thread Davies Liu
Without the second line, it will be much faster (collect() pulls all the data
back to the driver and parallelize() ships it out again):

 infile = sc.wholeTextFiles(sys.argv[1])
 infile.saveAsSequenceFile(sys.argv[2])


On Wed, Oct 29, 2014 at 3:31 AM, Csaba Ragany rag...@gmail.com wrote:
 Thank you Holden, it works!

 infile = sc.wholeTextFiles(sys.argv[1])
 rdd = sc.parallelize(infile.collect())
 rdd.saveAsSequenceFile(sys.argv[2])

 Csaba


 2014-10-28 17:56 GMT+01:00 Holden Karau hol...@pigscanfly.ca:

 Hi Csaba,

 It sounds like the API you are looking for is sc.wholeTextFiles :)

 Cheers,

 Holden :)


 On Tuesday, October 28, 2014, Csaba Ragany rag...@gmail.com wrote:

 Dear Spark Community,

 Is it possible to convert text files (.log or .txt files) into
 sequencefiles in Python?

 Using PySpark I can create a parallelized file with
 rdd=sc.parallelize([('key1', 1.0)]) and I can save it as a sequencefile with
 rdd.saveAsSequenceFile(). But how can I put the whole content of my text
 files into the 'value' of 'key1' ?

 I want a sequencefile where the keys are the filenames of the text files
 and the values are their content.

 Thank you for any help!
 Csaba



 --
 Cell : 425-233-8271



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sampling in spark

2014-10-28 Thread Davies Liu
import random
from bisect import bisect

_cumm = [p[0]]
for i in range(1, len(p)):
    _cumm.append(_cumm[-1] + p[i])
index = set([bisect(_cumm, random.random()) for i in range(k)])

chosed_x = X.zipWithIndex().filter(lambda (v, i): i in index).map(lambda (v, i): v)
chosed_y = [v for i, v in enumerate(y) if i in index]


On Tue, Oct 28, 2014 at 12:26 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Oops, the reference for the above code:
 http://stackoverflow.com/questions/26583462/selecting-corresponding-k-rows-from-matrix-and-vector/26583945#26583945

 On Tue, Oct 28, 2014 at 12:26 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 Hi,
   I have three rdds.. X,y and p
 X is matrix rdd (mXn), y is (mX1) dimension vector
 and p is (mX1) dimension probability vector.
 Now, I am trying to sample k rows from X and corresponding entries in y
 based on probability vector p.
 Here is the python implementation

 import random
 from bisect import bisect
 from operator import itemgetter

 def sample(population, k, prob):

     def cdf(population, k, prob):
         population = map(itemgetter(1), sorted(zip(prob, population)))
         cumm = [prob[0]]
         for i in range(1, len(prob)):
             cumm.append(cumm[-1] + prob[i])
         return [population[bisect(cumm, random.random())] for i in range(k)]

     return cdf(population, k, prob)



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python code crashing on ReduceByKey if I return custom class object

2014-10-27 Thread Davies Liu
This is a known issue with PySpark: classes and objects of a custom
class defined in the current script cannot be serialized by pickle
between the driver and the workers.

You can work around this by putting 'testing' in a separate module and
shipping that module to the cluster with `sc.addPyFile`.
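
A minimal sketch of that workaround (the module name 'testing_mod.py' is
hypothetical): move the 'testing' class and InitializeAndReturnPair into that
file, then in the driver script do something like

sc.addPyFile("testing_mod.py")                      # ship the module to the executors
from testing_mod import InitializeAndReturnPair
output = inp.map(InitializeAndReturnPair).cache()
print output.reduceByKey(lambda a, b: a.addVec(b)).collect()

(note that addVec is a method, so it is called here as a.addVec(b)).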

Davies

On Sun, Oct 26, 2014 at 11:57 PM, sid siddardha.i...@gmail.com wrote:
 Hi , I am new to spark and I am trying to use pyspark.

 I am trying to find mean of 128 dimension vectors present in a file .

 Below is the code
 

 from cStringIO import StringIO

 class testing:
     def __str__(self):
         file_str = StringIO()
         file_str.write(str(self.first))
         file_str.write(" ")
         for n in self.vector:
             file_str.write(str(n))
             file_str.write(" ")
         file_str.write(self.filename)
         return file_str.getvalue()
     def __init__(self, txt=""):
         self.vector = [0.0]*128
         if len(txt)==0:
             return
         i=0
         for n in txt.split():
             if i==0:
                 self.key = float(n)
                 i = i+1
                 continue
             if i<128:
                 self.vector[i-1]=float(n)
                 i = i+1
                 continue
             break
     def addVec(self,r):
         a = testing()
         for n in xrange(0,128):
             a.vector[n] = self.vector[n] + r.vector[n]
         return a

 def InitializeAndReturnPair(string):
     vec = testing(string)
     return vec.key,vec


 from pyspark import SparkConf, SparkContext
 conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
 sc = SparkContext(conf = conf)

 inp = sc.textFile("input.txt")
 output = inp.map(lambda s: InitializeAndReturnPair(s)).cache()
 #Below line is throwing error
 print output.reduceByKey(lambda a,b : addVec(a,b)).collect()

 -

 I am not able to figure out what I am missing. I tried changing the
 serializer but am still getting a similar error.

 Error

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties 14/10/25 05:02:39 WARN Utils:
 Your hostname, Sid resolves to a loopback address: 127.0.1.1; using
 192.168.0.15 instead (on interface wlan0) 14/10/25 05:02:39 WARN Utils: Set
 SPARK_LOCAL_IP if you need to bind to another address 14/10/25 05:02:39 INFO
 SecurityManager: Changing view acls to: sid, 14/10/25 05:02:39 INFO
 SecurityManager: Changing modify acls to: sid, 14/10/25 05:02:39 INFO
 SecurityManager: SecurityManager: authentication disabled; ui acls disabled;
 users with view permissions: Set(sid, ); users with modify permissions:
 Set(sid, ) 14/10/25 05:02:40 INFO Slf4jLogger: Slf4jLogger started 14/10/25
 05:02:40 INFO Remoting: Starting remoting 14/10/25 05:02:40 INFO Remoting:
 Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@192.168.0.15:52124] 14/10/25 05:02:40 INFO
 Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkDriver@192.168.0.15:52124] 14/10/25 05:02:40 INFO Utils:
 Successfully started service 'sparkDriver' on port 52124. 14/10/25 05:02:40
 INFO SparkEnv: Registering MapOutputTracker 14/10/25 05:02:40 INFO SparkEnv:
 Registering BlockManagerMaster 14/10/25 05:02:40 INFO DiskBlockManager:
 Created local directory at /tmp/spark-local-20141025050240-56c5 14/10/25
 05:02:40 INFO Utils: Successfully started service 'Connection manager for
 block manager' on port 51705. 14/10/25 05:02:40 INFO ConnectionManager:
 Bound socket to port 51705 with id = ConnectionManagerId(192.168.0.15,51705)
 14/10/25 05:02:40 INFO MemoryStore: MemoryStore started with capacity 265.4
 MB 14/10/25 05:02:40 INFO BlockManagerMaster: Trying to register
 BlockManager 14/10/25 05:02:40 INFO BlockManagerMasterActor: Registering
 block manager 192.168.0.15:51705 with 265.4 MB RAM 14/10/25 05:02:40 INFO
 BlockManagerMaster: Registered BlockManager 14/10/25 05:02:40 INFO
 HttpFileServer: HTTP File server directory is
 /tmp/spark-ce172226-4732-4633-aa45-1cbd45b7ec98 14/10/25 05:02:40 INFO
 HttpServer: Starting HTTP Server 14/10/25 05:02:40 INFO Utils: Successfully
 started service 'HTTP file server' on port 55111. 14/10/25 05:02:40 INFO
 Utils: Successfully started service 'SparkUI' on port 4040. 14/10/25
 05:02:40 INFO SparkUI: Started SparkUI at http ://192.168.0.15:4040 14/10/25
 05:02:41 WARN NativeCodeLoader: Unable to load native-hadoop library for
 your platform... using builtin-java classes where applicable 14/10/25
 05:02:41 INFO Utils: Copying /home/sid/Downloads/spark/pdsWork/AskHelp.py to
 /tmp/spark-80005f48-e41d-48ff-b249-bdf87130de5d/AskHelp.py 14/10/25 05:02:41
 INFO SparkContext: Added file
 file:/home/sid/Downloads/spark/pdsWork/AskHelp.py at http
 ://192.168.0.15:55111/files/AskHelp.py with timestamp 1414227761547 14/10/25
 05:02:41 INFO AkkaUtils: Connecting to 

Re: spark is running extremely slow with larger data set, like 2G

2014-10-24 Thread Davies Liu
On Thu, Oct 23, 2014 at 3:14 PM, xuhongnever xuhongne...@gmail.com wrote:
 my code is here:

 from pyspark import SparkConf, SparkContext

 def Undirect(edge):
     vector = edge.strip().split('\t')
     if(vector[0].isdigit()):
         return [(vector[0], vector[1])]
     return []


 conf = SparkConf()
 conf.setMaster("spark://compute-0-14:7077")
 conf.setAppName("adjacencylist")
 conf.set("spark.executor.memory", "1g")

Use more memory to get better performance, or Spark will keep
spilling the data to disk, which is much slower.
You also could give more memory to the Python workers by setting
spark.python.worker.memory=1g or 2g.

 sc = SparkContext(conf = conf)

 file = sc.textFile("file:///home/xzhang/data/soc-LiveJournal1.txt")
 records = file.flatMap(lambda line: Undirect(line)).reduceByKey(lambda a, b:
 a + "\t" + b )

a + "\t" + b will be very slow if the number of values is large;
groupByKey() will be better than that.
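
For example, a sketch of the groupByKey() variant (same names as in your script):

records = file.flatMap(Undirect).groupByKey() \
              .map(lambda kv: kv[0] + "\t" + "\t".join(kv[1]))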

 #print(records.count())
 #records = records.sortByKey()
 records = records.map(lambda line: line[0] + "\t" + line[1])
 records.saveAsTextFile("file:///home/xzhang/data/result")



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-tp17152p17153.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark is running extremely slow with larger data set, like 2G

2014-10-24 Thread Davies Liu
On Fri, Oct 24, 2014 at 1:37 PM, xuhongnever xuhongne...@gmail.com wrote:
 Thank you very much.
 Changing to groupByKey works, it runs much more faster.

 By the way, could you give me some explanation of the following
 configurations? After reading the official explanation, I'm still confused:
 what's the relationship between them? Is there any memory overlap between
 them?

 *spark.python.worker.memory
 spark.executor.memory
 spark.driver.memory*

spark.driver.memory is used for the JVM that runs alongside your local Python
script (the driver).
spark.executor.memory is used for the JVMs in the Spark cluster (the slaves,
or executors).

In local mode, the driver and executor share the same JVM, so
spark.driver.memory is the one that applies.

spark.python.worker.memory is used for the Python workers in each executor.
Because of the GIL, PySpark uses multiple Python processes per executor,
one for each task. spark.python.worker.memory tells a Python worker when to
spill its data to disk. It's not a hard limit, so the memory used by a Python
worker may be a little higher than that. If you have enough memory in the
executor, increasing spark.python.worker.memory will let the Python workers
use more memory during shuffles (like groupBy()), which will improve
performance.
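
As a rough illustration (the numbers are hypothetical):

conf = (SparkConf()
        .set("spark.executor.memory", "8g")          # JVM heap on each executor
        .set("spark.python.worker.memory", "2g"))    # soft limit before Python workers spill to disk
# spark.driver.memory usually has to be set before the driver JVM starts,
# e.g. via spark-submit --driver-memory 4g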

 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-tp17152p17231.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python vs Scala performance

2014-10-22 Thread Davies Liu
In master, you can easily profile your job and find the bottlenecks,
see https://github.com/apache/spark/pull/2556

Could you try it and show the stats?
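
If you want to try it, the profiler from that PR is enabled roughly like this
(config and method names as I remember them from the PR, so treat this as a
sketch):

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext(conf=conf)
sc.parallelize(range(1000)).map(lambda x: x * x).count()
sc.show_profiles()    # print the accumulated cProfile stats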

Davies

On Wed, Oct 22, 2014 at 7:51 AM, Marius Soutier mps@gmail.com wrote:
 It's an AWS cluster that is rather small at the moment, 4 worker nodes @ 28
 GB RAM and 4 cores, but fast enough for the current 40 Gigs a day. Data is
 on HDFS in EBS volumes. Each file is a Gzip-compressed collection of JSON
 objects, each one between 115-120 MB to be near the HDFS block size.

 One core per worker is permanently used by a job that allows SQL queries
 over Parquet files.

 On 22.10.2014, at 16:18, Arian Pasquali ar...@arianpasquali.com wrote:

 Interesting thread Marius,
 Btw, I'm curious about your cluster size.
 How small it is in terms of ram and cores.

 Arian

 2014-10-22 13:17 GMT+01:00 Nicholas Chammas nicholas.cham...@gmail.com:

 Total guess without knowing anything about your code: Do either of these
 two notes from the 1.1.0 release notes affect things at all?

 PySpark now performs external spilling during aggregations. Old behavior
 can be restored by setting spark.shuffle.spill to false.
 PySpark uses a new heuristic for determining the parallelism of shuffle
 operations. Old behavior can be restored by setting
 spark.default.parallelism to the number of cores in the cluster.

 Nick


 On Wed, Oct 22, 2014 at 7:29 AM, Marius Soutier mps@gmail.com wrote:

 We're using 1.1.0. Yes I expected Scala to be maybe twice as fast, but
 not that...

 On 22.10.2014, at 13:02, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 What version of Spark are you running? Some recent changes to how PySpark
 works relative to Scala Spark may explain things.

 PySpark should not be that much slower, not by a stretch.

 On Wed, Oct 22, 2014 at 6:11 AM, Ashic Mahtab as...@live.com wrote:

 I'm no expert, but looked into how the python bits work a while back
 (was trying to assess what it would take to add F# support). It seems 
 python
 hosts a jvm inside of it, and talks to scala spark in that jvm. The 
 python
 server bit translates the python calls to those in the jvm. The python
 spark context is like an adapter to the jvm spark context. If you're seeing
 performance discrepancies, this might be the reason why. If the code can be
 organised to require fewer interactions with the adapter, that may improve
 things. Take this with a pinch of salt...I might be way off on this :)

 Cheers,
 Ashic.

  From: mps@gmail.com
  Subject: Python vs Scala performance
  Date: Wed, 22 Oct 2014 12:00:41 +0200
  To: user@spark.apache.org

 
  Hi there,
 
  we have a small Spark cluster running and are processing around 40 GB
  of Gzip-compressed JSON data per day. I have written a couple of word
  count-like Scala jobs that essentially pull in all the data, do some 
  joins,
  group bys and aggregations. A job takes around 40 minutes to complete.
 
  Now one of the data scientists on the team wants to do write some jobs
  using Python. To learn Spark, he rewrote one of my Scala jobs in Python.
  From the API-side, everything looks more or less identical. However his 
  jobs
  take between 5-8 hours to complete! We can also see that the execution 
  plan
   is quite different, I'm seeing writes to the output much later than in
  Scala.
 
  Is Python I/O really that slow?
 
 
  Thanks
  - Marius
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python vs Scala performance

2014-10-22 Thread Davies Liu
Sorry, there is not; you could try cloning it from GitHub and building it from
scratch, see [1]

[1] https://github.com/apache/spark

Davies

On Wed, Oct 22, 2014 at 2:31 PM, Marius Soutier mps@gmail.com wrote:
 Can't install that on our cluster, but I can try locally. Is there a
 pre-built binary available?

 On 22.10.2014, at 19:01, Davies Liu dav...@databricks.com wrote:

 In the master, you can easily profile you job, find the bottlenecks,
 see https://github.com/apache/spark/pull/2556

 Could you try it and show the stats?

 Davies


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to aggregate data in Apach Spark

2014-10-20 Thread Davies Liu
You also could use Spark SQL:

from pyspark.sql import Row, SQLContext
row = Row('id', 'C1',  'C2', 'C3')
# convert each
data = sc.textFile("test.csv").map(lambda line: line.split(','))
sqlContext = SQLContext(sc)
rows = data.map(lambda r: row(*r))
sqlContext.inferSchema(rows).registerTempTable("data")
result = sqlContext.sql("select id, C1, C2, sum(C3) from data group by id, C1, C2")  # is a SchemaRDD


On Mon, Oct 20, 2014 at 2:52 AM, Gen gen.tan...@gmail.com wrote:
 Hi,

 I will write the code in python

 {code:title=test.py}
 from operator import add
 data = sc.textFile(...).map(...) ## Please make sure that the rdd is
 like [[id, c1, c2, c3], [id, c1, c2, c3], ...]
 keypair = data.map(lambda l: ((l[0],l[1],l[2]), float(l[3])))
 keypair = keypair.reduceByKey(add)
 out = keypair.map(lambda l: list(l[0]) + [l[1]])
 {code}


 Kalyan wrote
 I have a distribute system on 3 nodes and my dataset is distributed among
 those nodes. for example, I have a test.csv file which is exist on all 3
 nodes and it contains 4 columns of

 **row | id,  C1, C2,  C3
 --
 row1  | A1 , c1 , c2 ,2
 row2  | A1 , c1 , c2 ,1
 row3  | A1 , c11, c2 ,1
 row4  | A2 , c1 , c2 ,1
 row5  | A2 , c1 , c2 ,1
 row6  | A2 , c11, c2 ,1
 row7  | A2 , c11, c21,1
 row8  | A3 , c1 , c2 ,1
 row9  | A3 , c1 , c2 ,2
 row10 | A4 , c1 , c2 ,1

 I need help, how to aggregate data set by id, c1,c2,c3 columns and output
 like this

 **row | id,  C1, C2,  C3
 --
 row1  | A1 , c1 , c2 ,3
 row2  | A1 , c11, c2 ,1
 row3  | A2 , c1 , c2 ,2
 row4  | A2 , c11, c2 ,1
 row5  | A2 , c11, c21,1
 row6  | A3 , c1 , c2 ,3
 row7  | A4 , c1 , c2 ,1

 Thanks
 Kalyan





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-aggregate-data-in-Apach-Spark-tp16764p16803.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to disable input split

2014-10-18 Thread Davies Liu
You can call coalesce() to merge the small splits into bigger ones.

Davies

On Fri, Oct 17, 2014 at 5:35 PM, Larry Liu larryli...@gmail.com wrote:
 Is it possible to disable input split if input is already small?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark speed performance

2014-10-18 Thread Davies Liu
How many CPUs are on the slave?

Because of the overhead between the JVM and Python, a single task will be
slower than your local Python script, but it's very easy to scale to
many CPUs.

Even with one CPU, it's not common for PySpark to be 100 times slower. You
have many small files; each file will be processed by a task, which
has about 100ms of overhead (to be scheduled and executed), while such a
small file can be processed by your single-threaded Python script in less
than 1ms.

You could pack your json files into larger ones, or you could try to
merge the small tasks into larger ones with coalesce(N), such as:

distData = sc.textFile(sys.argv[2]).coalesce(10)  # will have 10 partitions (tasks)

Davies

On Sat, Oct 18, 2014 at 12:07 PM,  jan.zi...@centrum.cz wrote:
 Hi,

 I have a program written for single-computer (Python) execution and have also
 implemented the same for Spark. This program basically only reads .json, from
 which it takes one field and saves it back. Using Spark my program runs
 approximately 100 times slower on 1 master and 1 slave. So I would like to
 ask where the problem might possibly be?

 My Spark program looks like:



 sc = SparkContext(appName="Json data preprocessor")

 distData = sc.textFile(sys.argv[2])

 json_extractor = JsonExtractor(sys.argv[1])

 cleanedData = distData.flatMap(json_extractor.extract_json)

 cleanedData.saveAsTextFile(sys.argv[3])

 JsonExtractor only selects the data from field that is given by sys.argv[1].



 My data are basically many small json files, with one json object per line.

 I have tried both, reading and writing the data from/to Amazon S3, local
 disc on all the machines.

 I would like to ask if there is something that I am missing or if Spark is
 supposed to be so slow in comparison with the local non parallelized single
 node program.



 Thank you in advance for any suggestions or hints.



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark joins fail - please help

2014-10-17 Thread Davies Liu
Hey Russell,

join() can only work with RDDs of pairs (key, value), such as

rdd1: (k, v1)
rdd2: (k, v2)

rdd1.join(rdd2) will be (k, (v1, v2))
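
For example (toy data):

rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", "x"), ("b", "y")])
print rdd1.join(rdd2).collect()   # [('a', (1, 'x')), ('b', (2, 'y'))]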

Spark SQL will be more useful for you, see
http://spark.apache.org/docs/1.1.0/sql-programming-guide.html

Davies


On Fri, Oct 17, 2014 at 5:01 PM, Russell Jurney russell.jur...@gmail.com
wrote:

 https://gist.github.com/rjurney/fd5c0110fe7eb686afc9

 Any way I try to join my data fails. I can't figure out what I'm doing
 wrong.

 --
 Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.
 com
 ᐧ



Re: PySpark Error on Windows with sc.wholeTextFiles

2014-10-16 Thread Davies Liu
It's a bug, could you file a JIRA for this? Thanks!

Davies

On Thu, Oct 16, 2014 at 8:28 AM, Griffiths, Michael (NYC-RPM)
michael.griffi...@reprisemedia.com wrote:
 Hi,



 I'm running into an error on Windows (x64, 8.1) running Spark 1.1.0
 (pre-built for Hadoop 2.4: spark-1.1.0-bin-hadoop2.4.tgz) with Java SE
 Version 8 Update 20 (build 1.8.0_20-b26); just getting started with Spark.

 When running sc.wholeTextFiles() on a directory, I can run the command but
 not do anything with the resulting RDD – specifically, I get an error in
 py4j.protocol.Py4JJavaError; the error is unspecified, though the location
 is included. I've attached the traceback below.

 In this situation, I'm trying to load all files from a folder on the local
 filesystem, located at D:\testdata. The folder contains one file, which can
 be loaded successfully with sc.textFile("d:/testdata/filename") – no
 problems at all – so I do not believe the file is throwing the error.

 Is there any advice on what I should look at further to isolate or fix the
 error? Am I doing something obviously wrong?



 Thanks,

 Michael





 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
       /_/



 Using Python version 2.7.7 (default, Jun 11 2014 10:40:02)

 SparkContext available as sc.

 file = sc.textFile("d:/testdata/cbcc5b470ec06f212990c68c8f76e887b884")

 file.count()

 732

 file.first()

 u'!DOCTYPE html'

 data = sc.wholeTextFiles('d:/testdata')

 data.first()

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "D:\spark\python\pyspark\rdd.py", line 1167, in first
     return self.take(1)[0]
   File "D:\spark\python\pyspark\rdd.py", line 1126, in take
     totalParts = self._jrdd.partitions().size()
   File "D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
   File "D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value

 py4j.protocol.Py4JJavaError: An error occurred while calling o21.partitions.

 : java.lang.NullPointerException

 at java.lang.ProcessBuilder.start(Unknown Source)

 at org.apache.hadoop.util.Shell.runCommand(Shell.java:445)

 at org.apache.hadoop.util.Shell.run(Shell.java:418)

 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)

 at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)

 at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)

 at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)

 at
 org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:559)

 at
 org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:534)

 at
 org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)

at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1697)

 at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1679)

 at
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:302)

 at
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:263)

 at
 org.apache.spark.input.WholeTextFileInputFormat.setMaxSplitSize(WholeTextFileInputFormat.scala:54)

 at
 org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:219)

 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)

 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)

 at
 org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)

 at
 org.apache.spark.api.java.JavaPairRDD.partitions(JavaPairRDD.scala:44)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)

 at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)

 at java.lang.reflect.Method.invoke(Unknown Source)

 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

 at py4j.Gateway.invoke(Gateway.java:259)

 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

 at py4j.commands.CallCommand.execute(CallCommand.java:79)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Unknown Source)



 data.count()

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "D:\spark\python\pyspark\rdd.py", line 847, in count
     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

   File 

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
It seems like a bug; could you create a JIRA for it? Thanks!

Davies

On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote:
 I tried the same data with scala. It works pretty well.
 It seems that it is the problem of pyspark.
 In the console, it shows the following logs:

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
 *  File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage
 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed
 intentionally)
     ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)*
   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
 py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager:
 Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal):
 TaskKilled (killed intentionally)
 : An error occurred while calling o32.trainImplicitALSModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage
 975.0 (TID 1651, ip-172-31-35-237.ec2.internal):
 com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException:
 scala.collection.mutable.HashSet
 Serialization trace:
 shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)

 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
 at
 

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
On Thu, Oct 16, 2014 at 9:53 AM, Gen gen.tan...@gmail.com wrote:
 Hi,

 I am trying to use the ALS.trainImplicit method in
 pyspark.mllib.recommendation. However it didn't work, so I tried the
 example in the python API documentation, such as:

 r1 = (1, 1, 1.0)
 r2 = (1, 2, 2.0)
 r3 = (2, 1, 2.0)
 ratings = sc.parallelize([r1, r2, r3])
 model = ALS.trainImplicit(ratings, 1)

 It didn't work either. After searching on google, I found that there are
 only two overloads for ALS.trainImplicit in the scala code. So I tried
 model = ALS.trainImplicit(ratings, 1, 1), and it worked. But if I set
 iterations to anything other than 1, e.g. model = ALS.trainImplicit(ratings, 1, 2) or
 model = ALS.trainImplicit(ratings, 4, 2), it generated an error.
 The information is as follows:

The Python API has default values for all the other arguments, so you should
be able to call it with only rank=1 (there is no default for iterations in Scala).

I'm curious how you ran into this problem?
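
For reference, the Python signature lets you call it like this (keyword names
as in the 1.1 API; the alpha value is just an example):

from pyspark.mllib.recommendation import ALS

model = ALS.trainImplicit(ratings, rank=1)                            # default iterations
model = ALS.trainImplicit(ratings, rank=1, iterations=5, alpha=0.01)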

 count at ALS.scala:314

 Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times, most
 recent failure: Lost task 6.3 in stage 189.0 (TID 626,
 ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException:
 java.lang.ArrayStoreException: scala.collection.mutable.HashSet
 Serialization trace:
 shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)

 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:

 It is really strange, because the count at ALS.scala:314 is already outside
 the loop over iterations. Any idea?
 Thanks a lot in advance.

 FYI: I used spark 1.1.0 and ALS.train() works pretty well for all the cases.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/ALS-implicit-error-pyspark-tp16595.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
Could you post the code that has the problem with pyspark? Thanks!

Davies

On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote:
 I tried the same data with scala. It works pretty well.
 It seems that it is the problem of pyspark.
 In the console, it shows the following logs:

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
 *  File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage
 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed
 intentionally)
     ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)*
   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
 py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager:
 Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal):
 TaskKilled (killed intentionally)
 : An error occurred while calling o32.trainImplicitALSModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage
 975.0 (TID 1651, ip-172-31-35-237.ec2.internal):
 com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException:
 scala.collection.mutable.HashSet
 Serialization trace:
 shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)

 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
 at
 

Re: ALS implicit error pyspark

2014-10-16 Thread Davies Liu
I can run the following code against Spark 1.1

# imports needed to run this snippet
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

sc = SparkContext()
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1)

Davies

On Thu, Oct 16, 2014 at 2:45 PM, Davies Liu dav...@databricks.com wrote:
 Could you post the code that has the problem with PySpark? Thanks!

 Davies

 On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote:
 I tried the same data with Scala. It works pretty well.
 It seems that the problem is specific to PySpark.
 In the console, it shows the following logs:

 Traceback (most recent call last):
   File stdin, line 1, in module
 *  File /root/spark/python/pyspark/mllib/recommendation.py, line 76, in
 trainImplicit
 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage
 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed
 intentionally)
 ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)*
   File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line
 300, in get_return_value
 py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager:
 Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal):
 TaskKilled (killed intentionally)
 : An error occurred while calling o32.trainImplicitALSModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage
 975.0 (TID 1651, ip-172-31-35-237.ec2.internal):
 com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException:
 scala.collection.mutable.HashSet
 Serialization trace:
 shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)

 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173

Re: pyspark - extract 1 field from string

2014-10-14 Thread Davies Liu
rdd.map(lambda line: int(line.split(',')[3]))
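A self-contained sketch of the same idea, using the sample lines from the question below (the local SparkContext setup is only illustrative):

from pyspark import SparkContext

sc = SparkContext("local", "extract-field")
lines = sc.parallelize(["4,01012009,00:00,1289,4",
                        "5,01012009,00:00,1326,4",
                        "6,01012009,00:00,1497,7"])
# split each line on ',' and keep only the 4th field, converted to int
fourth = lines.map(lambda line: int(line.split(',')[3]))
print fourth.collect()   # [1289, 1326, 1497]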

On Tue, Oct 14, 2014 at 6:58 PM, Chop thomrog...@att.net wrote:
 I'm stumped with how to take 1 RDD that has lines like:

  4,01012009,00:00,1289,4
  5,01012009,00:00,1326,4
  6,01012009,00:00,1497,7

 and produce a new RDD with just the 4th field from each line (1289, 1326,
 1497)

 I don't want to apply a conditional, I just want to grab that one field from
 each line in the existing RDD

 TIA






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-extract-1-field-from-string-tp16456.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: where are my python lambda functions run in yarn-client mode?

2014-10-11 Thread Davies Liu
Created JIRA for this: https://issues.apache.org/jira/browse/SPARK-3915

On Sat, Oct 11, 2014 at 12:40 PM, Evan Samanas evan.sama...@gmail.com wrote:
 It's true that it is an implementation detail, but it's a very important one
 to document because it has the possibility of changing results depending on
 when I use take or collect.  The issue I was running into was when the
 executor had a different operating system than the driver, and I was using
 'pipe' with a binary I compiled myself.  I needed to make sure I used the
 binary compiled for the operating system I expect it to run on.  So in cases
 where I was only interested in the first value, my code was breaking
 horribly on 1.0.2, but working fine on 1.1.

 My only suggestion would be to backport 'spark.localExecution.enabled' to
 the 1.0 line.  Thanks for all your help!

 Evan

 On Fri, Oct 10, 2014 at 10:40 PM, Davies Liu dav...@databricks.com wrote:

 This is an implementation detail, so it is not documented :-(

 If you think this is a blocker for you, you could create a JIRA; maybe
 it could be fixed in 1.0.3+.

 Davies

 On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote:
  Thank you!  I was looking for a config variable to that end, but I was
  looking in Spark 1.0.2 documentation, since that was the version I had
  the
  problem with.  Is this behavior documented in 1.0.2's documentation?
 
  Evan
 
  On 10/09/2014 04:12 PM, Davies Liu wrote:
 
  When you call rdd.take() or rdd.first(), it may[1] execute the job
  locally (in the driver);
  otherwise, all the jobs are executed in the cluster.
 
  There is a config called `spark.localExecution.enabled` (since 1.1) to
  change this;
  it's not enabled by default, so all the functions will be executed in
  the cluster.
  If you set this to `true`, then you get the same behavior as
  1.0.
 
  [1] If it does not get enough items from the first partitions, it will
  try multiple partitions
  at a time, so those will be executed in the cluster.
 
  On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com
  wrote:
 
  Hi,
 
  I am using pyspark and I'm trying to support both Spark 1.0.2 and
  1.1.0
  with
  my app, which will run in yarn-client mode.  However, it appears when
  I
  use
  'map' to run a python lambda function over an RDD, they appear to be
  run
  on
  different machines, and this is causing problems.
 
  In both cases, I am using a Hadoop cluster that runs linux on all of
  its
  nodes.  I am submitting my jobs with a machine running Mac OS X 10.9.
  As
  a
  reproducer, here is my script:
 
  import platform
  print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]
 
  The answer in Spark 1.1.0:
  'Linux'
 
  The answer in Spark 1.0.2:
  'Darwin'
 
  In other experiments I changed the size of the list that gets
  parallelized,
  thinking maybe 1.0.2 just runs jobs on the driver node if they're
  small
  enough.  I got the same answer (with only 1 million numbers).
 
  This is a troubling difference.  I would expect all functions run on
  an
  RDD
  to be executed on my worker nodes in the Hadoop cluster, but this is
  clearly
  not the case for 1.0.2.  Why does this difference exist?  How can I
  accurately detect which jobs will run where?
 
  Thank you,
 
  Evan
 
 
 
 
  --
  View this message in context:
 
  http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html
  Sent from the Apache Spark User List mailing list archive at
  Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help with using combineByKey

2014-10-10 Thread Davies Liu
Maybe this version is easier to use:

plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) =>
(x._1 + y._1, x._2 + y._2))

It has similar behavior to the combineByKey() version, and it will be faster
than the groupByKey() version.
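For comparison, a minimal PySpark sketch of the same aggregation, counting nonzero values and total values per key (the local SparkContext and the sample data from the thread are only illustrative):

from pyspark import SparkContext

sc = SparkContext("local", "nonzero-counts")
plist = sc.parallelize([("LAX", 6), ("LAX", 0), ("LAX", 7),
                        ("SFO", 0), ("SFO", 0), ("SFO", 9)])
# value -> (1 if nonzero else 0, 1), then add the pairs per key
counts = (plist.mapValues(lambda v: (1 if v > 0 else 0, 1))
               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])))
print counts.collect()   # expected: ('LAX', (2, 3)) and ('SFO', (1, 3)), in some order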

On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
aharipriy...@gmail.com wrote:
 Sean,

 Thank you. It works. But I am still confused about the function. Can you
 kindly throw some light on it?
 I was going through the example mentioned in
 https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

 Is there any better source through which I can learn more about these
 functions? It would be helpful if I can get a chance to look at more
 examples.
 Also, I assume using combineByKey helps us solve it in parallel, rather than using
 the simple functions provided by scala as mentioned by Yana. Am I correct?

 On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

 Oh duh, sorry. The initialization should of course be (v) => (if (v >
 0) 1 else 0, 1)
 This gives the answer you are looking for. I don't see what Part2 is
 supposed to do differently.

 On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
 aharipriy...@gmail.com wrote:
  Hello Sean,
 
  Thank you, but changing from v to 1 doesn't help me either.
 
  I am trying to count the number of non-zero values using the first
  accumulator.
  val newlist = List(("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
  ("SFO",0),
  ("SFO",9))
 
  val plist = sc.parallelize(newlist)
 
   val part1 = plist.combineByKey(
 (v) => (1, 1),
 (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
  1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
  acc2._2)
 )
 
 val Part2 = part1.map{ case (key, value) => (key,
  (value._1,value._2)) }
 
  This should give me the result
  (LAX,(2,3))
  (SFO,(1,3))
 
 
 
  On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen so...@cloudera.com wrote:
 
  You have a typo in your code at var acc:, and the map from opPart1
  to opPart2 looks like a no-op, but those aren't the problem I think.
  It sounds like you intend the first element of each pair to be a count
  of nonzero values, but you initialize the first element of the pair to
  v, not 1, in v => (v,1). Try v => (1,1)
 
 
  On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
  aharipriy...@gmail.com wrote:
  
   I am a beginner to Spark and finding it difficult to implement a very
   simple
   reduce operation. I read that it is ideal to use combineByKey for
   complex
   reduce operations.
  
   My input:
  
    val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
    ("SFO",0),
    ("SFO",1),
    ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
    ("KX",9),


    ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
  
  
    val opPart1 = input.combineByKey(
  (v) => (v, 1),
  (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
   acc._2 +
   1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
   acc1._2 +
   acc2._2)
  )
  
  val opPart2 = opPart1.map{ case (key, value) => (key,
   (value._1,value._2)) }
  
opPart2.collectAsMap().map(println(_))
  
   If the value is greater than 0, the first accumulator should be
   incremented
   by 1, else it remains the same. The second accumulator is a simple
   counter
   for each value.  I am getting an incorrect output (garbage values) for the
   first accumulator. Please help.
  
The equivalent reduce operation in Hadoop MapReduce is :
  
   public static class PercentageCalcReducer extends
   Reducer<Text,IntWritable,Text,FloatWritable>
  
   {
  
   private FloatWritable pdelay = new FloatWritable();
  
  
   public void reduce(Text key, Iterable<IntWritable> values, Context
   context) throws IOException, InterruptedException
  
   {
  
   int acc2=0;
  
   float frac_delay, percentage_delay;
  
   int acc1=0;
  
   for(IntWritable val : values)
  
   {
  
   if(val.get() > 0)
  
   {
  
   acc1++;
  
   }
  
   acc2++;
  
   }
  
  
  
   frac_delay = (float)acc1/acc2;
  
   percentage_delay = frac_delay * 100 ;
  
   pdelay.set(percentage_delay);
  
   context.write(key,pdelay);
  
   }
  
   }
  
  
   Please help. Thank you for your time.
  
   --
  
   Regards,
  
   Haripriya Ayyalasomayajula
   contact : 650-796-7112
 
 
 
 
  --
  Regards,
  Haripriya Ayyalasomayajula
  contact : 650-796-7112




 --
 Regards,
 Haripriya Ayyalasomayajula
 contact : 650-796-7112

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.IOException Error in task deserialization

2014-10-10 Thread Davies Liu
Maybe; TorrentBroadcast is more complicated than HttpBroadcast. Could
you tell us
how to reproduce this issue? That would help us a lot in improving
TorrentBroadcast.

Thanks!
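For reference, the HttpBroadcast workaround mentioned in the quoted messages below is selected through a configuration switch. A minimal sketch, assuming the Spark 1.x `spark.broadcast.factory` setting and the HttpBroadcastFactory class name:

from pyspark import SparkConf, SparkContext

# switch the broadcast implementation from TorrentBroadcast to HttpBroadcast
conf = (SparkConf()
        .set("spark.broadcast.factory",
             "org.apache.spark.broadcast.HttpBroadcastFactory"))
sc = SparkContext(conf=conf)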

On Fri, Oct 10, 2014 at 8:46 AM, Sung Hwan Chung
coded...@cs.stanford.edu wrote:
 I haven't seen this at all since switching to HttpBroadcast. It seems
 TorrentBroadcast might have some issues?

 On Thu, Oct 9, 2014 at 4:28 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:

 I don't think that I saw any other error message. This is all I saw.

 I'm currently experimenting to see if this can be alleviated by using
 HttpBroadcastFactory instead of TorrentBroadcast. So far, with
 HttpBroadcast, I haven't seen this recurring as of yet. I'll keep you
 posted.

 On Thu, Oct 9, 2014 at 4:21 PM, Davies Liu dav...@databricks.com wrote:

 This exception is probably caused by another one; could you paste all of
 them here?

 Also, it would be great if you could provide a script to reproduce this
 problem.

 Thanks!

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
  Has anyone else seen this error in task deserialization?  The task is
  processing a small amount of data and doesn't seem to have much data
  hanging
  to the closure?  I've only seen this with Spark 1.1
 
  Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
  most
  recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
  java.io.IOException: unexpected exception type
 
 
  java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
 
  java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 
  org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 
  org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 
 
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744)

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: where are my python lambda functions run in yarn-client mode?

2014-10-10 Thread Davies Liu
This is an implementation detail, so it is not documented :-(

If you think this is a blocker for you, you could create a JIRA; maybe
it could be fixed in 1.0.3+.

Davies

On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote:
 Thank you!  I was looking for a config variable to that end, but I was
 looking in Spark 1.0.2 documentation, since that was the version I had the
 problem with.  Is this behavior documented in 1.0.2's documentation?

 Evan

 On 10/09/2014 04:12 PM, Davies Liu wrote:

 When you call rdd.take() or rdd.first(), it may[1] execute the job
 locally (in the driver);
 otherwise, all the jobs are executed in the cluster.

 There is a config called `spark.localExecution.enabled` (since 1.1) to
 change this;
 it's not enabled by default, so all the functions will be executed in
 the cluster.
 If you set this to `true`, then you get the same behavior as 1.0.

 [1] If it does not get enough items from the first partitions, it will
 try multiple partitions
 at a time, so those will be executed in the cluster.

 On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote:

 Hi,

 I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0
 with
 my app, which will run in yarn-client mode.  However, it appears when I
 use
 'map' to run a python lambda function over an RDD, they appear to be run
 on
 different machines, and this is causing problems.

 In both cases, I am using a Hadoop cluster that runs linux on all of its
 nodes.  I am submitting my jobs with a machine running Mac OS X 10.9.  As
 a
 reproducer, here is my script:

 import platform
 print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]

 The answer in Spark 1.1.0:
 'Linux'

 The answer in Spark 1.0.2:
 'Darwin'

 In other experiments I changed the size of the list that gets
 parallelized,
 thinking maybe 1.0.2 just runs jobs on the driver node if they're small
 enough.  I got the same answer (with only 1 million numbers).

 This is a troubling difference.  I would expect all functions run on an
 RDD
 to be executed on my worker nodes in the Hadoop cluster, but this is
 clearly
 not the case for 1.0.2.  Why does this difference exist?  How can I
 accurately detect which jobs will run where?

 Thank you,

 Evan




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GroupBy Key and then sort values with the group

2014-10-09 Thread Davies Liu
There is a new API called repartitionAndSortWithinPartitions() in
master; it may help in this case,
but then you should do the `groupBy()` yourself.
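A hedged PySpark sketch of that approach (assuming an existing SparkContext and an `rdd` of records; the (group, time, value) record layout, the partition count and the 1.2+ Python signature with numPartitions/partitionFunc arguments are assumptions):

# key by (group, time), but partition on the group only, so every record of a
# group lands in the same partition and arrives there sorted by (group, time)
keyed = rdd.map(lambda r: ((r[0], r[1]), r))
parted = keyed.repartitionAndSortWithinPartitions(
    numPartitions=200,
    partitionFunc=lambda k: hash(k[0]))
# then walk each partition with mapPartitions() and group consecutive keys yourself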

On Wed, Oct 8, 2014 at 4:03 PM, chinchu chinchu@gmail.com wrote:
 Sean,

 I am having a similar issue, but I have a lot of data for a group and I cannot
 materialize the iterable into a List or Seq in memory. [I tried, and it runs
 into OOM]. Is there any other way to do this?

 I also tried a secondary-sort, with the key having the group::time, but
 the problem with that is that the same group-name ends up in multiple partitions,
 so I am having to run sortByKey with one partition - sortByKey(true, 1) - which
 shuffles a lot of data.

 Thanks,
 -C



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/GroupBy-Key-and-then-sort-values-with-the-group-tp14455p15990.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: where are my python lambda functions run in yarn-client mode?

2014-10-09 Thread Davies Liu
When you call rdd.take() or rdd.first(), it may[1] execute the job
locally (in the driver);
otherwise, all the jobs are executed in the cluster.

There is a config called `spark.localExecution.enabled` (since 1.1) to
change this;
it's not enabled by default, so all the functions will be executed in the cluster.
If you set this to `true`, then you get the same behavior as 1.0.

[1] If it does not get enough items from the first partitions, it will
try multiple partitions
at a time, so those will be executed in the cluster.
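A minimal sketch of opting back into the 1.0-style behavior on 1.1+, using the config key mentioned above (the rest of the setup mirrors the reproducer quoted below and is only illustrative):

import platform
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.localExecution.enabled", "true")   # 1.1+ only
sc = SparkContext(conf=conf)
# with the flag enabled, this take(1) may run in the driver, as in 1.0
print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]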

On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote:
 Hi,

 I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with
 my app, which will run in yarn-client mode.  However, it appears when I use
 'map' to run a python lambda function over an RDD, they appear to be run on
 different machines, and this is causing problems.

 In both cases, I am using a Hadoop cluster that runs linux on all of its
 nodes.  I am submitting my jobs with a machine running Mac OS X 10.9.  As a
 reproducer, here is my script:

 import platform
 print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]

 The answer in Spark 1.1.0:
 'Linux'

 The answer in Spark 1.0.2:
 'Darwin'

 In other experiments I changed the size of the list that gets parallelized,
 thinking maybe 1.0.2 just runs jobs on the driver node if they're small
 enough.  I got the same answer (with only 1 million numbers).

 This is a troubling difference.  I would expect all functions run on an RDD
 to be executed on my worker nodes in the Hadoop cluster, but this is clearly
 not the case for 1.0.2.  Why does this difference exist?  How can I
 accurately detect which jobs will run where?

 Thank you,

 Evan




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.IOException Error in task deserialization

2014-10-09 Thread Davies Liu
This exception is probably caused by another one; could you paste all of
them here?

Also, it would be great if you could provide a script to reproduce this problem.

Thanks!

On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:
 Has anyone else seen this error in task deserialization?  The task is
 processing a small amount of data and doesn't seem to have much data hanging
 to the closure?  I've only seen this with Spark 1.1

 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most
 recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
 java.io.IOException: unexpected exception type

 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)

 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.IOException Error in task deserialization

2014-10-09 Thread Davies Liu
Could you provide a script to reproduce this problem?

Thanks!

On Wed, Oct 8, 2014 at 9:13 PM, Sung Hwan Chung
coded...@cs.stanford.edu wrote:
 This is also happening to me on a regular basis, when the job is large with
 relatively large serialized objects used in each RDD lineage. A bad thing
 about this is that this exception always stops the whole job.


 On Fri, Sep 26, 2014 at 11:17 AM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 FWIW I suspect that each count operation is an opportunity for you to
 trigger the bug, and each filter operation increases the likelihood of
 setting up the bug.  I normally don't come across this error until my job
 has been running for an hour or two and had a chance to build up longer
 lineages for some RDDs.  It sounds like your data is a bit smaller and it's
 more feasible for you to build up longer lineages more quickly.

 If you can reduce your number of filter operations (for example by
 combining some into a single function), that may help.  It may also help to
 introduce persistence or checkpointing at intermediate stages so that the
 lineages that have to get replayed aren't as long.
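 A hedged sketch of that suggestion (assuming an existing SparkContext `sc`; raw_rdd, f1, f2 and the checkpoint directory are placeholders):

 from pyspark import StorageLevel

 sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   # any durable directory
 base = raw_rdd.filter(f1).filter(f2).persist(StorageLevel.MEMORY_AND_DISK)
 base.checkpoint()     # truncates the lineage before further filters/counts
 base.count()          # materializes both the cache and the checkpoint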

 On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote:

 No for me as well it is non-deterministic.  It happens in a piece of code
 that does many filter and counts on a small set of records (~1k-10k).  The
 original set is persisted in memory and we have a Kryo serializer set for
 it.  The task itself takes in just a few filtering parameters.  This with
 the same setting has sometimes completed to sucess and sometimes failed
 during this step.

 Arun

 On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 I've had multiple jobs crash due to java.io.IOException: unexpected
 exception type; I've been running the 1.1 branch for some time and am now
 running the 1.1 release binaries. Note that I only use PySpark. I haven't
 kept detailed notes or the tracebacks around since there are other problems
 that have caused my greater grief (namely key not found errors).

 For me the exception seems to occur non-deterministically, which is a
 bit interesting since the error message shows that the same stage has 
 failed
 multiple times.  Are you able to consistently re-produce the bug across
 multiple invocations at the same place?

 On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Has anyone else seen this error in task deserialization?  The task is
 processing a small amount of data and doesn't seem to have much data 
 hanging
 to the closure?  I've only seen this with Spark 1.1

 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times,
 most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com):
 java.io.IOException: unexpected exception type

 java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)

 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744)






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parsing one big multiple line .xml loaded in RDD using Python

2014-10-07 Thread Davies Liu
Maybe sc.wholeTextFiles() is what you want; you can get the whole text
and parse it yourself.
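A hedged sketch of that approach (assuming an existing SparkContext `sc`, that the dump has been split into files small enough to load whole, and that extract_pages accepts a file-like object; the path is a placeholder):

from StringIO import StringIO                 # Python 2, matching this thread
from gensim.corpora import wikicorpus

# (path, whole file text) pairs, one per file, parsed on the workers
chunks = sc.wholeTextFiles("hdfs:///wiki/chunks/*")
pages = chunks.flatMap(lambda kv: wikicorpus.extract_pages(StringIO(kv[1])))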

On Tue, Oct 7, 2014 at 1:06 AM,  jan.zi...@centrum.cz wrote:
 Hi,

 I have already unsuccessfully asked a quite similar question on stackoverflow,
 particularly here:
 http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim.
 I've also tried a workaround, unsuccessfully; the workaround
 problem can be found at
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html.

 Particularly, what I'm trying to do: I have the .xml dump of wikipedia as the
 input. The .xml is quite big and spreads across multiple lines. You can
 check it out at
 http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2.

 My goal is to parse this .xml in the same way as
 gensim.corpora.wikicorpus.extract_pages does; the implementation is at
 https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py.
 Unfortunately this method does not work, because RDD.flatMap() processes the
 RDD line by line as strings.

 Does anyone have a suggestion for how to parse the wikipedia-like
 .xml loaded in an RDD using Python?

 Thank you in advance for any suggestions, advice or hints.



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread Davies Liu
sc.parallelize() distributes a list of data into a number of partitions, but a
generator cannot be split and serialized automatically.

If you can partition your generator, then you can try this:

sc.parallelize(range(N), N).flatMap(lambda x: generate_partition(x))

For example, if you want to generate xrange(M) where M is huge:
sc.parallelize(range(N), N).flatMap(lambda x: xrange(M / N * x, M / N * (x + 1)))
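If the generator can be re-created per partition, a hedged sketch of the same pattern (generate_partition, parse_chunk and N are placeholders; an existing SparkContext `sc` is assumed):

def generate_partition(i):
    # yield only the items that belong to partition i, e.g. by parsing the
    # i-th chunk of the input; this runs on the workers, not on the driver
    for item in parse_chunk(i):
        yield item

rdd = sc.parallelize(range(N), N).flatMap(generate_partition)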

On Mon, Oct 6, 2014 at 7:16 AM,  jan.zi...@centrum.cz wrote:
 Hi,


 I would like to ask if it is possible to use a generator that generates data
 bigger than the size of RAM across all the machines as the input for sc =
 SparkContext(), sc.parallelize(generator). I would like to create an RDD this
 way. When I try to create an RDD with sc.textFile(file), where the file is even
 bigger than the data generated by the generator, everything works fine, but
 unfortunately I need to use sc.parallelize(generator), and it makes my OS
 kill the Spark job.



 I'm getting only this log and then the job is killed:



 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,

 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root,

 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(root, ); users with modify permissions: Set(root, )

 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started

 14/10/06 13:34:16 INFO Remoting: Starting remoting

 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]

 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]

 14/10/06 13:34:17 INFO util.Utils: Successfully started service
 'sparkDriver' on port 41016.

 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker

 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster

 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at
 /mnt/spark/spark-local-20141006133417-821e

 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection
 manager for block manager' on port 42438.

 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438
 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)

 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with
 capacity 267.3 MB

 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register
 BlockManager

 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block
 manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM

 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager

 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is
 /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523

 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server

 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT

 14/10/06 13:34:17 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:44768

 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file
 server' on port 44768.

 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT

 14/10/06 13:34:17 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040

 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on
 port 4040.

 14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at
 http://ec2-54-164-72-236.compute-1.amazonaws.com:4040

 14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to
 /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py

 14/10/06 13:34:18 INFO spark.SparkContext: Added file
 file:/root/generator_test.py at
 http://172.31.25.197:44768/files/generator_test.py with timestamp
 1412602458065

 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master
 spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077...

 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend
 is ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.0

 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to
 Spark cluster with app ID app-20141006133418-0046

 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added:
 app-20141006133418-0046/0 on
 worker-20141005074620-ip-172-31-30-40.ec2.internal-49979
 (ip-172-31-30-40.ec2.internal:49979) with 1 cores

 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor
 ID app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979
 with 1 cores, 512.0 MB RAM

 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated:
 app-20141006133418-0046/0 is now RUNNING

 14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered
 executor:
 Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852]
 with ID 0

 

Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread Davies Liu
On Mon, Oct 6, 2014 at 1:08 PM,  jan.zi...@centrum.cz wrote:
 Hi,


 Thank you for your advice. It really might work, but to specify my problem a
 bit more: think of my data as if each generated item were one parsed
 wikipedia page. I am getting this generator from the parser and I don't want
 to save it to storage, but directly apply parallelize and create an RDD.
 Based on your advice I'm now thinking that something like batching,
 creating several RDDs and then applying union on them, might possibly be the
 way to go.


 Originally I was thinking of calling the parsing function in flatMap on the
 RDD loaded from the xml file, but then I unfortunately had this problem
 http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim
 so now I am trying to parse the xml on the master node and directly put it into
 the RDD.

gensim.corpora.wikicorpus.extract_pages should be called in flatMap() for better
performance (otherwise it will be the bottleneck on the master).

Because the function used in flatMap() is executed on the workers, you should make
sure that the files (accessed by extract_pages) are accessible by the workers,
for example by putting them in a DFS or NFS in cluster mode. In local mode, you
should probably use absolute paths for the files.

Davies

 __
 From: Davies Liu dav...@databricks.com
 To: jan.zi...@centrum.cz
 Date: 06.10.2014 18:09
 Subject: Re: Spark and Python using generator of data bigger than RAM as
 input to sc.parallelize()


 sc.parallelize() distributes a list of data into a number of partitions,
 but a
 generator cannot be split and serialized automatically.

 If you can partition your generator, then you can try this:

 sc.parallelize(range(N), N).flatMap(lambda x: generate_partition(x))

 For example, if you want to generate xrange(M) where M is huge:
 sc.parallelize(range(N), N).flatMap(lambda x: xrange(M / N * x, M / N * (x + 1)))

 On Mon, Oct 6, 2014 at 7:16 AM,  jan.zi...@centrum.cz wrote:
 Hi,


 I would like to ask if it is possible to use generator, that generates
 data
 bigger than size of RAM across all the machines as the input for sc =
 SparkContext(), sc.paralelize(generator). I would like to create RDD this
 way. When I am trying to create RDD by sc.TextFile(file) where file has
 even
 bigger size than data generated by the generator everything works fine,
 but
 unfortunately I need to use sc.parallelize(generator) and it makes my OS
 to
 kill the spark job.



 I'm getting only this log and then the job is killed:



 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,

 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to:
 root,

 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(root, ); users with modify permissions: Set(root, )

 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started

 14/10/06 13:34:16 INFO Remoting: Starting remoting

 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]

 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]

 14/10/06 13:34:17 INFO util.Utils: Successfully started service
 'sparkDriver' on port 41016.

 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker

 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster

 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory
 at
 /mnt/spark/spark-local-20141006133417-821e

 14/10/06 13:34:17 INFO util.Utils: Successfully started service
 'Connection
 manager for block manager' on port 42438.

 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port
 42438
 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)

 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with
 capacity 267.3 MB

 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register
 BlockManager

 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block
 manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM

 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager

 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is
 /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523

 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server

 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT

 14/10/06 13:34:17 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:44768

 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file
 server' on port 44768.

 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT

 14/10/06 13:34:17 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040

 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI

Re: Trouble getting filtering on field correct

2014-10-03 Thread Davies Liu
rdd.filter(lambda line: int(line.split(' ')[8]) = 125)
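A hedged restatement of the reply above (the comparison operator was garbled in the archived message, so >= is assumed here; `rdd` is the RDD from the question below):

kept = rdd.filter(lambda line: int(line.split(' ')[8]) >= 125)
kept.count()   # or kept.collect() to inspect the surviving lines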

On Fri, Oct 3, 2014 at 8:16 PM, Chop thomrog...@att.net wrote:
 Given an RDD with multiple lines of the form:

 u'207.86.121.131 207.86.121.131 2012-11-27 13:02:17 titlestring 622592 27
 184464'
 (fields are separated by a space)

 What pyspark function/commands do I use to filter out those lines where
 line[8] = x? (i.e line[8] = 125)

 when I use line.split( ) I get an RDD of each field in each line.

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-getting-filtering-on-field-correct-tp15728.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: IPython Notebook Debug Spam

2014-10-01 Thread Davies Liu
On Tue, Sep 30, 2014 at 10:14 PM, Rick Richardson
rick.richard...@gmail.com wrote:
 I am experiencing significant logging spam when running PySpark in IPython
 Notebook

 Exhibit A:  http://i.imgur.com/BDP0R2U.png

 I have taken into consideration advice from:
 http://apache-spark-user-list.1001560.n3.nabble.com/Disable-all-spark-logging-td1960.html

 also

 http://stackoverflow.com/questions/25193488/how-to-turn-off-info-logging-in-pyspark

 I have only one log4j.properties it is in /opt/spark-1.1.0/conf

 Just before I launch IPython Notebook with a pyspark profile, I add the dir
 and the properties file directly to CLASSPATH and SPARK_CLASSPATH env vars
 (as you can also see from the png)

 I still haven't been able to make any change which disables this infernal
 debug output.

 Any ideas (WAGs, Solutions, commiserating)  would be greatly appreciated.

 ---

 My log4j.properties:

 log4j.rootCategory=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n

You should change log4j.rootCategory to WARN, console

 # Change this to set Spark log level
 log4j.logger.org.apache.spark=INFO

 # Silence akka remoting
 log4j.logger.Remoting=WARN

 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.eclipse.jetty=WARN



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: IPython Notebook Debug Spam

2014-10-01 Thread Davies Liu
How do you set up IPython to access pyspark in the notebook?

I did the following, and it worked for me:

$ export SPARK_HOME=/opt/spark-1.1.0/
$ export 
PYTHONPATH=/opt/spark-1.1.0/python:/opt/spark-1.1.0/python/lib/py4j-0.8.2.1-src.zip
$ ipython notebook

All the logging will go to the console (not into the notebook).

If you want to reduce the logging in the console, you should change
/opt/spark-1.1.0/conf/log4j.properties:

log4j.rootCategory=WARN, console
log4j.logger.org.apache.spark=WARN


On Wed, Oct 1, 2014 at 11:49 AM, Rick Richardson
rick.richard...@gmail.com wrote:
 Thanks for your reply.  Unfortunately changing the log4j.properties within
 SPARK_HOME/conf has no effect on pyspark for me.  When I change it in the
 master or workers the log changes have the desired effect, but pyspark seems
 to ignore them.  I have changed the levels to WARN, changed the appender to
 rolling file, or removed it entirely, all with the same results.

 On Wed, Oct 1, 2014 at 1:49 PM, Davies Liu dav...@databricks.com wrote:

 On Tue, Sep 30, 2014 at 10:14 PM, Rick Richardson
 rick.richard...@gmail.com wrote:
  I am experiencing significant logging spam when running PySpark in
  IPython
  Notebook
 
  Exhibit A:  http://i.imgur.com/BDP0R2U.png
 
  I have taken into consideration advice from:
 
  http://apache-spark-user-list.1001560.n3.nabble.com/Disable-all-spark-logging-td1960.html
 
  also
 
 
  http://stackoverflow.com/questions/25193488/how-to-turn-off-info-logging-in-pyspark
 
  I have only one log4j.properties it is in /opt/spark-1.1.0/conf
 
  Just before I launch IPython Notebook with a pyspark profile, I add the
  dir
  and the properties file directly to CLASSPATH and SPARK_CLASSPATH env
  vars
  (as you can also see from the png)
 
  I still haven't been able to make any change which disables this
  infernal
  debug output.
 
  Any ideas (WAGs, Solutions, commiserating)  would be greatly
  appreciated.
 
  ---
 
  My log4j.properties:
 
  log4j.rootCategory=INFO, console
  log4j.appender.console=org.apache.log4j.ConsoleAppender
  log4j.appender.console.layout=org.apache.log4j.PatternLayout
  log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
  %c{1}: %m%n

 You should change log4j.rootCategory to WARN, console

  # Change this to set Spark log level
  log4j.logger.org.apache.spark=INFO
 
  # Silence akka remoting
  log4j.logger.Remoting=WARN
 
  # Ignore messages below warning level from Jetty, because it's a bit
  verbose
  log4j.logger.org.eclipse.jetty=WARN
 
 




 --


 “Science is the great antidote to the poison of enthusiasm and
 superstition.”  -- Adam Smith


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.NegativeArraySizeException in pyspark

2014-09-25 Thread Davies Liu
On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 Thanks for your help.

 I ultimately re-wrote the code to use broadcast variables, and then received
 an error when trying to broadcast self.all_models that the size did not fit
 in an int (recall that broadcasts use 32 bit ints to store size),

What is the error? Could you file a JIRA for it?

 that it was in fact over 2G.  I don't know why the previous tests (described
 above) where duplicated portions of self.all_models worked (it could have
 been an error in either my debugging or notes), but splitting the
 self.all_models into a separate broadcast variable for each element worked.
 I avoided broadcast variables for a while since there was no way to
 unpersist them in pyspark, but now that there is you're completely right
 that using broadcast is the correct way to code this.

In 1.1, you can use broadcast.unpersist() to release it; also, the performance
of Python broadcast was much improved in 1.1.
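A minimal sketch of that broadcast-and-unpersist pattern, reusing names from the code quoted later in this thread (not self-contained; sc, all_models, testing_feature_rdd and _predict_all_models are assumed to exist as described below):

bc_models = sc.broadcast(all_models)      # shipped to each executor once
scored = testing_feature_rdd.map(lambda x: _predict_all_models(bc_models.value, x))
scored.count()
bc_models.unpersist()                     # release it; available in Spark 1.1+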


 best,
 -Brad

 On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote:

 Or maybe there is a bug related to the base64 in py4j; could you
 dump the serialized bytes of the closure to verify this?

 You could add a line in spark/python/pyspark/rdd.py:

 ser = CloudPickleSerializer()
 pickled_command = ser.dumps(command)
 +  print len(pickled_command), repr(pickled_command)


 On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi Davies,
 
  That's interesting to know.  Here's more details about my code.  The
  object
  (self) contains pointers to the spark_context (which seems to generate
  errors during serialization) so I strip off the extra state using the
  outer
  lambda function and just pass the value self.all_models into the map.
  all_models is a list of length 9 where each element contains 3 numbers
  (ints
  or floats, can't remember) and then one LinearSVC object.  The
  classifier
  was trained over ~2.5M features, so the object isn't small, but probably
  shouldn't be 150M either.  Additionally, the call ran OK when I use
  either
  2x the first 5 objects or 2x the last 5 objects (another reason why it
  seems
  unlikely the bug was size related).
 
  def _predict_all_models(all_models, sample):
  scores = []
  for _, (_, _, classifier) in all_models:
  score = classifier.decision_function(sample[VALUE][RECORD])
  scores.append(float(score))
  return (sample[VALUE][LABEL], scores)
 
  # fails
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models)
  # works
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
  #return (lambda am: testing_feature_rdd.map(lambda x:
  _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])
 
  I've since written a work-around into my code, but if I get a chance
  I'll
  switch to broadcast variables and see whether that works.
 
  later,
  -brad
 
  On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com
  wrote:
 
  The traceback said that the serialized closure cannot be parsed
  (base64)
  correctly by py4j.
 
  A string in Java cannot be longer than 2G, so the serialized closure
  cannot be longer than 1.5G (there is overhead in base64); is it possible
  that the data used in your map function is that big? If it is, you should
  use broadcast for it.
 
  In Spark master, we will use broadcast automatically if the closure
  is too big (but using broadcast explicitly is always better).
 
  On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
  bmill...@eecs.berkeley.edu wrote:
   Hi All,
  
   I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
   script
   I have.  I've pasted the full traceback at the end of this email.
  
   I have isolated the line of code in my script which causes the
   exception
   to occur. Although the exception seems to occur deterministically, it
   is
   very unclear why the different variants of the line would cause the
   exception to occur. Unfortunately, I am only able to reproduce the
   bug
   in
   the context of a large data processing job, and the line of code
   which
   must
   change to reproduce the bug has little meaning out of context.  The
   bug
   occurs when I call map on an RDD with a function that references
   some
   state outside of the RDD (which is presumably bundled up and
   distributed
   with the function).  The output of the function is a tuple where the
   first
   element is an int and the second element is a list of floats (same
   positive
   length every time, as verified by an 'assert' statement).
  
   Given that:
   -It's unclear why changes in the line would cause an exception
   -The exception comes from within pyspark code
   -The exception has to do with negative array sizes (and I couldn't
   have
   created a negative

Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-25 Thread Davies Liu
Maybe you have Python 2.7 on the master but Python 2.6 in the cluster; you
should upgrade Python to 2.7 in the cluster, or use Python 2.6 on the
master by setting PYSPARK_PYTHON=python2.6

On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson
a...@santacruzintegration.com wrote:
 Hi

 I am running into trouble using iPython notebook on my cluster. Use the
 following command to set the cluster up

 $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
 --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME


 On master I launch python as follows

 $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" $SPARK_HOME/bin/pyspark


 It looks like the problem is the cluster is using an old version of python
 and ipython. Any idea how I can easily upgrade? The following version works
 on my mac

 Thanks

 Andy

 {'commit_hash': '681fd77',
  'commit_source': 'installation',
  'default_encoding': 'UTF-8',
  'ipython_path': '/Library/Python/2.7/site-packages/IPython',
  'ipython_version': '2.1.0',
  'os_name': 'posix',
  'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
  'sys_executable': '/usr/bin/python',
  'sys_platform': 'darwin',
  'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Null values in pyspark Row

2014-09-24 Thread Davies Liu
Could you create a JIRA and add test cases for it? Thanks!

Davies

On Wed, Sep 24, 2014 at 11:56 AM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have just updated to spark 1.1.0. The new row representation of the data
 in spark SQL is very handy.

 I have noticed that it does not set None for NULL values coming from Hive if
 the column was string type - seems it works with other types.

 Is that something that will be implemented?

 Thanks,




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Null-values-in-pyspark-Row-tp15065.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: access javaobject in rdd map

2014-09-23 Thread Davies Liu
You should create a pure Python object (copy the attributes from the Java object);
 then it can be used in map.
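
A rough sketch of that idea (all names here are made up; jmodel stands for
the py4j proxy of your Java model, and data_rdd for your input):

class PyModel(object):
    # a plain, picklable Python object holding just the numbers we need
    def __init__(self, weights, intercept):
        self.weights = weights
        self.intercept = intercept

    def predict(self, x):
        s = sum(w * v for w, v in zip(self.weights, x)) + self.intercept
        return 1 if s > 0 else 0

# on the driver: copy the attributes out of the Java object once
py_model = PyModel(list(jmodel.weights()), jmodel.intercept())

# the pure Python object can be shipped with the closure
predictions = data_rdd.map(lambda x: py_model.predict(x))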

Davies

On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have a java object that contains a ML model which I would like to use for
 prediction (in python). I just want to iterate the data through a mapper and
 predict for each value. Unfortunately, this fails when it tries to serialise
 the object to sent it to the nodes.

 Is there a trick around this? Surely, this object could be picked up by
 reference at the nodes.

 many thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: access javaobject in rdd map

2014-09-23 Thread Davies Liu
Right now, there is no way to access the JVM from a Python worker. In order
to make this happen, we would need to:

1. set up py4j in the Python worker
2. serialize the JVM objects and transfer them to the executors
3. link the JVM objects and py4j together to get an interface

Before that happens, maybe you could try setting up a service for the model
(such as a RESTful service) and accessing it from map via RPC.
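
As a very rough sketch of that RPC idea (the host, port, URL and JSON shape
are all made up; assumes Python 2 and a service you run yourself):

import json
import urllib2

def predict_partition(rows):
    # note: this does one HTTP call per record; batching per partition
    # (or per chunk of records) would be faster
    for row in rows:
        req = urllib2.Request("http://model-host:8080/predict",
                              json.dumps({"features": row}),
                              {"Content-Type": "application/json"})
        yield json.loads(urllib2.urlopen(req).read())["prediction"]

predictions = data_rdd.mapPartitions(predict_partition)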

On Tue, Sep 23, 2014 at 9:48 AM, Tamas Jambor jambo...@gmail.com wrote:
 Hi Davies,

 Thanks for the reply. I saw that you guys do that way in the code. Is
 there no other way?

 I have implemented all the predict functions in scala, so I prefer not
 to reimplement the whole thing in python.

 thanks,


 On Tue, Sep 23, 2014 at 5:40 PM, Davies Liu dav...@databricks.com wrote:
 You should create a pure Python object (copy the attributes from Java 
 object),
  then it could be used in map.

 Davies

 On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have a java object that contains a ML model which I would like to use for
 prediction (in python). I just want to iterate the data through a mapper and
 predict for each value. Unfortunately, this fails when it tries to serialise
 the object to sent it to the nodes.

 Is there a trick around this? Surely, this object could be picked up by
 reference at the nodes.

 many thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.NegativeArraySizeException in pyspark

2014-09-23 Thread Davies Liu
Or maybe there is a bug related to the base64 encoding in py4j; could you
dump the serialized bytes of the closure to verify this?

You could add a line in spark/python/pyspark/rdd.py:

ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
+  print len(pickled_command), repr(pickled_command)
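
For reference, if the closure really is too large, a broadcast-based version
of the snippet quoted below could look roughly like this (a sketch reusing
the names from Brad's code; sc is the active SparkContext):

all_models_b = sc.broadcast(self.all_models)   # shipped once, not inside the closure

def _predict_all_models_bc(sample):
    scores = []
    for _, (_, _, classifier) in all_models_b.value:
        score = classifier.decision_function(sample[VALUE][RECORD])
        scores.append(float(score))
    return (sample[VALUE][LABEL], scores)

return testing_feature_rdd.map(_predict_all_models_bc)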


On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 That's interesting to know.  Here's more details about my code.  The object
 (self) contains pointers to the spark_context (which seems to generate
 errors during serialization) so I strip off the extra state using the outer
 lambda function and just pass the value self.all_models into the map.
 all_models is a list of length 9 where each element contains 3 numbers (ints
 or floats, can't remember) and then one LinearSVC object.  The classifier
 was trained over ~2.5M features, so the object isn't small, but probably
 shouldn't be 150M either.  Additionally, the call ran OK when I use either
 2x the first 5 objects or 2x the last 5 objects (another reason why it seems
 unlikely the bug was size related).

 def _predict_all_models(all_models, sample):
 scores = []
 for _, (_, _, classifier) in all_models:
 score = classifier.decision_function(sample[VALUE][RECORD])
 scores.append(float(score))
 return (sample[VALUE][LABEL], scores)

 # fails
 #return (lambda am: testing_feature_rdd.map(lambda x:
 _predict_all_models(am, x))) (self.all_models)
 # works
 #return (lambda am: testing_feature_rdd.map(lambda x:
 _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
 #return (lambda am: testing_feature_rdd.map(lambda x:
 _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])

 I've since written a work-around into my code, but if I get a chance I'll
 switch to broadcast variables and see whether that works.

 later,
 -brad

 On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote:

 The traceback said that the serialized closure cannot be parsed (base64)
 correctly by py4j.

 The string in Java cannot be longer than 2G, so the serialized closure
 cannot longer than 1.5G (there are overhead in base64), is it possible
 that your data used in the map function is so big? If it's, you should
 use broadcast for it.

 In master of Spark, we will use broadcast automatically if the closure
 is too big. (but use broadcast explicitly is always better).

 On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:
  Hi All,
 
  I'm experiencing a java.lang.NegativeArraySizeException in a pyspark
  script
  I have.  I've pasted the full traceback at the end of this email.
 
  I have isolated the line of code in my script which causes the
  exception
  to occur. Although the exception seems to occur deterministically, it is
  very unclear why the different variants of the line would cause the
  exception to occur. Unfortunately, I am only able to reproduce the bug
  in
  the context of a large data processing job, and the line of code which
  must
  change to reproduce the bug has little meaning out of context.  The bug
  occurs when I call map on an RDD with a function that references some
  state outside of the RDD (which is presumably bundled up and distributed
  with the function).  The output of the function is a tuple where the
  first
  element is an int and the second element is a list of floats (same
  positive
  length every time, as verified by an 'assert' statement).
 
  Given that:
  -It's unclear why changes in the line would cause an exception
  -The exception comes from within pyspark code
  -The exception has to do with negative array sizes (and I couldn't have
  created a negative sized array anywhere in my python code)
  I suspect this is a bug in pyspark.
 
  Has anybody else observed or reported this bug?
 
  best,
  -Brad
 
  Traceback (most recent call last):
File /home/bmiller1/pipeline/driver.py, line 214, in module
  main()
File /home/bmiller1/pipeline/driver.py, line 203, in main
  bl.write_results(iteration_out_dir)
File /home/bmiller1/pipeline/layer/svm_layer.py, line 137, in
  write_results
  fig, accuracy = _get_results(self.prediction_rdd)
File /home/bmiller1/pipeline/layer/svm_layer.py, line 56, in
  _get_results
  predictions = np.array(prediction_rdd.collect())
File /home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py, line
  723, in collect
  bytesInJava = self._jrdd.collect().iterator()
File /home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py, line
  2026, in _jrdd
  broadcast_vars, self.ctx._javaAccumulator)
File
 
  /home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
  line 701, in __call__
File
 
  /home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
  line 304, in get_return_value
  py4j.protocol.Py4JError: An error occurred

Re: Time difference between Python and Scala

2014-09-19 Thread Davies Liu
I think it's normal.

On Fri, Sep 19, 2014 at 12:07 AM, Luis Guerra luispelay...@gmail.com wrote:
 Hello everyone,

 What should be the normal time difference between Scala and Python using
 Spark? I mean running the same program in the same cluster environment.

 In my case I am using numpy array structures for the Python code and vectors
 for the Scala code, both for handling my data. The time difference I have so
 far is Scala being around 6x times faster than Python...is it normal?

 Best regards

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: schema for schema

2014-09-18 Thread Davies Liu
Thanks for reporting this, it will be fixed by
https://github.com/apache/spark/pull/2448

On Thu, Sep 18, 2014 at 12:32 PM, Michael Armbrust
mich...@databricks.com wrote:
 This looks like a bug, we are investigating.

 On Thu, Sep 18, 2014 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com
 wrote:

 I have a SchemaRDD which I've gotten from a parquetFile.

 Did some transforms on it and now want to save it back out as parquet
 again.

 Getting a SchemaRDD proves challenging because some of my fields can be
 null/None and SQLContext.inferSchema abjects those.

 So, I decided to use the schema on the original RDD with
 SQLContext.applySchema.

 This works, but only if I add a map function to turn my Row objects into a
 list. (pyspark)

 applied = sq.applySchema(transformed_rows.map(lambda r: list(r)),
 original_parquet_file.schema())


 This seems a bit kludgy.  Is there a better way?  Should there be?



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: The difference between pyspark.rdd.PipelinedRDD and pyspark.rdd.RDD

2014-09-17 Thread Davies Liu
PipelinedRDD is an RDD generated by a Python mapper/reducer; for example,
rdd.map(func) will return a PipelinedRDD.

PipelinedRDD is a subclass of RDD, so it should have all the APIs which
RDD has.

>>> sc.parallelize(range(10)).map(lambda x: (x, str(x))).sortByKey().count()
10

I'm wondering how you triggered this error?

Davies

On Tue, Sep 16, 2014 at 10:03 PM, edmond_huo huoxiang5...@gmail.com wrote:
 Hi,

 I am a freshman about spark. I tried to run a job like wordcount example in
 python. But when I tried to get the top 10 popular words in the file, I got
 the message:AttributeError: 'PipelinedRDD' object has no attribute
 'sortByKey'.

 So my question is what is the difference between PipelinedRDD and RDD? and
 if I want to sort the data in PipelinedRDD, how can I do it?

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/The-difference-between-pyspark-rdd-PipelinedRDD-and-pyspark-rdd-RDD-tp14421.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark on yarn - lost executor

2014-09-17 Thread Davies Liu
Maybe the Python worker uses too much memory during groupByKey();
groupByKey() with a larger numPartitions can help.

Also, can you upgrade your cluster to 1.1? It can spill the data
to disk if memory cannot hold all the data during groupByKey().

Also, if there is a hot key with dozens of millions of values, the PR [1]
can help; it actually helped someone with large datasets (3 TB).

Davies

[1] https://github.com/apache/spark/pull/1977
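
For example, with the snippet from Oleg's message below (200 is only an
illustrative number of partitions; tune it to your data and cluster):

lines = sc.textFile(sys.argv[1], 100)        # more input partitions than 1
result = lines.map(doSplit) \
              .groupByKey(200) \
              .mapValues(lambda vc: my_custom_function(vc))
result.saveAsTextFile(sys.argv[2])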

On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 Sure, I'll post to the mail list.

 groupByKey(self, numPartitions=None)

 source code

 Group the values for each key in the RDD into a single sequence. 
 Hash-partitions the resulting RDD with into numPartitions partitions.


 So instead of using default I'll provide numPartitions , but what is the best 
 practice to calculate the number of partitions? and how number of partitions 
 related to my original problem?


 Thanks

 Oleg.


 http://spark.apache.org/docs/1.0.2/api/python/frames.html



 On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman eric.d.fried...@gmail.com 
 wrote:

 Look at the API for text file and groupByKey. Please don't take threads off 
 list. Other people have the same questions.

 
 Eric Friedman

 On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 Can hou please explain how to configure partitions?
 Thanks
 Oleg

 On Wednesday, September 17, 2014, Eric Friedman eric.d.fried...@gmail.com 
 wrote:

 Yeah, you need to increase partitions. You only have one on your text file. 
 On groupByKey you're getting the pyspark default, which is too low.

 
 Eric Friedman

 On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 This is very good question :-).

 Here is my code:

 sc = SparkContext(appName=CAD)
 lines = sc.textFile(sys.argv[1], 1)
 result = lines.map(doSplit).groupByKey().mapValues(lambda vc: 
 my_custom_function(vc))
 result.saveAsTextFile(sys.argv[2])

 Should I configure partitioning manually ? Where should I configure it? 
 Where can I read about partitioning best practices?

 Thanks
 Oleg.

 On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman eric.d.fried...@gmail.com 
 wrote:

 How many partitions do you have in your input rdd?  Are you specifying 
 numPartitions in subsequent calls to groupByKey/reduceByKey?

 On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 Hi ,
   I am execution pyspark on yarn.
 I have successfully executed initial dataset but now I growed it 10 times 
 more.

 during execution I got all the time this error:
   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost 
 executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated

  tasks are failed a resubmitted again:

 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 
 29, 32, 33, 48, 75, 86, 91, 93, 94
 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 
 93
 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 
 27, 39, 51, 64
 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 
 42, 61, 67, 77, 81, 91
 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 
 29, 34, 40, 46, 67, 69, 86
 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 
 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 
 85, 89
 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 
 79, 92
 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 
 24, 31, 43, 65, 73
 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD 
 at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 
 72, 75, 84



 QUESTION:
how to debug / tune the problem.
 What can cause to such behavior?
 I have 5 machine cluster with 32 GB ram.
  Dataset - 3G.

 command for execution:

  
 /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit 
 --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g 
 --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py 
  /input/tad/inpuut.csv  /output/cad_model_500_2


 Where can I find 

Re: Number of partitions when saving (pyspark)

2014-09-17 Thread Davies Liu
On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra luispelay...@gmail.com wrote:
 Hi everyone,

 Is it possible to fix the number of tasks related to a saveAsTextFile in
 Pyspark?

 I am loading several files from HDFS, fixing the number of partitions to X
 (let's say 40 for instance). Then some transformations, like joins and
 filters are carried out. The weird thing here is that the number of tasks
 involved in these transformations are 80, i.e. the double of the fixed
 number of partitions. However, when the saveAsTextFile action is carried
 out, there are only 4 tasks to do this (and I have not been able to increase
 that number). My problem here is that those 4 tasks make rapidly increase
 the used memory and take too long to finish.

 I am launching my process from windows to a cluster in ubuntu, with 13
 computers (4 cores each) with 32 gb of memory, and using pyspark 1.0.2.

saveAsTextFile() is a narrow (map-like) operation, so its number of partitions
is determined by the previous RDD.

In Spark 1.0.2, groupByKey() and reduceByKey() take the number of CPUs
on the driver (locally) as the default number of partitions, so it's 4.
You need to change it to 40 or 80 in this case.
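
For example (a sketch with hypothetical names; 40 matches the number of
partitions mentioned above):

joined = a.join(b, numPartitions=40)   # same idea for reduceByKey(f, 40), groupByKey(40), ...
joined.saveAsTextFile("hdfs:///output/result")   # writes 40 part- files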

BTW, in Spark 1.1, groupByKey() and reduceByKey() use the number of
partitions of the previous RDD as the default value.

Davies

 Any clue with this?

 Thanks in advance

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to ship external Python libraries in PYSPARK

2014-09-16 Thread Davies Liu
Yes, sc.addFile() is what you want:

 |  addFile(self, path)
 |  Add a file to be downloaded with this Spark job on every node.
 |  The C{path} passed can be either a local file, a file in HDFS
 |  (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
 |  FTP URI.
 |
 |  To access the file in Spark jobs, use
 |  L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the
 |  filename to find its download location.
 |
 |  >>> from pyspark import SparkFiles
 |  >>> path = os.path.join(tempdir, "test.txt")
 |  >>> with open(path, "w") as testFile:
 |  ...    testFile.write("100")
 |  >>> sc.addFile(path)
 |  >>> def func(iterator):
 |  ...    with open(SparkFiles.get("test.txt")) as testFile:
 |  ...        fileVal = int(testFile.readline())
 |  ...        return [x * fileVal for x in iterator]
 |  >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
 |  [100, 200, 300, 400]

On Tue, Sep 16, 2014 at 7:02 PM, daijia jia_...@intsig.com wrote:
 Is there some way to ship textfile just like ship python libraries?

 Thanks in advance
 Daijia



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-ship-external-Python-libraries-in-PYSPARK-tp14074p14412.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast error

2014-09-15 Thread Davies Liu
(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 and
 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to
 akka.tcp://sparkMaster@hostname:7077:
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@ hostname:7077]

 ??
 Any suggestions??


 On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu dav...@databricks.com
 wrote:

 Hey Chengi,

 What's the version of Spark you are using? It have big improvements
 about broadcast in 1.1, could you try it?

 On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu chengi.liu...@gmail.com
 wrote:
  Any suggestions.. I am really blocked on this one
 
  On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu
  chengi.liu...@gmail.com wrote:
 
  And when I use sparksubmit script, I get the following error:
 
  py4j.protocol.Py4JJavaError: An error occurred while calling
  o26.trainKMeansModel.
  : org.apache.spark.SparkException: Job aborted due to stage
  failure: All
  masters are unresponsive! Giving up.
  at
 
  org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
  at
 
  scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
  org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
  at
 
  org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
  at scala.Option.foreach(Option.scala:236)
  at
 
  org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
  at
 
  org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
  akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at
  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
  scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 
  scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
 
  My spark submit code is
 
  conf = SparkConf().set(spark.executor.memory,
  32G).set(spark.akka.frameSize, 1000)
  sc = SparkContext(conf = conf)
  rdd = sc.parallelize(matrix,5)
 
  from pyspark.mllib.clustering import KMeans
  from math import sqrt
  clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
  initializationMode=random)
  def error(point):
  center = clusters.centers[clusters.predict(point)]
  return sqrt(sum([x**2 for x in (point - center)]))
 
  WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x
  + y)
  print Within Set Sum of Squared Error =  + str(WSSSE)
 
  Which is executed as following:
  spark-submit --master $SPARKURL clustering_example.py
  --executor-memory
  32G  --driver-memory 60G
 
  On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu
  chengi.liu...@gmail.com
  wrote:
 
  How? Example please..
  Also, if I am running this in pyspark shell.. how do i configure
  spark.akka.frameSize ??
 
 
  On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das
  ak...@sigmoidanalytics.com
  wrote:
 
  When the data size is huge, you better of use the
  torrentBroadcastFactory.
 
  Thanks
  Best Regards
 
  On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu
  chengi.liu...@gmail.com
  wrote:
 
  Specifically the error I see when I try to operate on rdd
  created by
  sc.parallelize method
  : org.apache.spark.SparkException: Job aborted due to stage
  failure:
  Serialized task 12:12 was 12062263 bytes which exceeds
  spark.akka.frameSize
  (10485760 bytes). Consider using broadcast variables for large
  values.
 
  On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu
  chengi.liu...@gmail.com
  wrote:
 
  Hi,
 I am trying to create an rdd out of large matrix
  sc.parallelize
  suggest to use broadcast
  But when I do
 
  sc.broadcast(data)
  I get this error:
 
  Traceback (most recent call last):
File stdin, line 1, in module
File
  /usr/common/usg/spark/1.0.2/python/pyspark/context.py, line
  370, in broadcast
  pickled = pickleSer.dumps(value)
File
  /usr/common/usg/spark/1.0.2/python/pyspark/serializers.py,
  line 279, in dumps
  def dumps(self, obj): return cPickle.dumps(obj, 2)
  SystemError: error return without exception set

Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
In PySpark, I think you could enumerate all the valid files, create an RDD
from each with newAPIHadoopFile(), then union them together.
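
A rough sketch of that (the input format and key/value class names below are
placeholders, not necessarily the right ones for your data; see the follow-ups
for ways to list the paths):

# example paths, already filtered so names starting with "." or "_" are excluded
paths = ["hdfs:///data/part-0.parquet", "hdfs:///data/part-1.parquet"]
rdds = [sc.newAPIHadoopFile(p,
                            "parquet.avro.AvroParquetInputFormat",    # placeholder
                            "java.lang.Void",                         # placeholder
                            "org.apache.avro.generic.IndexedRecord")  # placeholder
        for p in paths]
combined = sc.union(rdds)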

On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 I neglected to specify that I'm using pyspark. Doesn't look like these APIs 
 have been bridged.

 
 Eric Friedman

 On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com wrote:

 Hi Eric,

 Something along the lines of the following should work

 val fs = getFileSystem(...) // standard hadoop API call
 val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
 pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
 instance of org.apache.hadoop.fs.PathFilter
 val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
 classOf[ParquetInputFormat[Something]], classOf[Void],
 classOf[SomeAvroType], getConfiguration(...))

 You have to do some initializations on ParquetInputFormat such as
 AvroReadSetup/AvroWriteSupport etc but that you should be doing
 already I am guessing.

 Cheers,
 Nat


 On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
 Hi,

 I have a directory structure with parquet+avro data in it. There are a
 couple of administrative files (.foo and/or _foo) that I need to ignore when
 processing this data or Spark tries to read them as containing parquet
 content, which they do not.

 How can I set a PathFilter on the FileInputFormat used to construct an RDD?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Write 1 RDD to multiple output paths in one go

2014-09-15 Thread Davies Liu
Maybe we should provide an API like saveTextFilesByKey(path);
could you create a JIRA for it?

There is one in DPark [1] actually.

[1] https://github.com/douban/dpark/blob/master/dpark/rdd.py#L309
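
Until something like that exists, a simple (if not very scalable) workaround
is one save per key, e.g. for the RDD in Nick's example below (assumes the
set of distinct keys is small enough to collect):

keyed = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
for k in keyed.keys().distinct().collect():
    (keyed.filter(lambda kv, k=k: kv[0] == k)   # default arg pins k per iteration
          .values()
          .saveAsTextFile('/path/prefix/' + k.lower()))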

On Mon, Sep 15, 2014 at 7:08 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Any tips from anybody on how to do this in PySpark? (Or regular Spark, for
 that matter.)

 On Sat, Sep 13, 2014 at 1:25 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 Howdy doody Spark Users,

 I’d like to somehow write out a single RDD to multiple paths in one go.
 Here’s an example.

 I have an RDD of (key, value) pairs like this:

  a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
  'Frankie']).keyBy(lambda x: x[0])
  a.collect()
 [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F',
 'Frankie')]

 Now I want to write the RDD out to different paths depending on the keys,
 so that I have one output directory per distinct key. Each output directory
 could potentially have multiple part- files or whatever.

 So my output would be something like:

 /path/prefix/n [/part-1, /part-2, etc]
 /path/prefix/b [/part-1, /part-2, etc]
 /path/prefix/f [/part-1, /part-2, etc]

 How would you do that?

 I suspect I need to use saveAsNewAPIHadoopFile or saveAsHadoopFile along
 with the MultipleTextOutputFormat output format class, but I’m not sure how.

 By the way, there is a very similar question to this here on Stack
 Overflow.

 Nick


 
 View this message in context: Write 1 RDD to multiple output paths in one
 go
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
One way is to do it in bash with hadoop fs -ls; maybe you could
end up with a bash script to do the job.
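
If you want to stay in Python, a rough way to do the same thing from the
driver (assuming the hadoop CLI is installed there; the path is an example):

import subprocess

out = subprocess.check_output(["hadoop", "fs", "-ls", "/data/topdir"])
paths = [line.split()[-1] for line in out.splitlines()
         if line and not line.startswith("Found")]
good = [p for p in paths if not p.rsplit("/", 1)[-1].startswith((".", "_"))]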

On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman
eric.d.fried...@gmail.com wrote:
 That's a good idea and one I had considered too.  Unfortunately I'm not
 aware of an API in PySpark for enumerating paths on HDFS.  Have I overlooked
 one?

 On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote:

 In PySpark, I think you could enumerate all the valid files, and create
 RDD by
 newAPIHadoopFile(), then union them together.

 On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  I neglected to specify that I'm using pyspark. Doesn't look like these
  APIs have been bridged.
 
  
  Eric Friedman
 
  On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com
  wrote:
 
  Hi Eric,
 
  Something along the lines of the following should work
 
  val fs = getFileSystem(...) // standard hadoop API call
  val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
  pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
  instance of org.apache.hadoop.fs.PathFilter
  val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
  classOf[ParquetInputFormat[Something]], classOf[Void],
  classOf[SomeAvroType], getConfiguration(...))
 
  You have to do some initializations on ParquetInputFormat such as
  AvroReadSetup/AvroWriteSupport etc but that you should be doing
  already I am guessing.
 
  Cheers,
  Nat
 
 
  On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
  eric.d.fried...@gmail.com wrote:
  Hi,
 
  I have a directory structure with parquet+avro data in it. There are a
  couple of administrative files (.foo and/or _foo) that I need to
  ignore when
  processing this data or Spark tries to read them as containing parquet
  content, which they do not.
 
  How can I set a PathFilter on the FileInputFormat used to construct an
  RDD?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PathFilter for newAPIHadoopFile?

2014-09-15 Thread Davies Liu
Or maybe you could give this one a try:
https://labs.spotify.com/2013/05/07/snakebite/
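
If I remember its API correctly, a minimal snakebite-based listing looks
something like this (the namenode host, port and path are assumptions):

from snakebite.client import Client

client = Client("namenode-host", 8020)
paths = [f["path"] for f in client.ls(["/data/topdir"])
         if not f["path"].rsplit("/", 1)[-1].startswith((".", "_"))]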

On Mon, Sep 15, 2014 at 2:51 PM, Davies Liu dav...@databricks.com wrote:
 There is one way by do it in bash: hadoop fs -ls , maybe you could
 end up with a bash scripts to do the things.

 On Mon, Sep 15, 2014 at 1:01 PM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
 That's a good idea and one I had considered too.  Unfortunately I'm not
 aware of an API in PySpark for enumerating paths on HDFS.  Have I overlooked
 one?

 On Mon, Sep 15, 2014 at 10:01 AM, Davies Liu dav...@databricks.com wrote:

 In PySpark, I think you could enumerate all the valid files, and create
 RDD by
 newAPIHadoopFile(), then union them together.

 On Mon, Sep 15, 2014 at 5:49 AM, Eric Friedman
 eric.d.fried...@gmail.com wrote:
  I neglected to specify that I'm using pyspark. Doesn't look like these
  APIs have been bridged.
 
  
  Eric Friedman
 
  On Sep 14, 2014, at 11:02 PM, Nat Padmanabhan reachn...@gmail.com
  wrote:
 
  Hi Eric,
 
  Something along the lines of the following should work
 
  val fs = getFileSystem(...) // standard hadoop API call
  val filteredConcatenatedPaths = fs.listStatus(topLevelDirPath,
  pathFilter).map(_.getPath.toString).mkString(,)  // pathFilter is an
  instance of org.apache.hadoop.fs.PathFilter
  val parquetRdd = sc.hadoopFile(filteredConcatenatedPaths,
  classOf[ParquetInputFormat[Something]], classOf[Void],
  classOf[SomeAvroType], getConfiguration(...))
 
  You have to do some initializations on ParquetInputFormat such as
  AvroReadSetup/AvroWriteSupport etc but that you should be doing
  already I am guessing.
 
  Cheers,
  Nat
 
 
  On Sun, Sep 14, 2014 at 7:37 PM, Eric Friedman
  eric.d.fried...@gmail.com wrote:
  Hi,
 
  I have a directory structure with parquet+avro data in it. There are a
  couple of administrative files (.foo and/or _foo) that I need to
  ignore when
  processing this data or Spark tries to read them as containing parquet
  content, which they do not.
 
  How can I set a PathFilter on the FileInputFormat used to construct an
  RDD?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast error

2014-09-14 Thread Davies Liu
Hey Chengi,

What's the version of Spark you are using? There are big improvements
to broadcast in 1.1; could you try it?

On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu chengi.liu...@gmail.com wrote:
 Any suggestions.. I am really blocked on this one

 On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu chengi.liu...@gmail.com wrote:

 And when I use sparksubmit script, I get the following error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 o26.trainKMeansModel.
 : org.apache.spark.SparkException: Job aborted due to stage failure: All
 masters are unresponsive! Giving up.
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 My spark submit code is

 conf = SparkConf().set(spark.executor.memory,
 32G).set(spark.akka.frameSize, 1000)
 sc = SparkContext(conf = conf)
 rdd = sc.parallelize(matrix,5)

 from pyspark.mllib.clustering import KMeans
 from math import sqrt
 clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
 initializationMode=random)
 def error(point):
 center = clusters.centers[clusters.predict(point)]
 return sqrt(sum([x**2 for x in (point - center)]))

 WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
 print Within Set Sum of Squared Error =  + str(WSSSE)

 Which is executed as following:
 spark-submit --master $SPARKURL clustering_example.py  --executor-memory
 32G  --driver-memory 60G

 On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 How? Example please..
 Also, if I am running this in pyspark shell.. how do i configure
 spark.akka.frameSize ??


 On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 When the data size is huge, you better of use the
 torrentBroadcastFactory.

 Thanks
 Best Regards

 On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 Specifically the error I see when I try to operate on rdd created by
 sc.parallelize method
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 12:12 was 12062263 bytes which exceeds 
 spark.akka.frameSize
 (10485760 bytes). Consider using broadcast variables for large values.

 On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:

 Hi,
I am trying to create an rdd out of large matrix sc.parallelize
 suggest to use broadcast
 But when I do

 sc.broadcast(data)
 I get this error:

 Traceback (most recent call last):
   File stdin, line 1, in module
   File /usr/common/usg/spark/1.0.2/python/pyspark/context.py, line
 370, in broadcast
 pickled = pickleSer.dumps(value)
   File /usr/common/usg/spark/1.0.2/python/pyspark/serializers.py,
 line 279, in dumps
 def dumps(self, obj): return cPickle.dumps(obj, 2)
 SystemError: error return without exception set
 Help?







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: coalesce on SchemaRDD in pyspark

2014-09-12 Thread Davies Liu
This is a bug; I have created an issue to track it:
https://issues.apache.org/jira/browse/SPARK-3500

Also, there is a PR to fix it: https://github.com/apache/spark/pull/2369

Before the next bugfix release, you can work around it by:

srdd = sqlCtx.jsonRDD(rdd)
srdd2 = SchemaRDD(srdd._schema_rdd.coalesce(N, false, None), sqlCtx)


On Thu, Sep 11, 2014 at 6:12 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 Hi All,

 I'm having some trouble with the coalesce and repartition functions for
 SchemaRDD objects in pyspark.  When I run:

 sqlCtx.jsonRDD(sc.parallelize(['{foo:bar}',
 '{foo:baz}'])).coalesce(1)

 I get this error:

 Py4JError: An error occurred while calling o94.coalesce. Trace:
 py4j.Py4JException: Method coalesce([class java.lang.Integer, class
 java.lang.Boolean]) does not exist

 For context, I have a dataset stored in a parquet file, and I'm using
 SQLContext to make several queries against the data.  I then register the
 results of these as queries new tables in the SQLContext.  Unfortunately
 each new table has the same number of partitions as the original (despite
 being much smaller).  Hence my interest in coalesce and repartition.

 Has anybody else encountered this bug?  Is there an alternate workflow I
 should consider?

 I am running the 1.1.0 binaries released today.

 best,
 -Brad

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: coalesce on SchemaRDD in pyspark

2014-09-12 Thread Davies Liu
On Fri, Sep 12, 2014 at 8:55 AM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 Thanks for the quick fix. I'm sorry to send out a bug report on release day
 - 1.1.0 really is a great release.  I've been running the 1.1 branch for a
 while and there's definitely lots of good stuff.

 For the workaround, I think you may have meant:

 srdd2 = SchemaRDD(srdd._jschema_rdd.coalesce(N, False, None), sqlCtx)

Yes, thanks for the correction.

 Note:
 _schema_rdd - _jschema_rdd
 false - False

 That workaround seems to work fine (in that I've observed the correct number
 of partitions in the web-ui, although haven't tested it any beyond that).

 Thanks!
 -Brad

 On Thu, Sep 11, 2014 at 11:30 PM, Davies Liu dav...@databricks.com wrote:

 This is a bug, I had create an issue to track this:
 https://issues.apache.org/jira/browse/SPARK-3500

 Also, there is PR to fix this: https://github.com/apache/spark/pull/2369

 Before next bugfix release, you can workaround this by:

 srdd = sqlCtx.jsonRDD(rdd)
 srdd2 = SchemaRDD(srdd._schema_rdd.coalesce(N, false, None), sqlCtx)


 On Thu, Sep 11, 2014 at 6:12 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:
  Hi All,
 
  I'm having some trouble with the coalesce and repartition functions for
  SchemaRDD objects in pyspark.  When I run:
 
  sqlCtx.jsonRDD(sc.parallelize(['{foo:bar}',
  '{foo:baz}'])).coalesce(1)
 
  I get this error:
 
  Py4JError: An error occurred while calling o94.coalesce. Trace:
  py4j.Py4JException: Method coalesce([class java.lang.Integer, class
  java.lang.Boolean]) does not exist
 
  For context, I have a dataset stored in a parquet file, and I'm using
  SQLContext to make several queries against the data.  I then register
  the
  results of these as queries new tables in the SQLContext.  Unfortunately
  each new table has the same number of partitions as the original
  (despite
  being much smaller).  Hence my interest in coalesce and repartition.
 
  Has anybody else encountered this bug?  Is there an alternate workflow I
  should consider?
 
  I am running the 1.1.0 binaries released today.
 
  best,
  -Brad



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to ship external Python libraries in PYSPARK

2014-09-12 Thread Davies Liu
With SparkContext.addPyFile(xx.zip), the xx.zip will be copied to all the
workers and stored in a temporary directory, and the path to xx.zip will be
on the sys.path of the worker machines, so you can import xx in your jobs.
It does not need to be installed on the worker machines.

PS: the package or module should be at the top level of xx.zip, or it cannot
be imported. For example:

daviesliu@dm:~/work/tmp$ zipinfo textblob.zip
Archive:  textblob.zip   3245946 bytes   517 files
drwxr-xr-x  3.0 unx0 bx stor 12-Sep-14 10:10 textblob/
-rw-r--r--  3.0 unx  203 tx defN 12-Sep-14 10:10 textblob/__init__.py
-rw-r--r--  3.0 unx  563 bx defN 12-Sep-14 10:10 textblob/__init__.pyc
-rw-r--r--  3.0 unx61510 tx defN 12-Sep-14 10:10 textblob/_text.py
-rw-r--r--  3.0 unx68316 bx defN 12-Sep-14 10:10 textblob/_text.pyc
-rw-r--r--  3.0 unx 2962 tx defN 12-Sep-14 10:10 textblob/base.py
-rw-r--r--  3.0 unx 5501 bx defN 12-Sep-14 10:10 textblob/base.pyc
-rw-r--r--  3.0 unx27621 tx defN 12-Sep-14 10:10 textblob/blob.py

you can get this textblob.zip by:

pip install textblob
cd /xxx/xx/site-package/
zip -r path_to_store/textblob.zip textblob
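
Then a job can use it roughly like this (the paths and the sentiment call are
only an example; TextBlob may also need its own data/dependencies shipped):

sc.addPyFile("/path/to/textblob.zip")

def polarity(line):
    from textblob import TextBlob   # resolved from the shipped zip on the worker
    return TextBlob(line).sentiment.polarity

scores = sc.textFile("hdfs:///data/reviews.txt").map(polarity)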

Davies


On Fri, Sep 12, 2014 at 1:39 AM, yh18190 yh18...@gmail.com wrote:
 Hi all,

 I am currently working on pyspark for NLP processing etc.I am using TextBlob
 python library.Normally in a standalone mode it easy to install the external
 python libraries .In case of cluster mode I am facing problem to install
 these libraries on worker nodes remotely.I cannot access each and every
 worker machine to install these libs in python path.I tried to use
 Sparkcontext pyfiles option to ship .zip files..But the problem is  these
 python packages needs to be get installed on worker machines.Could anyone
 let me know wat are different ways of doing it so that this lib-Textblob
 could be available in python path.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-ship-external-Python-libraries-in-PYSPARK-tp14074.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Setting up jvm in pyspark from shell

2014-09-11 Thread Davies Liu
The heap size of the JVM cannot be changed dynamically, so you
need to configure it before running pyspark.

If you run it in local mode, you should configure spark.driver.memory
(in 1.1 or master).

Or, you can use --driver-memory 2G (should work in 1.0+).

On Wed, Sep 10, 2014 at 10:43 PM, Mohit Singh mohit1...@gmail.com wrote:
 Hi,
   I am using pyspark shell and am trying to create an rdd from numpy matrix
 rdd = sc.parallelize(matrix)
 I am getting the following error:
 JVMDUMP039I Processing dump event systhrow, detail
 java/lang/OutOfMemoryError at 2014/09/10 22:41:44 - please wait.
 JVMDUMP032I JVM requested Heap dump using
 '/global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd' in response to
 an event
 JVMDUMP010I Heap dump written to
 /global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd
 JVMDUMP032I JVM requested Java dump using
 '/global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt' in response to
 an event
 JVMDUMP010I Java dump written to
 /global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt
 JVMDUMP032I JVM requested Snap dump using
 '/global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc' in response to an
 event
 JVMDUMP010I Snap dump written to
 /global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc
 JVMDUMP013I Processed dump event systhrow, detail
 java/lang/OutOfMemoryError.
 Exception AttributeError: 'SparkContext' object has no attribute '_jsc' in
 bound method SparkContext.__del__ of pyspark.context.SparkContext object
 at 0x11f9450 ignored
 Traceback (most recent call last):
   File stdin, line 1, in module
   File /usr/common/usg/spark/1.0.2/python/pyspark/context.py, line 271, in
 parallelize
 jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
   File
 /usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__
   File
 /usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
 : java.lang.OutOfMemoryError: Java heap space
 at
 org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
 at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
 at java.lang.reflect.Method.invoke(Method.java:618)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:804)

 I did try to setSystemProperty
 sc.setSystemProperty(spark.executor.memory, 20g)
 How do i increase jvm heap from the shell?

 --
 Mohit

 When you want success as badly as you want the air, then you will get it.
 There is no other secret of success.
 -Socrates

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-10 Thread Davies Liu
I think the mails to spark.incubator.apache.org will be forwarded to
spark.apache.org.

Here is the header of the first mail:

from: redocpot julien19890...@gmail.com
to: u...@spark.incubator.apache.org
date: Mon, Sep 8, 2014 at 7:29 AM
subject: groupBy gives non deterministic results
mailing list: user.spark.apache.org Filter messages from this mailing list
mailed-by: spark.apache.org

I only subscribe to spark.apache.org, and I do see all the mails from him.

On Wed, Sep 10, 2014 at 6:29 AM, Ye Xianjin advance...@gmail.com wrote:
 |  Do the two mailing lists share messages ?
 I don't think so.  I didn't receive this message from the user list. I am
 not in databricks, so I can't answer your other questions. Maybe Davies Liu
 dav...@databricks.com can answer you?

 --
 Ye Xianjin
 Sent with Sparrow

 On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote:

 Hi, Xianjin

 I checked user@spark.apache.org, and found my post there:
 http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser

 I am using nabble to send this mail, which indicates that the mail will be
 sent from my email address to the u...@spark.incubator.apache.org mailing
 list.

 Do the two mailing lists share messages ?

 Do we have a nabble interface for user@spark.apache.org mail list ?

 Thank you.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13876.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD memory questions

2014-09-10 Thread Davies Liu
On Wed, Sep 10, 2014 at 1:05 AM, Boxian Dong box...@indoo.rs wrote:
 Thank you very much for your kindly help. I rise some another questions:

- If the RDD is stored in serialized format, is that means that whenever
 the RDD is processed, it will be unpacked and packed again from and back to
 the JVM even they are located on the same machine?
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n13862/rdd_img.png

In PySpark, yes. But in Spark generally, no: in Scala you have several
choices for caching an RDD, serialized or not.

- Can the RDD be partially unpacked from the serialized state? or when
 every a RDD is touched, it must be fully unpacked, and of course pack again
 afterword.

The items in an RDD are deserialized batch by batch, so if you call rdd.take(),
only the first few batches of items are deserialized.

The cached RDD is kept in the JVM; you do not need to pack it again after
visiting it.

   -  When a RDD is cached, is it saved in a unserialized format or
 serialized format? If it's saved in a unserialized format, is the partially
 reading of RDD from JVM to PYTHON runtime possible?

For PySpark, they are all saved in serialized format. During a transformation
of an RDD, you can only see the current partition; you cannot access other
partitions or other RDDs.

RDDs are always read-only, so you cannot modify them at any time
(all modifications will be dropped).

 Thank you very much



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805p13862.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-09 Thread Davies Liu
What's the type of the key?

If the hash of the key is different across slaves, then you could get these
confusing results. We have seen similar results in Python, because the hash
of None is different across machines.
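
A quick way to see the failure mode (in CPython 2.x, hash(None) falls back to
a value derived from the object's address, so it can differ from one machine
or Python build to another):

key = None
print hash(key), hash(key) % 8   # the second number is the partition index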

Davies

On Mon, Sep 8, 2014 at 8:16 AM, redocpot julien19890...@gmail.com wrote:
 Update:

 Just test with HashPartitioner(8) and count on each partition:

 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657591*), (*6,658327*), (*7,658434*)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657594)*, (6,658326), (*7,658434*)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657592)*, (6,658326), (*7,658435*)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657591)*, (6,658326), (7,658434)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657592)*, (6,658326), (7,658435)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657592)*, (6,658326), (7,658435)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657592)*, (6,658326), (7,658435)),
 List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
 *(5,657591)*, (6,658326), (7,658435))

 The result is not identical for each execution.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-09 Thread Davies Liu
Which version of Spark are you using?

This bug has been fixed in 0.9.2, 1.0.2 and 1.1; could you upgrade to
one of these versions to verify it?

Davies

On Tue, Sep 9, 2014 at 7:03 AM, redocpot julien19890...@gmail.com wrote:
 Thank you for your replies.

 More details here:

 The prog is executed on local mode (single node). Default env params are
 used.

 The test code and the result are in this gist:
 https://gist.github.com/coderh/0147467f0b185462048c

 Here is 10 first lines of the data: 3 fields each row, the delimiter is ;

 3801959;11775022;118
 3801960;14543202;118
 3801984;11781380;20
 3801984;13255417;20
 3802003;11777557;91
 3802055;11781159;26
 3802076;11782793;102
 3802086;17881551;102
 3802087;19064728;99
 3802105;12760994;99
 ...

 There are 27 partitions(small files). Total size is about 100 Mb.

 We find that this problem is highly probably caused by the bug SPARK-2043:
 https://issues.apache.org/jira/browse/SPARK-2043

 Could someone give more details on this bug ?

 The pull request say:

 The current implementation reads one key with the next hash code as it
 finishes reading the keys with the current hash code, which may cause it to
 miss some matches of the next key. This can cause operations like join to
 give the wrong result when reduce tasks spill to disk and there are hash
 collisions, as values won't be matched together. This PR fixes it by not
 reading in that next key, using a peeking iterator instead.

 I don't understand why reading a key with the next hash code will cause it
 to miss some matches of the next key. If someone could show me some code to
 dig in, it's highly appreciated. =)

 Thank you.

 Hao.











 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13797.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD memory questions

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 10:07 AM, Boxian Dong box...@indoo.rs wrote:
 I am currently working on a machine learning project which requires the RDDs'
 content to be (at least partially) updated during each iteration. Because the
 program will be converted directly from traditional Python object-oriented
 code, the content of the RDD will be modified in the mapping function. To
 test the functionality and memory usage, I wrote a test program:

class TestClass(object):
    def __init__(self):
        self.data = []

    def setup(self):
        self.data = range(2)
        return self

    def addNumber(self, number):
        length = len(self.data)
        for i in range(length):
            self.data[i] += number
        return self

    def sumUp(self):
        total = 0
        for n in self.data:
            total += n
        return total

 and Spark main:

 origData = []
 for i in range(50):
     origData.append((i, TestClass()))

 # create the RDD and cache it
 rddData = sc.parallelize(origData).mapValues(lambda v: v.setup())
 rddData.cache()

 # modifying the content of the RDD in the map function
 scD = rddData
 for i in range(100):
     scD = scD.mapValues(lambda v: v.addNumber(10))

 scDSum = scD.map(lambda (k, v): v.sumUp())
 v = scDSum.reduce(lambda a, b: a + b)

 print " -- after the transformation, the value is ", v

 scDSum = rddData.map(lambda (k, v): v.sumUp())
 v = scDSum.reduce(lambda a, b: a + b)

 print " -- after the transformation, the cached value is ", v

   - Judging by the results, it seems to me that when the RDD is cached, the
 direct modification doesn't affect its content.
   - From monitoring the memory usage, it seems to me that the memory is not
 duplicated during each RDD (narrow dependency) transformation (or I am
 wrong).

 Therefore, my questions are:
   - How does the cache work? Does it make a separate copy of the data?
   - How is memory managed in the map function (in a narrow dependency)?
 Is the entire RDD first duplicated, modified, and then assigned to the new
 RDD, with the old RDD afterwards deleted from memory? Or does the new
 RDD reuse the memory of the old RDD, without duplication/copying?

I'll try to answer some of your questions:

The RDD is cached in the JVM (after being serialized by pickle). The Python
worker reads the serialized data from a socket, deserializes it into Python
objects, calls the mapper or reducer on them, and finally sends the results
back to the JVM via the socket. The Python process only holds one batch of
objects at a time, so its memory usage is smaller than the whole partition.

The cache in the JVM is created during the first iteration over the data, so
when you process the data a second time (or more), it is read from the JVM
cache.

RDDs are read-only: you cannot modify them, and each transformation creates
a new RDD. During a map function, the objects in the RDD are thrown away
after being accessed, so any modification made to them is lost.

   - If the newly generated RDDs directly use the memory of the old RDDs (in
 a narrow dependency), why do the cached RDDs still retain the old content?
 Are cached RDDs treated differently from uncached RDDs in memory management?

No two RDDs ever share memory; they are completely separate.
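
A minimal sketch of the pattern that follows from this (reusing the TestClass
from the test program above; the names are only illustrative): each iteration
produces a new RDD, which you cache while unpersisting the previous one,
instead of trying to mutate objects in place.

current = sc.parallelize(origData).mapValues(lambda v: v.setup()).cache()

for i in range(100):
    updated = current.mapValues(lambda v: v.addNumber(10)).cache()
    updated.count()        # materialize the new cached RDD
    current.unpersist()    # release the previous cached copy
    current = updated

total = current.map(lambda (k, v): v.sumUp()).reduce(lambda a, b: a + b)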






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark on Yarn - how group by data properly

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets oruchov...@gmail.com wrote:
 Hi ,

    I come from a map/reduce background and am trying to do a quite trivial thing:

 I have a lot of files (on HDFS) - the format is:

1 , 2 , 3
2 , 3 , 5
1 , 3,  5
 2, 3 , 4
 2 , 5, 1

   I actually need to group by key (the first column):
   key   values
   1 -- (2,3),(3,5)
   2 -- (3,5),(3,4),(5,1)

   and I need to process (pass) the values to the function f (my custom
 function);
   the outcome of function f() should be written to HDFS with the corresponding key:
 1 -- f() outcome
 2 -- f() outcome

 My code is :

   def doSplit(x):
       y = x.split(',')
       if len(y) == 3:
           return y[0], (y[1], y[2])

   lines = sc.textFile(filename, 1)
   counts = lines.map(doSplit).groupByKey()
   output = counts.collect()

   for (key, value) in output:
       print 'build model for key -', key
       print value
       f(str(key), value)


 Questions:
   1) lines.map(doSplit).groupByKey() - I didn't find an option like
 groupByKey(f()) to process the grouped values. How can I process the grouped
 keys with a custom function? Function f has some non-trivial logic.

The result of groupByKey() is still an RDD of (key, ResultIterable(values))
pairs, so you can continue to call map() or mapValues() on it:

lines.map(doSplit).groupByKey().map(f)

But your `f` needs two parameters, while map() assumes that `f`
takes one parameter, so you need to build a wrapper for `f`:

lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If `f` only accepts the values as a list, then you need to convert `vs` into a list:

result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

Finally, you could save the `result` to HDFS:

result.saveAsPickleFile(path, batchSize=1024)

 2) Using the collect()ed output (I really don't like this approach) to pass to
 the function looks not scalable, and it is executed only on one machine. What
 is the way to process grouped keys in a distributed fashion with PySpark, i.e.
 in parallel on different machines of the cluster?

 3) When processing the output, how can the data be stored on HDFS?

Currently, it's not easy to access files in HDFS directly from Python; you
could do it through Spark by:

sc.parallelize(local_data).map(str).saveAsTextFile(path)

 Thanks
 Oleg.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL check if query is completed (pyspark)

2014-09-06 Thread Davies Liu
SQLContext.sql() returns a SchemaRDD; you need to call collect() (or another
action) to actually execute the query and pull the data in.
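
A minimal sketch (with a hypothetical table and query; whether a failure
surfaces here can depend on the kind of query): wrap the action in a
try/except so the error can be handled in code rather than only printed on
the console.

from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
try:
    result = sqlCtx.sql("SELECT * FROM some_table WHERE value > 10")
    rows = result.collect()   # sql() only builds the plan; collect() runs it
except Exception as e:
    print "query failed:", e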

On Sat, Sep 6, 2014 at 6:02 AM, jamborta jambo...@gmail.com wrote:
 Hi,

 I am using Spark SQL to run some administrative queries and joins (e.g.
 create table, insert overwrite, etc), where the query does not return any
 data. I noticed if the query fails it prints some error message on the
 console, but does not actually throw an exception (this is spark 1.0.2).

 Is there any way to get these errors from the returned object?

 thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-check-if-query-is-completed-pyspark-tp13630.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Getting the type of an RDD in spark AND pyspark

2014-09-06 Thread Davies Liu
But you cannot get what you expect in PySpark, because the underlying RDD
in Scala holds serialized data, so it will always be RDD[Array[Byte]], whatever
the element type of the RDD is in Python.
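
On the Python side, one pragmatic way to see the element type is to sample
an element (a small sketch; take() triggers a small job):

rdd = sc.parallelize([1, 2, 3])
sample = rdd.take(1)
print type(sample[0]) if sample else "empty RDD"   # <type 'int'>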

Davies

On Sat, Sep 6, 2014 at 4:09 AM, Aaron Davidson ilike...@gmail.com wrote:
 Pretty easy to do in Scala:

 rdd.elementClassTag.runtimeClass

 You can access this method from Python as well by using the internal _jrdd.
 It would look something like this (warning, I have not tested it):
 rdd._jrdd.classTag().runtimeClass()

 (The method name is classTag for JavaRDDLike, and elementClassTag for
 Scala's RDD.)


 On Thu, Sep 4, 2014 at 1:32 PM, esamanas evan.sama...@gmail.com wrote:

 Hi,

 I'm new to spark and scala, so apologies if this is obvious.

 Every RDD appears to be typed, which I can see by seeing the output in the
 spark-shell when I execute 'take':

 scala val t = sc.parallelize(Array(1,2,3))
 t: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize
 at console:12

 scala t.take(3)
 res4: Array[Int] = Array(1, 2, 3)


 scala val u = sc.parallelize(Array(1,Array(2,2,2,2,2),3))
 u: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[3] at parallelize
 at console:12

 scala u.take(3)
 res5: Array[Any] = Array(1, Array(2, 2, 2, 2, 2), 3)

 Array type stays the same even if only one type returned.
 scala u.take(1)
 res6: Array[Any] = Array(1)


 Is there some way to just get the name of the type of the entire RDD from
 some function call?  Also, I would really like this same functionality in
 pyspark, so I'm wondering if that exists on that side, since clearly the
 underlying RDD is typed (I'd be fine with either the Scala or Python type
 name).

 Thank you,

 Evan




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-type-of-an-RDD-in-spark-AND-pyspark-tp13498.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark on Yarn a lot of python scripts project

2014-09-05 Thread Davies Liu
Hi Oleg,

In order to simplify packaging and distributing your code, you could deploy
shared storage (such as NFS), put your project on it, and mount it on all the
slaves, e.g. as /projects.

In the Spark job scripts, you can then access your project by adding that path
to sys.path, such as:

import sys
sys.path.append("/projects")
import myproject

Davies

On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote:
 Hi ,
   We are evaluating PySpark and have successfully executed the PySpark examples
 on Yarn.

 The next step we want to take:
   We have a Python project (a bunch of Python scripts using Anaconda
 packages).
 Question:
 What is the way to execute PySpark on Yarn with a lot of Python
 files (~50)?
   Should they be packaged in an archive?
   What would the command to execute PySpark on Yarn with a lot of files
 look like?
 Currently command looks like:

 ./bin/spark-submit --master yarn  --num-executors 3  --driver-memory 4g
 --executor-memory 2g --executor-cores 1
 examples/src/main/python/wordcount.py   1000

 Thanks
 Oleg.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark on Yarn a lot of python scripts project

2014-09-05 Thread Davies Liu
Here is a story about how shared storage simplifies all these things:

At Douban, we use MooseFS [1] instead of HDFS as the distributed file system;
it's POSIX compatible and can be mounted just like NFS.

We put all the data, tools, and code on it, so we can access them easily on
all the machines, just like local disks. You can modify them from anywhere,
and get the modified version from anywhere.

For example, say you want to know the fields in your compressed log files:

$ bunzip2 -k path | head

Then you need to modify your code to deal with these fields in the logs:

$ vim myjob.py

You have a bunch of libraries or modules in the project, but you don't need to
worry about them when running distributed jobs; you just do:

$ python myjob.py

If something goes wrong, you can modify myjob.py and save some RDDs to
disk, then check the results:

$ head path_to_result_of_rdd

Maybe something is wrong in your library; fix it and run again:

$ python myjob.py

Dump the result as a CSV file, then load it into MySQL:

$ mysql xxx < path_of_the_result

In summary, shared storage can help a lot in a distributed environment, and a
simple solution (such as NFS) is a natural way to solve these problems:
set it up once, benefit forever.

PS: I'm also a contributor to MooseFS and have a fork at github.com/davies/moosefs/

PPS: I'm sorry for my poor English if the above sounds rude to you.

Davies

[1] http://moosefs.org/

On Fri, Sep 5, 2014 at 11:22 AM, Dimension Data, LLC.
subscripti...@didata.us wrote:

 I'd have to agree with Marcelo and Andrew here...

 Favoring a simple Build-and-Run/Submit wrapper-script that leverages 
 '--py-files file.zip'
 over adding another layer of complexity -- even if seemingly 'trivial' like 
 NFS -- is
 probably a good approach (... b/c more technology is never 'trivial' over 
 time). =:).
 Less is more.



 On 09/05/2014 01:58 PM, Marcelo Vanzin wrote:

 On Fri, Sep 5, 2014 at 10:50 AM, Davies Liu dav...@databricks.com wrote:

 In daily development, it's common to modify your projects and re-run
 the jobs. If using zip or egg to package your code, you need to do
 this every time after modification, I think it will be boring.

 That's why shell scripts were invented. :-)

 Probably a lot easier than setting up and maintaining shared storage
 in a large cluster.


 --

 Sincerely yours,
 Team Dimension Data
 
 Dimension Data, LLC. | www.didata.us
 P: 212.882.1276 |  subscripti...@didata.us



 Data Analytics you can literally count on.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark sql results maintain order (in python)

2014-09-04 Thread Davies Liu
On Thu, Sep 4, 2014 at 3:42 AM, jamborta jambo...@gmail.com wrote:
 hi,

 I ran into a problem with Spark SQL. When I run a query like "select
 count(*), city, industry from table group by hour", I would like to take
 the results from the SchemaRDD:

 1. I have to parse each line to get the values out of the dict (e.g. in order
 to convert it to a csv).

 2. The order is not kept in a python dict - I couldn't find a way to
 maintain the original order (especially a problem in this case, when the
 column names are derived).

In master and the upcoming 1.1 release, you will get pyspark.sql.Row objects
from the SchemaRDD. Row is a namedtuple, so it keeps the column order of the
SQL, and you can easily convert the rows into tuples or lists.

Also, you can access the fields just like attributes.
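
For example (a sketch with a hypothetical table, query, and output path),
since each Row keeps the column order of the SELECT, a CSV line is just a
join over the row:

from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
rows = sqlCtx.sql("SELECT city, industry, count(*) AS cnt "
                  "FROM events GROUP BY city, industry")

csv_lines = rows.map(lambda row: ",".join(str(v) for v in row))
csv_lines.saveAsTextFile("hdfs:///tmp/result_csv")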

 thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-results-maintain-order-in-python-tp13445.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 2 python installations cause PySpark on Yarn problem

2014-09-04 Thread Davies Liu
Hey Oleg,

In pyspark, you MUST have the same version of Python on all the machines
of the cluster; that is, when you run `python` on these machines, all of them
should report the same version (2.6 or 2.7).

With PYSPARK_PYTHON, you can run pyspark with a specific version of Python.
That version should also be installed on all the machines, in the same
location.

Davies

On Thu, Sep 4, 2014 at 9:25 AM, Oleg Ruchovets oruchov...@gmail.com wrote:
 Hi  ,
   I am evaluating PySpark.
 I have HDP (Hortonworks) installed with Python 2.6.6 (I can't remove it since
 it is used by Hortonworks). I can successfully execute PySpark on Yarn.

 We need to use Anaconda packages, so I installed Anaconda. Anaconda is
 installed with Python 2.7.7 and it is added to the PATH. After installing
 Anaconda, the Pi example stops working - I had used it for testing PySpark on
 Yarn.

 Question:
   How can PySpark be used when there are 2 Python versions on one machine?
 In the PATH I have 2.7.7 on every machine.

 How can I check whether 2.7.7 is the version actually used at runtime when executing PySpark?

 Exception I get are the same as in previous emails:

 [root@HDOP-B spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563]#
 ./bin/spark-submit --master yarn  --num-executors 3  --driver-memory 4g
 --executor-memory 2g --executor-cores 1   examples/src/main/python/pi.py
 1000
 /usr/jdk64/jdk1.7.0_45/bin/java
 ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-
 563.jar:/etc/hadoop/conf
 -XX:MaxPermSize=128m -Djava.library.path= -Xms4g -Xmx4g
 14/09/04 12:53:11 INFO spark.SecurityManager: Changing view acls to: root
 14/09/04 12:53:11 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(root)
 14/09/04 12:53:12 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/09/04 12:53:12 INFO Remoting: Starting remoting
 14/09/04 12:53:12 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sp...@hdop-b.agt:45747]
 14/09/04 12:53:12 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@hdop-b.agt:45747]
 14/09/04 12:53:12 INFO spark.SparkEnv: Registering MapOutputTracker
 14/09/04 12:53:12 INFO spark.SparkEnv: Registering BlockManagerMaster
 14/09/04 12:53:12 INFO storage.DiskBlockManager: Created local directory at
 /tmp/spark-local-20140904125312-c7ea
 14/09/04 12:53:12 INFO storage.MemoryStore: MemoryStore started with
 capacity 2.3 GB.
 14/09/04 12:53:12 INFO network.ConnectionManager: Bound socket to port 37363
 with id = ConnectionManagerId(HDOP-B.AGT,37363)
 14/09/04 12:53:12 INFO storage.BlockManagerMaster: Trying to register
 BlockManager
 14/09/04 12:53:12 INFO storage.BlockManagerInfo: Registering block manager
 HDOP-B.AGT:37363 with 2.3 GB RAM
 14/09/04 12:53:12 INFO storage.BlockManagerMaster: Registered BlockManager
 14/09/04 12:53:12 INFO spark.HttpServer: Starting HTTP Server
 14/09/04 12:53:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/09/04 12:53:12 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:33547
 14/09/04 12:53:12 INFO broadcast.HttpBroadcast: Broadcast server started at
 http://10.193.1.76:33547
 14/09/04 12:53:12 INFO spark.HttpFileServer: HTTP File server directory is
 /tmp/spark-054f4eda-b93b-47d3-87d5-c40e81fc1fe8
 14/09/04 12:53:12 INFO spark.HttpServer: Starting HTTP Server
 14/09/04 12:53:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/09/04 12:53:12 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:54594
 14/09/04 12:53:13 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/09/04 12:53:13 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 14/09/04 12:53:13 INFO ui.SparkUI: Started SparkUI at http://HDOP-B.AGT:4040
 14/09/04 12:53:13 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 --args is deprecated. Use --arg instead.
 14/09/04 12:53:14 INFO client.RMProxy: Connecting to ResourceManager at
 HDOP-N1.AGT/10.193.1.72:8050
 14/09/04 12:53:14 INFO yarn.Client: Got Cluster metric info from
 ApplicationsManager (ASM), number of NodeManagers: 6
 14/09/04 12:53:14 INFO yarn.Client: Queue info ... queueName: default,
 queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
   queueApplicationCount = 0, queueChildQueueCount = 0
 14/09/04 12:53:14 INFO yarn.Client: Max mem capabililty of a single resource
 in this cluster 13824
 14/09/04 12:53:14 INFO yarn.Client: Preparing Local resources
 14/09/04 12:53:15 INFO yarn.Client: Uploading
 file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
 to
 hdfs://HDOP-B.AGT:8020/user/root/.sparkStaging/application_1409805761292_0005/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
 14/09/04 12:53:17 INFO yarn.Client: Uploading
 

Re: Spark on Mesos: Pyspark python libraries

2014-09-02 Thread Davies Liu
PYSPARK_PYTHON may work for you; it's used to specify which Python
interpreter should be used in both the driver and the workers. For example, if
Anaconda is installed at /anaconda on all the machines, then you can specify
PYSPARK_PYTHON=/anaconda/bin/python to use the Anaconda environment in
PySpark:

PYSPARK_PYTHON=/anaconda/bin/python spark-submit your_script.py

Or, if you want to use it by default, you can set the environment variable
somewhere persistent (for example in conf/spark-env.sh):

export PYSPARK_PYTHON=/anaconda/bin/python

On Tue, Sep 2, 2014 at 9:31 AM, Daniel Rodriguez
df.rodriguez...@gmail.com wrote:

 Hi all,

 I am getting started with spark and mesos, I already have spark running on a
 mesos cluster and I am able to start the scala spark and pyspark shells,
 yay!. I still have questions on how to distribute 3rd party python libraries
 since i want to use stuff like nltk and mlib on pyspark that requires numpy.

 I am using salt for the configuration management so it is really easy for me
 to create an anaconda virtual environment and install all the libraries
 there on each mesos slave.

 My main question is if that's the recommended way of doing it 3rd party
 libraries?
 If the answer its yes, how do i tell pyspark to use that virtual environment
 (and not the default python) on the spark workers?

 I notice that there are some addFile addPyFile functions on the SparkContext
 but i don't want to distribute the libraries every single time if I can just
 do that once by writing some salt states for that. I am specially worried
 about numpy and its requirements.

 Hopefully this makes some sense.

 Thanks,
 Daniel Rodriguez

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: u'' notation with pyspark output data

2014-08-29 Thread Davies Liu
u'14.0' is a unicode string; you can convert it into a str with
u'14.0'.encode('utf8'), or convert it into a float with
float(u'14.0').
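
For example, a sketch of the doSplit() above that does the conversion up
front (assuming the second and third fields really are numeric):

def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        # convert from unicode to the types you actually want
        return int(y[0]), (float(y[1]), float(y[2]))

# doSplit returns None for malformed lines, so filter them out before grouping
counts = lines.map(doSplit).filter(lambda r: r is not None).groupByKey()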

Davies

On Thu, Aug 28, 2014 at 11:22 PM, Oleg Ruchovets oruchov...@gmail.com wrote:
 Hi ,
   I am working with pyspark and doing a simple aggregation:

 def doSplit(x):
     y = x.split(',')
     if len(y) == 3:
         return y[0], (y[1], y[2])

 counts = lines.map(doSplit).groupByKey()
 output = counts.collect()

 Iterating over the output I get data in this format: u'1385501280', u'14.0',
 but I actually need to work with 14 instead of u'14.0' and 1385501280
 instead of u'1385501280'.

 Question:
   How can I get the actual data without the u'' notation?


 Thanks
 Oleg.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: repartitioning an RDD yielding imbalance

2014-08-28 Thread Davies Liu
On Thu, Aug 28, 2014 at 7:00 AM, Rok Roskar rokros...@gmail.com wrote:
 I've got an RDD where each element is a long string (a whole document). I'm 
 using pyspark so some of the handy partition-handling functions aren't 
 available, and I count the number of elements in each partition with:

 def count_partitions(id, iterator):
     c = sum(1 for _ in iterator)
     yield (id, c)

 rdd.mapPartitionsWithSplit(count_partitions).collectAsMap()

 This returns the following:

 {0: 866, 1: 1158, 2: 828, 3: 876}

 But if I do:

 rdd.repartition(8).mapPartitionsWithSplit(count_partitions).collectAsMap()

 I get

 {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 3594, 7: 134}

 Why this strange redistribution of elements? I'm obviously misunderstanding 
 how spark does the partitioning -- is it a problem with having a list of 
 strings as an RDD?

This imbalance is introduced by the batched serializer. By default, Python
elements in an RDD are serialized by pickle in batches (1024 elements per
batch), so from the Scala side the RDD appears to contain only one or two
Array[Byte] elements, and the repartition becomes imbalanced.

To fix this, you could change the default batchSize to 10 (or less), or
reserialize your RDD in unbatched mode, for example:

sc = SparkContext(batchSize=10)
rdd = sc.textFile(path).repartition(8)

OR

rdd._reserialize(PickleSerializer()).repartition(8)

PS: _reserialize() is not a public API, so it may change in the future.

Davies

 Help vey much appreciated!

 Thanks,

 Rok


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python script runs fine in local mode, errors in other modes

2014-08-19 Thread Davies Liu
Could you post the completed stacktrace?

On Tue, Aug 19, 2014 at 10:47 AM, Aaron aaron.doss...@target.com wrote:
 Hello, I have a relatively simple python program that works just fine in
 local mode (--master local) but produces a strange error when I try to run
 it via Yarn (--deploy-mode client --master yarn) or just execute the code
 through pyspark. Here's the code:

 sc = SparkContext(appName="foo")
 input = sc.textFile("hdfs://[valid hdfs path]")
 mappedToLines = input.map(lambda myline: myline.split(","))

 The third line yields this error:

 TypeError: 'bool' object is not callable

 But myline seems to be a valid string if I look at it this way:

 >>> mappedToLines = input.map(lambda myline: len(myline))
 >>> mappedToLines.collect()
 [84, 104, 109, 89, 108, 92, 89, 90, 93, 102]

 I just now have access to a Hadoop cluster with Spark installed, so
 hopefully I'm running into some simple issues that I never had to deal with
 when testing in my own sandbox in purely local mode before. Any help would
 be appreciated, thanks! -Aaron
 
 View this message in context: Python script runs fine in local mode, errors
 in other modes
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python script runs fine in local mode, errors in other modes

2014-08-19 Thread Davies Liu
This script runs very well without your CSV file. Could you download your
CSV file to local disk and narrow down the lines which trigger
this issue?

On Tue, Aug 19, 2014 at 12:02 PM, Aaron aaron.doss...@target.com wrote:
 These three lines of python code cause the error for me:

 sc = SparkContext(appName="foo")
 input = sc.textFile("hdfs://[valid hdfs path]")
 mappedToLines = input.map(lambda myline: myline.split(","))

 The file I'm loading is a simple CSV.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Python-script-runs-fine-in-local-mode-errors-in-other-modes-tp12390p12398.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Segmented fold count

2014-08-18 Thread Davies Liu
>>> import itertools
>>> l = [1,1,1,2,2,3,4,4,5,1]
>>> gs = itertools.groupby(l)
>>> map(lambda (n, it): (n, sum(1 for _ in it)), gs)
[(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

def groupCount(l):
    gs = itertools.groupby(l)
    return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote:
 Can anyone assist with a scan of the following kind (Python preferred, but
 whatever..)? I'm looking for a kind of segmented fold count.

 Input: [1,1,1,2,2,3,4,4,5,1]
 Output: [(1,3), (2, 2), (3, 1), (4, 2), (5, 1), (1,1)]
 or preferably two output columns:
 id: [1,2,3,4,5,1]
 count: [3,2,1,2,1,1]

 I can use a groupby/count, except for the fact that I just want to scan -
 not resort. Ideally this would be as low-level as possible and perform in a
 simple single scan. It also needs to retain the original sort order.

 Thoughts?




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Segmented fold count

2014-08-18 Thread Davies Liu
On Sun, Aug 17, 2014 at 11:07 PM, Andrew Ash and...@andrewash.com wrote:
 What happens when a run of numbers is spread across a partition boundary?  I
 think you might end up with two adjacent groups of the same value in that
 situation.

Yes, you need another pass to combine adjacent groups with the same value
across partition boundaries (see the sketch below).
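
A minimal sketch of that second pass: collect the per-partition group counts
(order is preserved), then merge adjacent entries that carry the same value
across a partition boundary.

import itertools

def groupCount(it):
    return [(n, sum(1 for _ in g)) for n, g in itertools.groupby(it)]

def mergeAdjacent(groups):
    merged = []
    for value, count in groups:
        if merged and merged[-1][0] == value:
            # the run was split by a partition boundary -- merge the counts
            merged[-1] = (value, merged[-1][1] + count)
        else:
            merged.append((value, count))
    return merged

result = mergeAdjacent(rdd.mapPartitions(groupCount).collect())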

 On Mon, Aug 18, 2014 at 2:05 AM, Davies Liu dav...@databricks.com wrote:

  import itertools
  l = [1,1,1,2,2,3,4,4,5,1]
  gs = itertools.groupby(l)
  map(lambda (n, it): (n, sum(1 for _ in it)), gs)
 [(1, 3), (2, 2), (3, 1), (4, 2), (5, 1), (1, 1)]

 def groupCount(l):
gs = itertools.groupby(l)
return map(lambda (n, it): (n, sum(1 for _ in it)), gs)

 If you have an RDD, you can use RDD.mapPartitions(groupCount).collect()

 On Sun, Aug 17, 2014 at 10:34 PM, fil f...@pobox.com wrote:
  Can anyone assist with a scan of the following kind (Python preferred,
  but
  whatever..)? I'm looking for a kind of segmented fold count.
 
  Input: [1,1,1,2,2,3,4,4,5,1]
  Output: [(1,3), (2, 2), (3, 1), (4, 2), (5, 1), (1,1)]
  or preferably two output columns:
  id: [1,2,3,4,5,1]
  count: [3,2,1,2,1,1]
 
  I can use a groupby/count, except for the fact that I just want to scan
  -
  not resort. Ideally this would be as low-level as possible and perform
  in a
  simple single scan. It also needs to retain the original sort order.
 
  Thoughts?
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: application as a service

2014-08-18 Thread Davies Liu
Another option is using Tachyon to cache the RDD, then the cache can
be shared by different applications.  See how to use Spark with
Tachyon: http://tachyon-project.org/Running-Spark-on-Tachyon.html

Davies

On Sun, Aug 17, 2014 at 4:48 PM, ryaminal tacmot...@gmail.com wrote:
 You can also look into using ooyala's job server at
 https://github.com/ooyala/spark-jobserver

 This already has a Spray server built in that allows you to do what has
 already been explained above. Sounds like it should solve your problem.

 Enjoy!



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/application-as-a-service-tp12253p12267.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Merging complicated small matrices to one big matrix

2014-08-18 Thread Davies Liu
rdd.flatMap(lambda x: x) may solve your problem; it will convert an RDD of

[[[1,2,3],[4,5,6]], [[7,8,9],[10,11,12]]]

into:

[[1,2,3], [4,5,6], [7,8,9], [10,11,12]]
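
If the goal is a single big NumPy matrix instead of an RDD, a sketch
(assuming the elements are NumPy arrays, as in the question, and that the
stacked result fits in driver memory):

import numpy as np

big_matrix = np.vstack(rdd.collect())   # stack the sub-matrices on the driver

# or keep it distributed as an RDD with one row per element:
rows = rdd.flatMap(lambda m: list(m))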

On Mon, Aug 18, 2014 at 2:42 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 I have an rdd in pyspark which looks like follows:
 It has two sub matrices..(array,array)
 [
   array([[-13.00771575,   0.2740844 ,   0.9752694 ,   0.67465999,
  -1.45741537,   0.546775  ,   0.7900841 ,  -0.59473707,
  -1.11752044,   0.61564356],
[ -0.,  12.20115746,  -0.49016935,  -0.9236129 ,
  -1.1693633 ,  -0.39135626,   1.10752864,   0.16920118,
  -1.098806  ,   1.10045185],
[  0.,   0., -11.26425992,   0.56309152,
   0.44872832,   0.69722768,   0.84200281,   0.89537327,
   0.10460865,  -0.62938474],
[ -0.,  -0.,   0.,  13.1112119 ,
   0.39986223,  -1.22218936,   0.72315955,   0.12208597,
  -0.6258082 ,  -0.91077504],
[  0.,  -0.,   0.,   0.,
 -11.04483145,  -1.71948244,  -0.73239228,  -0.19651712,
  -0.97931725,  -0.43263423],
[  0.,   0.,   0.,  -0.,
   0., -12.1996715 ,  -0.05580816,   0.20517336,
   0.53584998,   1.3370874 ],
[  0.,  -0.,  -0.,   0.,
   0.,   0.,  12.32603631,   0.47498103,
  -0.65901705,  -0.85713277],
[  0.,   0.,   0.,  -0.,
   0.,  -0.,   0.,  11.90030251,
   1.73036795,   0.70588443],
[ -0.,  -0.,   0.,   0.,
  -0.,  -0.,   0.,  -0.,
  13.00493769,   1.37753403],
[  0.,  -0.,   0.,   0.,
   0.,  -0.,   0.,   0.,
  -0., -10.89006283]]),

 array([[-12.43375184,   1.07703619,  -0.47818221,   1.65919732,
   0.96307502,  -1.6322447 ,  -1.09409297,  -0.64849112,
  -1.09349557,  -0.68706834],
[  0., -11.93616969,   0.08784614,   1.76677411,
  -0.0585134 ,  -0.70979485,   0.29757848,   1.19562173,
  -1.54176475,   1.71500862],
[  0.,  -0., -12.42060272,   2.17060365,
  -1.3212244 ,   0.73742297,   0.50410937,  -0.35278129,
  -0.40513689,  -0.81222302],
[ -0.,   0.,   0., -11.93419851,
  -1.15614929,   1.04085489,   0.69986351,  -1.3615322 ,
   0.43467842,  -1.33041858],
[ -0.,  -0.,   0.,   0.,
  11.22907137,  -0.12925322,   0.46293906,  -2.01577912,
  -2.26566926,  -0.17750339],
[  0.,   0.,   0.,   0.,
  -0., -12.0705513 ,  -0.19432359,   0.41226088,
   0.79436699,  -0.61288711],
[  0.,  -0.,   0.,   0.,
  -0.,  -0.,  11.99770753,  -1.24277228,
   1.32240282,   1.5140609 ],
[ -0.,   0.,  -0.,  -0.,
   0.,  -0.,   0., -13.07008472,
   0.52031563,  -1.56247391],
[  0.,  -0.,   0.,   0.,
  -0.,  -0.,  -0.,  -0.,
  13.16585107,   0.57741265],
[  0.,   0.,  -0.,  -0.,
   0.,   0.,   0.,  -0.,
  -0., -13.53719704]])
 ]

 So, basically I  have sub matrices like [sub_matrix_1, sub_matrix_2 ]
 (the above has just two matrices..

 I want to combine in one big matrix column wise

 [ sub_matrix_1
  sub_matrix_2
 
 ]
 Any suggestions?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Segmented fold count

2014-08-18 Thread Davies Liu
On Mon, Aug 18, 2014 at 7:41 PM, fil f...@pobox.com wrote:
 fil wrote
 - Python functions like groupCount; these get reflected from their Python
 AST and converted into a Spark DAG? Presumably if I try and do something
 non-convertible this transformation process will throw an error? In other
 words this runs in the JVM.

 Further to this - it seems that Python does run on each node in the cluster,
 meaning it runs outside the JVM. Presumably this means that writing this in
 Scala would be far more performant.

 Could I write groupCount() in Scala, and then use it from Pyspark? Care to
 supply an example, I'm finding them hard to find :)

It's doable, but not so convenient. If you really care about the performance
difference, you should write your program in Scala.


 fil wrote
 - I had considered that partitions were batches of distributable work,
 and generally large. Presumably the above is OK with small groups (eg.
 average size  10) - this won't kill performance?

 I'm still a bit confused about the dual meaning of partition: work
 segmentation, and key groups. Care to clarify anyone - when are partitions
 used to describe chunks of data for different nodes in the cluster (ie.
 large), and when are they used to describe groups of items in data (ie.
 small)..

A partition is a chunk of data in an RDD; the computation on a partition
is a task, which is sent to a node in the cluster.
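
A quick way to see this in PySpark (a small sketch): glom() turns each
partition into a list, so you can see how the elements are chunked across
partitions.

rdd = sc.parallelize(range(10), 3)   # 10 elements spread over 3 partitions
print rdd.glom().collect()           # e.g. [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]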


 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Segmented-fold-count-tp12278p12342.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question on mappartitionwithsplit

2014-08-17 Thread Davies Liu
On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Hi,
   Thanks for the response..
 In the second case, f2?
 foo will have to be declared globally, right?

 My function is something like:

 def indexing(splitIndex, iterator):
     count = 0
     offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
     indexed = []
     for i, e in enumerate(iterator):
         index = count + offset + i
         for j, ele in enumerate(e):
             indexed.append((index, j, ele))
     yield indexed

In this function `indexing`, `offset_lists` should be global.

 def another_funct(offset_lists):
     # get that damn offset_lists
     rdd.mapPartitionsWithSplit(indexing)

 But then, the issue is how to pass offset_lists in?
 Any suggestions?

Basically, you can do what you would do in a normal Python program; PySpark
will send the global variables or closures to the worker processes automatically.

So, you can:

def indexing(splitIndex, iterator, offset_lists):
    pass

def another_func(offset_lists):
    rdd.mapPartitionsWithSplit(
        lambda index, it: indexing(index, it, offset_lists))

Or:

def indexing(splitIndex, iterator):
    # access offset_lists (global)
    pass

def another_func(offset):
    global offset_lists
    offset_lists = offset
    rdd.mapPartitionsWithSplit(indexing)

Or:

def another_func(offset_lists):
    def indexing(index, iterator):
        # access offset_lists from the enclosing scope
        pass
    rdd.mapPartitionsWithIndex(indexing)



 On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu dav...@databricks.com wrote:

 The callback function f only accepts 2 arguments; if you want to pass
 other objects to it, you need a closure, such as:

 foo = xxx
 def f(index, iterator, foo):
     yield (index, foo)
 rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))

 Also, you can make f itself a closure:

 def f2(index, iterator):
     yield (index, foo)
 rdd.mapPartitionsWithIndex(f2)

 On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu chengi.liu...@gmail.com
 wrote:
  Hi,
In this example:
 
  http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
  Let say, f takes three arguments:
  def f(splitIndex, iterator, foo): yield splitIndex
  Now, how do i send this foo parameter to this method?
  rdd.mapPartitionsWithSplit(f)
  Thanks
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
Arpan,

Which version of Spark are you using? Could you try the master or 1.1
branch, which can spill the data to disk during groupByKey()?

PS: it's better to use reduceByKey() or combineByKey() to reduce data
size during shuffle.

Maybe there is a huge key in the data set; you can find it this way:

sorted(rdd.countByKey().items(), key=lambda x: x[1], reverse=True)[:10]

Davies


On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh ar...@automatic.com wrote:
 Hi,

 Let me begin by describing my Spark setup on EC2 (launched using the
 provided spark-ec2.py script):

 100 c3.2xlarge workers (8 cores, 15GB memory each)
 1 c3.2xlarge Master (only running master daemon)
 Spark 1.0.2
 8GB mounted at /, 80 GB mounted at /mnt

 spark-defaults.conf (A lot of config options have been added here to try and
 fix the problem. I also encounter the problem while running with the default
 options)

 spark.executor.memory   12991m
 spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
 spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
 spark.shuffle.file.buffer.kb1024
 spark.reducer.maxMbInFlight 96
 spark.serializer.objectStreamReset  10
 spark.akka.frameSize100
 spark.akka.threads  32
 spark.akka.timeout  1000
 spark.serializerorg.apache.spark.serializer.KryoSerializer

 spark-env.sh (A lot of config options have been added here to try and fix
 the problem. I also encounter the problem while running with the default
 options)

 export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
 export SPARK_MASTER_OPTS=-Dspark.worker.timeout=900
 export SPARK_WORKER_INSTANCES=1
 export SPARK_WORKER_CORES=8
 export HADOOP_HOME=/root/ephemeral-hdfs
 export SPARK_MASTER_IP=Master's Public DNS, as added by spark-ec2.py
 script
 export MASTER=`cat /root/spark-ec2/cluster-url`
 export
 SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
 export
 SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
 export SPARK_PUBLIC_DNS=wget command to get the public hostname, as added
 by spark-ec2.py script

 # Set a high ulimit for large shuffles

 ulimit -n 1000


 I am trying to run a very simple Job which reads in CSV data (~ 124 GB) from
 a S3 bucket, tries to group it based on a key and counts the number of
 groups. The number of partitions for the input textFile() is set to 1600 and
 the number of partitions for the groupByKey() operation is also 1600

 conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
 sc = SparkContext(conf=sconf)

 drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)


 drive_grouped_by_user_vin_and_week =
 drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\

 .groupByKey(numPartitions=user_vin_week_group_partitions)\

 .count()


 Stage 1 (flatMap()) launches 1601 tasks all of which complete in 159
 seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks out of
 which 1595 complete in under a minute. The same 5 TIDs consistently fail
 with the following errors in the logs of their respective Executors:

 14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203

 org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)

 at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)

 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)

 at org.apache.spark.scheduler.Task.run(Task.scala:51)

 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 Caused by: java.io.EOFException

 at java.io.DataInputStream.readInt(DataInputStream.java:392)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)

 ... 10 more

 14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly
 (crashed)

 java.net.SocketException: Connection reset

 at java.net.SocketInputStream.read(SocketInputStream.java:196)

 at java.net.SocketInputStream.read(SocketInputStream.java:122)

 at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)

 at java.io.BufferedInputStream.read(BufferedInputStream.java:254)

 at java.io.DataInputStream.readInt(DataInputStream.java:387)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)

 at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)

 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

 at 

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
The 1.1 release will come out this month or next; we would really appreciate it
if you could test it with your real use case.

Davies

On Wed, Aug 13, 2014 at 1:57 PM, Arpan Ghosh ar...@automatic.com wrote:
 Thanks Davies. I am running Spark 1.0.2 (which seems to be the latest
 release)

 I'll try changing it to a reduceByKey() and check the size of the largest
 key and post the results here.

 UPDATE: If I run this job and DO NOT specify the number of partitions for
 the input textFile() (124 GB being read in from S3), Spark launches 41 tasks
 for the flatMap(). However, this time, none of the flatMap() tasks complete
 and I start seeing the same Connection Reset errors.


 On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu dav...@databricks.com wrote:

 Arpan,

 Which version of Spark are you using? Could you try the master or 1.1
 branch? which can spill the data into disk during groupByKey().

 PS: it's better to use reduceByKey() or combineByKey() to reduce data
 size during shuffle.

 Maybe there is a huge key in the data sets, you can find it in this way:

 rdd.countByKey().sortBy(lambda x:x[1], False).take(10)

 Davies


 On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh ar...@automatic.com wrote:
  Hi,
 
  Let me begin by describing my Spark setup on EC2 (launched using the
  provided spark-ec2.py script):
 
  100 c3.2xlarge workers (8 cores  15GB memory each)
  1 c3.2xlarge Master (only running master daemon)
  Spark 1.0.2
  8GB mounted at /  80 GB mounted at /mnt
 
  spark-defaults.conf (A lot of config options have been added here to try
  and
  fix the problem. I also encounter the problem while running with the
  default
  options)
 
  spark.executor.memory   12991m
  spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
  spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
  spark.shuffle.file.buffer.kb1024
  spark.reducer.maxMbInFlight 96
  spark.serializer.objectStreamReset  10
  spark.akka.frameSize100
  spark.akka.threads  32
  spark.akka.timeout  1000
  spark.serializerorg.apache.spark.serializer.KryoSerializer
 
  spark-env.sh (A lot of config options have been added here to try and
  fix
  the problem. I also encounter the problem while running with the default
  options)
 
  export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
  export SPARK_MASTER_OPTS=-Dspark.worker.timeout=900
  export SPARK_WORKER_INSTANCES=1
  export SPARK_WORKER_CORES=8
  export HADOOP_HOME=/root/ephemeral-hdfs
  export SPARK_MASTER_IP=Master's Public DNS, as added by spark-ec2.py
  script
  export MASTER=`cat /root/spark-ec2/cluster-url`
  export
 
  SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
  export
 
  SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
  export SPARK_PUBLIC_DNS=wget command to get the public hostname, as
  added
  by spark-ec2.py script
 
  # Set a high ulimit for large shuffles
 
  ulimit -n 1000
 
 
  I am trying to run a very simple Job which reads in CSV data (~ 124 GB)
  from
  a S3 bucket, tries to group it based on a key and counts the number of
  groups. The number of partitions for the input textFile() is set to 1600
  and
  the number of partitions for the groupByKey() operation is also 1600
 
  conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
  sc = SparkContext(conf=sconf)
 
  drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
 
 
  drive_grouped_by_user_vin_and_week =
  drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
 
  .groupByKey(numPartitions=user_vin_week_group_partitions)\
 
  .count()
 
 
  Stage 1 (flatMap()) launches 1601 tasks all of which complete in 159
  seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks out of
  which 1595 complete in under a minute. The same 5 TIDs consistently fail
  with the following errors in the logs of their respective Executors:
 
  14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
 
  org.apache.spark.SparkException: Python worker exited unexpectedly
  (crashed)
 
  at
  org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
 
  at
  org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)
 
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
 
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
 
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 
  at
 
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
  at
 
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 
  at java.lang.Thread.run(Thread.java:745)
 
  Caused by: java.io.EOFException
 
  at java.io.DataInputStream.readInt

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
In Spark (Scala/Java), it will spill the data to disk, but in PySpark,
it will not.

On Wed, Aug 13, 2014 at 2:10 PM, Arpan Ghosh ar...@automatic.com wrote:
 So you are saying that in-spite of spark.shuffle.spill being set to true by
 default, version 1.0.2 does not spill data to disk during a groupByKey()?


 On Wed, Aug 13, 2014 at 2:05 PM, Davies Liu dav...@databricks.com wrote:

 The 1.1 release will come out this or next month, we will really
 appreciate that
 if you could test it with you real case.

 Davies

 On Wed, Aug 13, 2014 at 1:57 PM, Arpan Ghosh ar...@automatic.com wrote:
  Thanks Davies. I am running Spark 1.0.2 (which seems to be the latest
  release)
 
  I'll try changing it to a reduceByKey() and check the size of the
  largest
  key and post the results here.
 
  UPDATE: If I run this job and DO NOT specify the number of partitions
  for
  the input textFile() (124 GB being read in from S3), Spark launches 41
  tasks
  for the flatMap(). However, this time, none of the flatMap() tasks
  complete
  and I start seeing the same Connection Reset errors.
 
 
  On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu dav...@databricks.com
  wrote:
 
  Arpan,
 
  Which version of Spark are you using? Could you try the master or 1.1
  branch? which can spill the data into disk during groupByKey().
 
  PS: it's better to use reduceByKey() or combineByKey() to reduce data
  size during shuffle.
 
  Maybe there is a huge key in the data sets, you can find it in this
  way:
 
  rdd.countByKey().sortBy(lambda x:x[1], False).take(10)
 
  Davies
 
 
  On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh ar...@automatic.com
  wrote:
   Hi,
  
   Let me begin by describing my Spark setup on EC2 (launched using the
   provided spark-ec2.py script):
  
   100 c3.2xlarge workers (8 cores  15GB memory each)
   1 c3.2xlarge Master (only running master daemon)
   Spark 1.0.2
   8GB mounted at /  80 GB mounted at /mnt
  
   spark-defaults.conf (A lot of config options have been added here to
   try
   and
   fix the problem. I also encounter the problem while running with the
   default
   options)
  
   spark.executor.memory   12991m
   spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
   spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
   spark.shuffle.file.buffer.kb1024
   spark.reducer.maxMbInFlight 96
   spark.serializer.objectStreamReset  10
   spark.akka.frameSize100
   spark.akka.threads  32
   spark.akka.timeout  1000
   spark.serializerorg.apache.spark.serializer.KryoSerializer
  
   spark-env.sh (A lot of config options have been added here to try and
   fix
   the problem. I also encounter the problem while running with the
   default
   options)
  
   export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
   export SPARK_MASTER_OPTS=-Dspark.worker.timeout=900
   export SPARK_WORKER_INSTANCES=1
   export SPARK_WORKER_CORES=8
   export HADOOP_HOME=/root/ephemeral-hdfs
   export SPARK_MASTER_IP=Master's Public DNS, as added by spark-ec2.py
   script
   export MASTER=`cat /root/spark-ec2/cluster-url`
   export
  
  
   SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
   export
  
  
   SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
   export SPARK_PUBLIC_DNS=wget command to get the public hostname, as
   added
   by spark-ec2.py script
  
   # Set a high ulimit for large shuffles
  
   ulimit -n 1000
  
  
   I am trying to run a very simple Job which reads in CSV data (~ 124
   GB)
   from
   a S3 bucket, tries to group it based on a key and counts the number
   of
   groups. The number of partitions for the input textFile() is set to
   1600
   and
   the number of partitions for the groupByKey() operation is also 1600
  
   conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
   sc = SparkContext(conf=sconf)
  
   drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
  
  
   drive_grouped_by_user_vin_and_week =
   drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
  
   .groupByKey(numPartitions=user_vin_week_group_partitions)\
  
   .count()
  
  
   Stage 1 (flatMap()) launches 1601 tasks all of which complete in 159
   seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks out
   of
   which 1595 complete in under a minute. The same 5 TIDs consistently
   fail
   with the following errors in the logs of their respective Executors:
  
   14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
  
   org.apache.spark.SparkException: Python worker exited unexpectedly
   (crashed)
  
   at
  
   org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
  
   at
  
   org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)
  
   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
  
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262

Re: groupByKey() completes 99% on Spark + EC2 + S3 but then throws java.net.SocketException: Connection reset

2014-08-13 Thread Davies Liu
For the hottest key, the Python worker will need about 1-2 GB of memory
to do groupByKey().

These configurations cannot help with the memory of the Python worker.

So, two options:

1) use reduceByKey() or combineByKey() to reduce the memory
consumption in the Python worker (see the sketch below).
2) try master or the 1.1 branch, which has spilling in Python.
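
For example, since this particular job only needs the number of groups, a
sketch that avoids materializing the grouped values at all (it reuses the
parse function from the job above):

from operator import add

counts_per_key = (drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)
                       .map(lambda kv: (kv[0], 1))
                       .reduceByKey(add, numPartitions=1600))
num_groups = counts_per_key.count()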

Davies

On Wed, Aug 13, 2014 at 4:08 PM, Arpan Ghosh ar...@automatic.com wrote:
 Here are the biggest keys:

 [   (17634, 87874097),

 (8407, 38395833),

 (20092, 14403311),

 (9295, 4142636),

 (14359, 3129206),

 (13051, 2608708),

 (14133, 2073118),

 (4571, 2053514),

 (16175, 2021669),

 (5268, 1908557),

 (3669, 1687313),

 (14051, 1628416),

 (19660, 1619860),

 (10206, 1546037),

 (3740, 1527272),

 (426, 1522788),


 Should I try to increase spark.shuffle.memoryFraction and decrease
 spark.storage.memoryFraction ?



 On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu dav...@databricks.com wrote:

 Arpan,

 Which version of Spark are you using? Could you try the master or 1.1
 branch? which can spill the data into disk during groupByKey().

 PS: it's better to use reduceByKey() or combineByKey() to reduce data
 size during shuffle.

 Maybe there is a huge key in the data sets, you can find it in this way:

 rdd.countByKey().sortBy(lambda x:x[1], False).take(10)

 Davies


 On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh ar...@automatic.com wrote:
  Hi,
 
  Let me begin by describing my Spark setup on EC2 (launched using the
  provided spark-ec2.py script):
 
  100 c3.2xlarge workers (8 cores  15GB memory each)
  1 c3.2xlarge Master (only running master daemon)
  Spark 1.0.2
  8GB mounted at /  80 GB mounted at /mnt
 
  spark-defaults.conf (A lot of config options have been added here to try
  and
  fix the problem. I also encounter the problem while running with the
  default
  options)
 
  spark.executor.memory   12991m
  spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
  spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
  spark.shuffle.file.buffer.kb1024
  spark.reducer.maxMbInFlight 96
  spark.serializer.objectStreamReset  10
  spark.akka.frameSize100
  spark.akka.threads  32
  spark.akka.timeout  1000
  spark.serializerorg.apache.spark.serializer.KryoSerializer
 
  spark-env.sh (A lot of config options have been added here to try and
  fix
  the problem. I also encounter the problem while running with the default
  options)
 
  export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
  export SPARK_MASTER_OPTS=-Dspark.worker.timeout=900
  export SPARK_WORKER_INSTANCES=1
  export SPARK_WORKER_CORES=8
  export HADOOP_HOME=/root/ephemeral-hdfs
  export SPARK_MASTER_IP=Master's Public DNS, as added by spark-ec2.py
  script
  export MASTER=`cat /root/spark-ec2/cluster-url`
  export
 
  SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
  export
 
  SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
  export SPARK_PUBLIC_DNS=wget command to get the public hostname, as
  added
  by spark-ec2.py script
 
  # Set a high ulimit for large shuffles
 
  ulimit -n 1000
 
 
  I am trying to run a very simple Job which reads in CSV data (~ 124 GB)
  from
  a S3 bucket, tries to group it based on a key and counts the number of
  groups. The number of partitions for the input textFile() is set to 1600
  and
  the number of partitions for the groupByKey() operation is also 1600
 
  conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
  sc = SparkContext(conf=conf)
 
  drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
 
 
  drive_grouped_by_user_vin_and_week = drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
      .groupByKey(numPartitions=user_vin_week_group_partitions)\
      .count()
 
 
  Stage 1 (flatMap()) launches 1601 tasks all of which complete in 159
  seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks out of
  which 1595 complete in under a minute. The same 5 TIDs consistently fail
  with the following errors in the logs of their respective Executors:
 
  14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
  org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
  at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
  at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  at java.util.concurrent.ThreadPoolExecutor.runWorker

Re: error with pyspark

2014-08-10 Thread Davies Liu
On Fri, Aug 8, 2014 at 9:12 AM, Baoqiang Cao bqcaom...@gmail.com wrote:
 Hi There

 I ran into a problem and can’t find a solution.

 I was running bin/pyspark  ../python/wordcount.py

you could use bin/spark-submit  ../python/wordcount.py

 The wordcount.py is here:

 
 import sys
 from operator import add

 from pyspark import SparkContext

 datafile = '/mnt/data/m1.txt'

 sc = SparkContext()
 outfile = datafile + '.freq'
 lines = sc.textFile(datafile, 1)
 counts = lines.flatMap(lambda x: x.split(' ')) \
 .map(lambda x: (x, 1)) \
 .reduceByKey(add)
 output = counts.collect()

 outf = open(outfile, 'w')

  for (word, count) in output:
      outf.write(word.encode('utf-8') + '\t' + str(count) + '\n')
 outf.close()
 


 The error message is here:

 14/08/08 16:01:59 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 0)
 java.io.FileNotFoundException:
 /tmp/spark-local-20140808160150-d36b/12/shuffle_0_0_468 (Too many open
 files)

This message means that Spark (the JVM) has reached the maximum number of open
files; there is a file-descriptor leak somewhere. Unfortunately I cannot
reproduce this problem. Which version of Spark are you using?
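
As a stopgap while hunting the leak, it can help to check the process
file-descriptor limit and, on these hash-shuffle-era versions, to reduce the
number of shuffle files. A rough sketch under those assumptions (the
spark.shuffle.consolidateFiles setting is not mentioned in this thread; it
applies to the hash shuffle in Spark 1.x):

import resource

# inspect the open-file limits of the current Python process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print('open file limit: soft=%d hard=%d' % (soft, hard))

from pyspark import SparkConf, SparkContext

# the hash shuffle writes one file per (map task x reduce partition);
# consolidating them lowers the number of files an executor keeps open
conf = (SparkConf()
        .setAppName('wordcount')
        .set('spark.shuffle.consolidateFiles', 'true'))
sc = SparkContext(conf=conf)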

 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:107)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:175)
 at
 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
 at
 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)


 The m1.txt is about 4G, and I have 120GB Ram and used -Xmx120GB. It is on
 Ubuntu. Any help please?

 Best
 Baoqiang Cao
 Blog: http://baoqiang.org
 Email: bqcaom...@gmail.com








Re: PySpark + executor lost

2014-08-07 Thread Davies Liu
What is the environment? YARN, Mesos, or standalone?

It would be more helpful if you could share more of the logs.

On Wed, Aug 6, 2014 at 7:25 PM, Avishek Saha avishek.s...@gmail.com wrote:
 Hi,

 I get a lot of executor lost error for saveAsTextFile with PySpark
 and Hadoop 2.4.

 For small datasets this error occurs, but since the dataset is small it
 eventually gets written to the output file.
 For large datasets, it takes forever to write the final output.

 Any help is appreciated.
 Avishek






Re: PySpark, numpy arrays and binary data

2014-08-07 Thread Davies Liu
On Thu, Aug 7, 2014 at 12:06 AM, Rok Roskar rokros...@gmail.com wrote:
 sure, but if you knew that a numpy array went in on one end, you could safely 
 use it on the other end, no? Perhaps it would require an extension of the RDD 
 class and overriding the collect() method.

 Could you give a short example about how numpy array is used in your project?


 sure -- basically our main data structure is a container class (acts like a 
 dictionary) that holds various arrays that represent particle data. Each 
 particle has various properties, position, velocity, mass etc. you get at 
 these individual properties by calling something like

 s['pos']

 where 's' is the container object and 'pos' is the name of the array. A 
 really common use case then is to select particles based on their properties 
 and do some plotting, or take a slice of the particles, e.g. you might do

 r = np.sqrt((s['pos']**2).sum(axis=1))
 ind = np.where(r  5)
 plot(s[ind]['x'], s[ind]['y'])

 Internally, the various arrays are kept in a dictionary -- I'm hoping to 
 write a class that keeps them in an RDD instead. To the user, this would have 
 to be transparent, i.e. if the user wants to get at the data for specific 
 particles, she would just have to do

 s['pos'][1,5,10]

 for example, and the data would be fetched for her from the RDD just like it 
 would be if she were simply using the usual single-machine version. This is 
 why the writing to/from files when retrieving data from the RDD really is a 
 no-go -- can you recommend how this can be circumvented?

RDDs are distributed by design, so accessing items in an RDD directly by key
or index will not be easy. So I don't think you can map this interface onto an
RDD without the result being something the user does not expect, such as very,
very slow lookups.

In order to parallelize the computation, most of it should be done through RDD
transformations. Finally, fetch the data from the RDD with collect(), then do
the plotting. Could that kind of workflow work for your case? (A rough sketch
follows below.)
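
A rough sketch of that workflow for the particle example above; the chunked RDD
layout is purely hypothetical, and sc is the running SparkContext:

import numpy as np

# hypothetical stand-in for the container data: an RDD whose elements are
# dicts of numpy arrays, each holding one chunk of particles
chunks = sc.parallelize([{'pos': np.random.rand(1000, 3)} for _ in range(4)])

def select_close(chunk, rmax=0.5):
    # the per-chunk selection runs as a transformation on the executors
    r = np.sqrt((chunk['pos'] ** 2).sum(axis=1))
    return [chunk['pos'][r < rmax]]

# only the reduced selection comes back to the driver for plotting
selected = np.concatenate(chunks.flatMap(select_close).collect())
# e.g. plot(selected[:, 0], selected[:, 1]) on the driver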

Davies




Re: trouble with jsonRDD and jsonFile in pyspark

2014-08-06 Thread Davies Liu
There is a PR to fix this: https://github.com/apache/spark/pull/1802

On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 I concur that printSchema works; it just seems to be operations that use the
 data where trouble happens.

 Thanks for posting the bug.

 -Brad


 On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai yh...@databricks.com wrote:

 I tried jsonRDD(...).printSchema() and it worked. Seems the problem is that
 when we bring the data back to the Python side, SchemaRDD#javaToPython fails
 on your cases. I have created
 https://issues.apache.org/jira/browse/SPARK-2875 to track it.

 Thanks,

 Yin


 On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I checked out and built master.  Note that Maven had a problem building
 Kafka (in my case, at least); I was unable to fix this easily so I moved on
 since it seemed unlikely to have any influence on the problem at hand.

 Master improves functionality (including the example Nicholas just
 demonstrated) but unfortunately there still seems to be a bug related to
 using dictionaries as values.  I've put some code below to illustrate the
 bug.

  # dictionary as value works fine
   print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
  [Row(key0=Row(key1=u'value'))]
 
  # dictionary as value works fine, even when inner keys are varied
   print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}',
   '{"key0": {"key2": "value2"}}'])).collect()
  [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]
 
  # dictionary as value works fine when inner keys are missing and outer key is present
   print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
  [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]
 
  # dictionary as value FAILS when outer key is missing
   print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
  Py4JJavaError: An error occurred while calling o84.collect.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0
    failed 4 times, most recent failure: Lost task 14.3 in stage 7.0
    (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException...
 
  # dictionary as value FAILS when outer key is present with null value
   print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
  Py4JJavaError: An error occurred while calling o98.collect.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0
    failed 4 times, most recent failure: Lost task 14.3 in stage 9.0
    (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException...
 
  # nested lists work even when outer key is missing
   print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
  [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]

 Is anyone able to replicate this behavior?

 -Brad




 On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We try to keep master very stable, but this is where active development
 happens. YMMV, but a lot of people do run very close to master without
 incident (myself included).

 branch-1.0 has been cut for a while and we only merge bug fixes into it
 (this is more strict for non-alpha components like spark core.).  For Spark
 SQL, this branch is pretty far behind as the project is very young and we
 are fixing bugs / adding features very rapidly compared with Spark core.

  branch-1.1 was just cut and is being QAed for a release; at this point
  it's likely the same as master, but that will change as features start
 getting added to master in the coming weeks.



 On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:

 collect() works, too.

   sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}',
   '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
  [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]

 Can’t answer your question about branch stability, though. Spark is a
 very active project, so stuff is happening all the time.

 Nick



 On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller
 bmill...@eecs.berkeley.edu wrote:

 Hi Nick,

 Can you check that the call to collect() works as well as
 printSchema()?  I actually experience that printSchema() works fine, 
 but
 then it crashes on collect().

 In general, should I expect the master (which seems to be on
 branch-1.1) to be any more/less stable than branch-1.0?  While it would 
 be
 great to have this fixed, it would be good to know if I should expect 
 lots
 of other instability.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:

 This looks to be fixed in master:

  from pyspark.sql import SQLContext
  sqlContext = SQLContext(sc)
   sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
 

Re: PySpark, numpy arrays and binary data

2014-08-06 Thread Davies Liu
A numpy array can only hold basic types, so we cannot use it during collect()
by default.

Could you give a short example about how numpy array is used in your project?

On Wed, Aug 6, 2014 at 8:41 AM, Rok Roskar rokros...@gmail.com wrote:
 Hello,

 I'm interested in getting started with Spark to scale our scientific
 analysis package (http://pynbody.github.io) to larger data sets. The package
 is written in Python and makes heavy use of numpy/scipy and related
 frameworks. I've got a couple of questions that I have not been able to find
 easy answers to despite some research efforts... I hope someone here can
 clarify things for me a bit!

 * is there a preferred way to read binary data off a local disk directly
 into an RDD? Our I/O routines are built to read data in chunks and each
 chunk could be read by a different process/RDD, but it's not clear to me how
 to accomplish this with the existing API. Since the idea is to process data
 sets that don't fit into a single node's memory, reading first and then
 distributing via sc.parallelize is obviously not an option.

If you already know how to partition the data, then you could use
sc.parallelize() to distribute the descriptions of your data, then read the
data in parallel according to those descriptions.

For example, you can partition your data into (path, start, length) tuples, then:

partitions = [(path1, start1, length), (path1, start2, length), ...]

def read_chunk(partition):
    # runs on the executors; each task reads and processes one chunk
    path, start, length = partition
    with open(path, 'rb') as f:
        f.seek(start)
        data = f.read(length)
    # process the data and return/yield the resulting records
    return [data]

rdd = sc.parallelize(partitions, len(partitions)).flatMap(read_chunk)

 * related to the first question -- when an RDD is created by parallelizing a
 numpy array, the array gets serialized and distributed. I see in the source
 that it actually gets written into a file first (!?) -- but surely the Py4J
 bottleneck for python array types (mentioned in the source comment) doesn't
 really apply to numpy arrays? Is it really necessary to dump the data onto
 disk first? Conversely, the collect() seems really slow and I suspect that
 this is due to the combination of disk I/O and python list creation. Are
 there any ways of getting around this if numpy arrays are being used?


 I'd be curious about any other best-practices tips anyone might have for
 running pyspark with numpy data...!

 Thanks!


 Rok





Re: pyspark inferSchema

2014-08-05 Thread Davies Liu
On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 I was just about to ask about this.

 Currently, there are two methods, sqlContext.jsonFile() and
 sqlContext.jsonRDD(), that work on JSON text and infer a schema that covers
 the whole data set.

 For example:

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)

 a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
 '{foo:boom, baz:[1,2,3]}']))
 a.printSchema()
 root
  |-- baz: array (nullable = true)
  ||-- element: integer (containsNull = false)
  |-- foo: string (nullable = true)

 It works really well! It handles fields with inconsistent value types by
 inferring a value type that covers all the possible values.

 But say you’ve already deserialized the JSON to do some pre-processing or
 filtering. You’d commonly want to do this, say, to remove bad data. So now
 you have an RDD of Python dictionaries, as opposed to an RDD of JSON
 strings. It would be perfect if you could get the completeness of the
 json...() methods, but against dictionaries.

 Unfortunately, as you noted, inferSchema() only looks at the first element
 in the set. Furthermore, inferring schemata from RDDs of dictionaries is
 being deprecated in favor of doing so from RDDs of Rows.

 I’m not sure what the intention behind this move is, but as a user I’d like
 to be able to convert RDDs of dictionaries directly to SchemaRDDs with the
 completeness of the jsonRDD()/jsonFile() methods. Right now if I really want
 that, I have to serialize the dictionaries to JSON text and then call
 jsonRDD(), which is expensive.

Before the upcoming 1.1 release, we did not support nested structures via
inferSchema; a nested dictionary becomes MapType. This introduces an
inconsistency for dictionaries: the top level becomes a struct type (which can
be accessed by field name) while inner dictionaries become MapType (accessed
as maps).

Deprecating top-level dictionaries is an attempt to resolve this inconsistency.

The Row class in pyspark.sql has a similar interface to dict, so you can easily
convert your dict into a Row:

ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

In order to infer the correct schema, perhaps we need another argument to
specify the number of rows to sample, such as:

inferSchema(rdd, sample=None)

With sample=None it will take only the first row; otherwise it will sample that
many rows to figure out the complete schema.

Does this work for you? (A short sketch of the Row conversion follows below.)
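
A short, hedged sketch of that Row(**d) conversion, reusing the sqlCtx from this
thread and a made-up dict RDD (this assumes a build where pyspark.sql.Row is
available, i.e. master/1.1 at the time):

from pyspark.sql import Row

# hypothetical pre-processed data: an RDD of Python dictionaries
rdd_of_dict = sc.parallelize([
    {'foo': 'bar', 'baz': [1, 2, 3]},
    {'foo': 'boom', 'baz': []},
])

# convert each dict to a Row so inferSchema() sees a structured record
srdd = sqlCtx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))
srdd.printSchema()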

 Nick



 On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
 wrote:

 Hi All,

 I have a data set where each record is serialized using JSON, and I'm
 interested to use SchemaRDDs to work with the data.  Unfortunately I've hit
 a snag since some fields in the data are maps and lists, and are not
 guaranteed to be populated for each record.  This seems to cause inferSchema
 to throw an error:

 Produces error:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
 {'foo':'boom', 'baz':[1,2,3]}]))

 Works fine:
 srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
 {'foo':'boom', 'baz':[]}]))

 To be fair, inferSchema says it peeks at the first row, so a possible
 work-around would be to make sure the type of any collection can be
 determined from the first instance.  However, I don't believe that items in
 an RDD are guaranteed to remain in order, so this approach seems
 somewhat brittle.

 Does anybody know a robust solution to this problem in PySpark?  I am
 running the 1.0.1 release.

 -Brad






Re: java.lang.StackOverflowError

2014-08-05 Thread Davies Liu
Could you create a reproducible script (and data) to allow us to
investigate this?

Davies

On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Hi,
   I am doing some basic preprocessing in pyspark (local mode as follows):

 files = [ input files]
 def read(filename,sc):
   #process file
   return rdd

  if __name__ == "__main__":
    conf = SparkConf()
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir(root + "temp/")

   data = sc.parallelize([])

   for i,f in enumerate(files):

 data = data.union(read(f,sc))

union is a lazy transformation; you could union them all at once:

rdds = [read(f, sc) for f in files]
rdd = sc.union(rdds)

 if i ==20:
   data.checkpoint()
   data.count()
 if i == 500:break
   #print data.count()
   #rdd_1 = read(files[0],sc)
    data.saveAsTextFile(root + "output/")


 But I see this error:
   keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o9564.saveAsTextFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 serialization failed: java.lang.StackOverflowError
 java.io.Bits.putInt(Bits.java:93)
 java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)




Re: pyspark inferSchema

2014-08-05 Thread Davies Liu
This sample argument of inferSchema is still not in master; I will try to
add it if it makes sense.

On Tue, Aug 5, 2014 at 12:14 PM, Brad Miller bmill...@eecs.berkeley.edu wrote:
 Hi Davies,

 Thanks for the response and tips.  Is the sample argument to inferSchema
 available in the 1.0.1 release of pyspark?  I'm not sure (based on the
 documentation linked below) that it is.
 http://spark.apache.org/docs/latest/api/python/pyspark.sql.SQLContext-class.html#inferSchema

 It sounds like updating to master may help address my issue (and may also
 make the sample argument available), so I'm going to go ahead and do that.

 best,
 -Brad


 On Tue, Aug 5, 2014 at 12:01 PM, Davies Liu dav...@databricks.com wrote:

 On Tue, Aug 5, 2014 at 11:01 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  I was just about to ask about this.
 
  Currently, there are two methods, sqlContext.jsonFile() and
  sqlContext.jsonRDD(), that work on JSON text and infer a schema that
  covers
  the whole data set.
 
  For example:
 
  from pyspark.sql import SQLContext
  sqlContext = SQLContext(sc)
 
  a = sqlContext.jsonRDD(sc.parallelize(['{foo:bar, baz:[]}',
  '{foo:boom, baz:[1,2,3]}']))
  a.printSchema()
  root
   |-- baz: array (nullable = true)
   ||-- element: integer (containsNull = false)
   |-- foo: string (nullable = true)
 
  It works really well! It handles fields with inconsistent value types by
  inferring a value type that covers all the possible values.
 
  But say you’ve already deserialized the JSON to do some pre-processing
  or
  filtering. You’d commonly want to do this, say, to remove bad data. So
  now
  you have an RDD of Python dictionaries, as opposed to an RDD of JSON
  strings. It would be perfect if you could get the completeness of the
  json...() methods, but against dictionaries.
 
  Unfortunately, as you noted, inferSchema() only looks at the first
  element
  in the set. Furthermore, inferring schemata from RDDs of dictionaries is
  being deprecated in favor of doing so from RDDs of Rows.
 
  I’m not sure what the intention behind this move is, but as a user I’d
  like
  to be able to convert RDDs of dictionaries directly to SchemaRDDs with
  the
  completeness of the jsonRDD()/jsonFile() methods. Right now if I really
  want
  that, I have to serialize the dictionaries to JSON text and then call
  jsonRDD(), which is expensive.

 Before upcoming 1.1 release, we did not support nested structures via
 inferSchema,
 the nested dictionary will be MapType. This introduces inconsistance
 for dictionary that
 the top level will be structure type (can be accessed by name of
 field) but others will be
 MapType (can be accesses as map).

 So deprecated top level dictionary is try to solve this kind of
 inconsistance.

 The Row class in pyspark.sql has a similar interface to dict, so you
 can easily convert
 you dic into a Row:

 ctx.inferSchema(rdd_of_dict.map(lambda d: Row(**d)))

 In order to get the correct schema, so we need another argument to specify
 the number of rows to be infered? Such as:

 inferSchema(rdd, sample=None)

 with sample=None, it will take the first row, or it will do the
 sampling to figure out the
 complete schema.

 Does this work for you?

  Nick
 
 
 
  On Tue, Aug 5, 2014 at 1:31 PM, Brad Miller bmill...@eecs.berkeley.edu
  wrote:
 
  Hi All,
 
  I have a data set where each record is serialized using JSON, and I'm
  interested to use SchemaRDDs to work with the data.  Unfortunately I've
  hit
  a snag since some fields in the data are maps and list, and are not
  guaranteed to be populated for each record.  This seems to cause
  inferSchema
  to throw an error:
 
  Produces error:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[]},
  {'foo':'boom', 'baz':[1,2,3]}]))
 
  Works fine:
  srdd = sqlCtx.inferSchema(sc.parallelize([{'foo':'bar', 'baz':[1,2,3]},
  {'foo':'boom', 'baz':[]}]))
 
  To be fair inferSchema says it peeks at the first row, so a possible
  work-around would be to make sure the type of any collection can be
  determined using the first instance.  However, I don't believe that
  items in
  an RDD are guaranteed to remain in an ordered, so this approach seems
  somewhat brittle.
 
  Does anybody know a robust solution to this problem in PySpark?  I'm am
  running the 1.0.1 release.
 
  -Brad
 
 






Re: Last step of processing is using too much memory.

2014-07-30 Thread Davies Liu
When you do groupBy(), it wants to hold all the grouped values in memory for
best performance, so you should choose the number of partitions carefully.

In Spark master or the upcoming 1.1 release, PySpark can do an external
groupBy(), meaning it will spill data to disk when there is not enough memory
to hold it all. That will also help in this case. (A sketch of raising the
partition count follows below.)
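
For example, a hedged sketch of both knobs; rdd stands for the (key, value) RDD
in question and 512 is an arbitrary number, to be sized to the data and memory:

from operator import add

# more partitions means fewer grouped values held by any one Python worker
grouped = rdd.groupByKey(numPartitions=512)

# if only an aggregate of the values is needed, reduceByKey avoids
# materializing whole groups in the Python worker at all
totals = rdd.reduceByKey(add, numPartitions=512)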

On Fri, Jul 18, 2014 at 1:56 AM, Roch Denis rde...@exostatic.com wrote:
 Well, for what it's worth, I found the issue after spending the whole night
 running experiments;).

 Basically, I needed to give a higher number of partitions to the groupByKey.
 I was simply using the default, which generated only 4 partitions and so the
 whole thing blew up.




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Last-step-of-processing-is-using-too-much-memory-tp10134p10147.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How do you debug a PythonException?

2014-07-30 Thread Davies Liu
The exception in Python means that the worker tried to read a command from the
JVM but reached the end of the socket (the socket had been closed). So it's
possible that another exception happened in the JVM.

Could you change the log4j log level, then check whether there is any problem
inside the JVM?

Davies

On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Any clues? This looks like a bug, but I can't report it without more precise
 information.


 On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 I’m in the PySpark shell and I’m trying to do this:

 a =
 sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json',
 minPartitions=sc.defaultParallelism * 3).cache()
 a.map(lambda x: len(x)).max()

 My job dies with the following:

 14/07/30 01:46:28 WARN TaskSetManager: Loss was due to
 org.apache.spark.api.python.PythonException
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File /root/spark/python/pyspark/worker.py, line 73, in main
 command = pickleSer._read_with_length(infile)
   File /root/spark/python/pyspark/serializers.py, line 142, in
 _read_with_length
 length = read_int(stream)
   File /root/spark/python/pyspark/serializers.py, line 337, in read_int
 raise EOFError
 EOFError

 at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19 on
 ip-10-190-171-217.ec2.internal: remote Akka client disassociated

 How do I debug this? I’m using 1.0.2-rc1 deployed to EC2.

 Nick


 
 View this message in context: How do you debug a PythonException?
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: zip two RDD in pyspark

2014-07-29 Thread Davies Liu
On Mon, Jul 28, 2014 at 12:58 PM, l lishu...@gmail.com wrote:
 I have a file in s3 that I want to map each line with an index. Here is my
 code:

 input_data = sc.textFile('s3n:/myinput', minPartitions=6).cache()
 N = input_data.count()
 index = sc.parallelize(range(N), 6)
 index.zip(input_data).collect()

I think you cannot do zipWithIndex() this way, because the number of lines in
each partition of input_data will be different from index. You need to get the
exact number of lines for each partition first, then generate the correct
indices. That is easy to do with mapPartitions():

 nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
 starts = [sum(nums[:i]) for i in range(len(nums))]
 zipped = input_data.mapPartitionsWithIndex(lambda i,it: ((starts[i]+j, x) 
 for j,x in enumerate(it)))
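
A hedged usage note: with the workaround above, zipped is an RDD of
(index, line) pairs, e.g. zipped.take(3). On PySpark releases newer than the
ones discussed here, RDD.zipWithIndex() gives the same result directly (that
method is an assumption about later versions, not something from this thread):

# workaround above: (index, line) pairs
print(zipped.take(3))

# newer PySpark (assumed available): zipWithIndex yields (line, index), so swap the pair
indexed = input_data.zipWithIndex().map(lambda x: (x[1], x[0]))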


 ...
  14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
  14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1) finished in 0.031 s
  14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1, took 0.03707 s
  Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
   File /root/spark/python/pyspark/rdd.py, line 584, in collect
 return list(self._collect_iterator_through_file(bytesInJava))
   File /root/spark/python/pyspark/rdd.py, line 592, in
 _collect_iterator_through_file
 self.ctx._writeToFile(iterator, tempFile.name)
   File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__
   File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line
 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.writeToFile.
 : java.lang.ClassCastException: java.lang.String cannot be cast to [B
 at
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
 at
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
 at 
 org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
 at 
 org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
 at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
 at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at 
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:744)

 As I see it, the job is completed, but I don't understand what's happening
 to 'String cannot be cast to [B'. I tried to zip two parallelCollectionRDD
 and it works fine. But here I have a MappedRDD at textFile. Not sure what's
 going on here.

Could you provide a script and dataset to reproduce this error? Maybe
there are some corner cases during serialization.


 Also, why Python does not have ZipWithIndex()?

PySpark has fewer features than the Scala API; hopefully it will catch up
in the next two releases.


 Thanks for any help.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

