"java.lang.IllegalStateException: There is no space for new record" in GraphFrames

2017-04-28 Thread rok
When running the connectedComponents algorithm in GraphFrames on a
sufficiently large dataset, I get the following error, which I have not
encountered before:

17/04/20 20:35:26 WARN TaskSetManager: Lost task 3.0 in stage 101.0 (TID
53644, 172.19.1.206, executor 40): java.lang.IllegalStateException: There is
no space for new record
at
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:225)
at
org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:130)
at
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:244)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Any thoughts on how to avoid this? 






Re: PySpark Serialization/Deserialization (Pickling) Overhead

2017-03-08 Thread rok
My guess is that the UI serialization times show the Java side only. To get
a feeling for the python pickling/unpickling, use the show_profiles()
method of the SparkContext instance: http://spark.apache.
org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.show_profiles

That will show you how much of the execution time is used up by
cPickle.load() and .dump() methods.
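
For example, a minimal sketch (this assumes profiling is switched on via the
spark.python.profile setting when the context is created):

from pyspark import SparkConf, SparkContext

# enable the Python-side profiler (off by default)
conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1000000))
rdd.map(lambda x: x * 2).count()   # run something so there is a profile to show

# prints cProfile output per RDD, including the time spent pickling/unpickling
sc.show_profiles()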

Hope that helps,

Rok

On Wed, Mar 8, 2017 at 3:18 AM, Yeoul Na [via Apache Spark User List] <
ml-node+s1001560n28468...@n3.nabble.com> wrote:

>
> Hi all,
>
> I am trying to analyze PySpark performance overhead. People just say
> PySpark
> is slower than Scala due to the Serialization/Deserialization overhead. I
> tried with the example in this post:
> https://0x0fff.com/spark-dataframes-are-faster-arent-they/. This and many
> articles say straight-forward Python implementation is the slowest due to
> the serialization/deserialization overhead.
>
> However, when I actually looked at the log in the Web UI, serialization
> and deserialization time of PySpark do not seem to be any bigger than that
> of Scala. The main contributor was "Executor Computing Time". Thus, we
> cannot be sure whether this is due to serialization or because Python code is
> basically slower than Scala code.
>
> So my question is: does "Task Deserialization Time" in the Spark WebUI
> actually include serialization/deserialization times in PySpark? If this is
> not the case, how can I actually measure the serialization/deserialization
> overhead?
>
> Thanks,
> Yeoul
>





Re: Is stddev not a supported aggregation function in SparkSQL WindowSpec?

2016-02-18 Thread rok
There has been a stddev function since 1.6:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.stddev

If you are using Spark < 1.6, you can write your own more or less easily.
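
For example, something along these lines (a rough sketch, assuming a DataFrame
df with columns "k" and "x", and a Spark version that accepts stddev as a
window aggregate; the second variant builds it from supported aggregates for
older releases):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("k")

# built-in sample standard deviation (pyspark.sql.functions.stddev, 1.6+)
df = df.withColumn("x_std", F.stddev("x").over(w))

# hand-rolled (population) standard deviation from avg over the same window
mean_x = F.avg(F.col("x")).over(w)
mean_x2 = F.avg(F.col("x") * F.col("x")).over(w)
df = df.withColumn("x_std_manual", F.sqrt(mean_x2 - mean_x * mean_x))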

On Wed, Feb 17, 2016 at 5:06 PM, mayx [via Apache Spark User List] <
ml-node+s1001560n26250...@n3.nabble.com> wrote:

> I'd like to use standard deviation over window partitions on the Spark
> dataframe, but it didn't work. Is it not supported? Looks like it supports
> many aggregation functions, such as mean, min, etc. How can I make a
> feature request for this?
>





spark metrics in graphite missing for some executors

2015-12-11 Thread rok
I'm using graphite/grafana to collect and visualize metrics from my spark
jobs.

It appears that not all executors report all the metrics -- for example,
even JVM heap data is missing from some. Is there an obvious reason why
this happens? Are metrics somehow held back? Often, an executor's metrics
will show up with a delay, but since they are aggregate metrics (e.g. the
number of completed tasks), it is clear that they are being collected from
the beginning (the level, once it appears, matches the other executors) but
for some reason just don't show up initially.

Any experience with this? How can it be fixed? Right now it's rendering
many metrics useless since I want to have a complete view into the
application and I'm only seeing a few executors at a time.
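
For reference, the Graphite sink is configured in conf/metrics.properties
roughly as below (host, port and period here are placeholders):

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds

# JVM source enabled so that heap metrics are reported from each component
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource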

Thanks,

rok





Re: very slow parquet file write

2015-11-13 Thread Rok Roskar
I'm not sure what you mean -- I didn't do anything specifically to partition
the columns.
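
(For context, I take "partitioned columns" to mean something like the
following, which I am not doing -- a sketch, with the column name and path
made up:)

# writing with a partition column produces one directory per value,
# e.g. .../out.parquet/date=2015-11-01/part-...
df.write.partitionBy("date").parquet("hdfs:///some/path/out.parquet")
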
On Nov 14, 2015 00:38, "Davies Liu" <dav...@databricks.com> wrote:

> Do you have partitioned columns?
>
> On Thu, Nov 5, 2015 at 2:08 AM, Rok Roskar <rokros...@gmail.com> wrote:
> > I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions
> into a
> > parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
> > the size of file this is way over-provisioned (I've tried it with fewer
> > partitions and fewer nodes, no obvious effect). I was expecting the dump
> to
> > disk to be very fast -- the DataFrame is cached in memory and contains
> just
> > 14 columns (13 are floats and one is a string). When I write it out in
> json
> > format, this is indeed reasonably fast (though it still takes a few
> minutes,
> > which is longer than I would expect).
> >
> > However, when I try to write a parquet file it takes way longer -- the
> first
> > set of tasks finishes in a few minutes, but the subsequent tasks take
> more
> > than twice as long or longer. In the end it takes over half an hour to
> write
> > the file. I've looked at the disk I/O and cpu usage on the compute nodes
> and
> > it looks like the processors are fully loaded while the disk I/O is
> > essentially zero for long periods of time. I don't see any obvious
> garbage
> > collection issues and there are no problems with memory.
> >
> > Any ideas on how to debug/fix this?
> >
> > Thanks!
> >
> >
>


Re: very slow parquet file write

2015-11-06 Thread Rok Roskar
Yes, I was expecting that too because of all the metadata generation and
compression. But I have not seen performance this bad for other parquet
files I've written and was wondering if there could be something obvious
(and wrong) to do with how I've specified the schema etc. It's a very
simple schema consisting of a StructType with a few StructField floats and
a string. I'm using all the Spark defaults for IO compression.
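
Roughly like this, for reference (a sketch -- the field names are made up,
and sqlContext / rdd stand in for the real ones in the job):

from pyspark.sql.types import StructType, StructField, FloatType, StringType

# 13 float columns and one string column, matching the DataFrame described above
schema = StructType(
    [StructField("f%d" % i, FloatType(), True) for i in range(13)]
    + [StructField("name", StringType(), True)]
)

df = sqlContext.createDataFrame(rdd, schema).cache()
df.write.parquet("hdfs:///some/path/out.parquet")  # compression left at the Spark defaults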

I'll see what I can do about running a profiler -- can you point me to a
resource/example?

Thanks,

Rok

ps: my post on the mailing list is still listed as not accepted by the
mailing list:
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-td25295.html
-- none of your responses are there either. I am definitely subscribed to
the list though (I get daily digests). Any clue how to fix it?




On Nov 6, 2015, at 9:26 AM, Cheng Lian <lian.cs@gmail.com> wrote:

I'd expect writing Parquet files slower than writing JSON files since
Parquet involves more complicated encoders, but maybe not that slow. Would
you mind to try to profile one Spark executor using tools like YJP to see
what's the hotspot?

Cheng

On 11/6/15 7:34 AM, rok wrote:

Apologies if this appears a second time!

I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few minutes,
which is longer than I would expect).

However, when I try to write a parquet file it takes way longer -- the first
set of tasks finishes in a few minutes, but the subsequent tasks take more
than twice as long or longer. In the end it takes over half an hour to write
the file. I've looked at the disk I/O and cpu usage on the compute nodes and
it looks like the processors are fully loaded while the disk I/O is
essentially zero for long periods of time. I don't see any obvious garbage
collection issues and there are no problems with memory.

Any ideas on how to debug/fix this?

Thanks!





very slow parquet file write

2015-11-05 Thread Rok Roskar
I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into
a parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few
minutes, which is longer than I would expect).

However, when I try to write a parquet file it takes way longer -- the
first set of tasks finishes in a few minutes, but the subsequent tasks take
more than twice as long or longer. In the end it takes over half an hour to
write the file. I've looked at the disk I/O and cpu usage on the compute
nodes and it looks like the processors are fully loaded while the disk I/O
is essentially zero for long periods of time. I don't see any obvious
garbage collection issues and there are no problems with memory.

Any ideas on how to debug/fix this?

Thanks!


very slow parquet file write

2015-11-05 Thread rok
Apologies if this appears a second time! 

I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few minutes,
which is longer than I would expect). 

However, when I try to write a parquet file it takes way longer -- the first
set of tasks finishes in a few minutes, but the subsequent tasks take more
than twice as long or longer. In the end it takes over half an hour to write
the file. I've looked at the disk I/O and cpu usage on the compute nodes and
it looks like the processors are fully loaded while the disk I/O is
essentially zero for long periods of time. I don't see any obvious garbage
collection issues and there are no problems with memory. 

Any ideas on how to debug/fix this? 

Thanks!






Re: help plz! how to use zipWithIndex to each subset of a RDD

2015-07-30 Thread rok
zipWithIndex gives you global indices, which is not what you want. After the
groupByKey, you'll want to use flatMap with a function that iterates through
each Iterable and returns a (String, Int, String) tuple for each element.
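
In pyspark that would look roughly like this (a sketch; 1-based indices to
match your example):

grouped = rdd.groupByKey()   # RDD[(key, iterable of values)]

indexed = grouped.flatMap(
    lambda kv: [(kv[0], i + 1, v) for i, v in enumerate(kv[1])]
)
# ((key-1, a), (key-1, b), ...)  ->  ((key-1, 1, a), (key-1, 2, b), ...)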

On Thu, Jul 30, 2015 at 4:13 AM, askformore [via Apache Spark User List] 
ml-node+s1001560n24071...@n3.nabble.com wrote:

 I have some data like this: RDD[(String, String)] = ((key-1, a), (key-1, b),
 (key-2, a), (key-2, c), (key-3, b), (key-4, d)) and I want to group the data
 by key and, for each group, add an index field to each group member, so that
 I can transform the data into: RDD[(String, Int, String)] = ((key-1, 1, a),
 (key-1, 2, b), (key-2, 1, a), (key-2, 2, b), (key-3, 1, b), (key-4, 1, d)).
 I tried groupByKey first, which gave me an RDD[(String, Iterable[String])],
 but I don't know how to apply the zipWithIndex function to each Iterable... thanks.






Re: problems running Spark on a firewalled remote YARN cluster via SOCKS proxy

2015-07-24 Thread Rok Roskar
Hi Akhil,

the namenode is definitely configured correctly, otherwise the job would
not start at all. It registers with YARN and starts up, but once the nodes
try to communicate with each other it fails. Note that a Hadoop MR job using
the identical configuration executes without any problems. The driver also
connects just fine -- here is the log:

15/07/24 08:10:58 INFO yarn.ApplicationMaster: Registered signal
handlers for [TERM, HUP, INT]
15/07/24 08:10:59 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
15/07/24 08:10:59 INFO yarn.ApplicationMaster: ApplicationAttemptId:
appattempt_1437724871597_0001_01
15/07/24 08:11:00 INFO spark.SecurityManager: Changing view acls to: root,rok
15/07/24 08:11:00 INFO spark.SecurityManager: Changing modify acls to: root,rok
15/07/24 08:11:00 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(root, rok); users with modify permissions: Set(root,
rok)
15/07/24 08:11:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/24 08:11:01 INFO Remoting: Starting remoting
15/07/24 08:11:01 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkYarnAM@10.211.55.104:51896]
15/07/24 08:11:01 INFO util.Utils: Successfully started service
'sparkYarnAM' on port 51896.
15/07/24 08:11:01 INFO yarn.ApplicationMaster: Waiting for Spark
driver to be reachable.
15/07/24 08:11:01 INFO yarn.ApplicationMaster: Driver now available:
driver IP:58734
15/07/24 08:11:01 INFO yarn.ApplicationMaster$AMEndpoint: Add WebUI
Filter. 
AddWebUIFilter(org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,Map(PROXY_HOSTS
-> node1, PROXY_URI_BASES ->
http://node1:8089/proxy/application_1437724871597_0001),/proxy/application_1437724871597_0001)
15/07/24 08:11:01 INFO client.RMProxy: Connecting to ResourceManager
at node1/10.211.55.101:8030
15/07/24 08:11:01 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/07/24 08:11:02 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 0 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:03 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 1 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:04 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 2 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:05 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 3 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:06 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 4 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:07 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 5 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:08 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 6 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:09 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 7 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:10 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 8 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:11 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 9 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:11 ERROR yarn.ApplicationMaster: Uncaught exception:
java.io.IOException: Failed on local exception:
java.net.SocketException: Connection refused; Host Details : local
host is: node4/10.211.55.104; destination host is: node1:8030;




On Thu, Jul 23, 2015 at 7:00 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 It looks like it's picking up the wrong namenode URI from the
 HADOOP_CONF_DIR, make sure it is proper. Also, for submitting a spark job to
 a remote cluster, you might want to look at spark.driver.host and
 spark.driver.port

 Thanks
 Best Regards

 On Wed, Jul 22, 2015 at 8:56 PM, rok rokros...@gmail.com wrote:

 I am trying to run Spark applications with the driver running locally and
 interacting with a firewalled remote cluster via a SOCKS proxy.

 I have to modify the hadoop configuration on the *local machine* to try to
 make this work

problems running Spark on a firewalled remote YARN cluster via SOCKS proxy

2015-07-22 Thread rok
I am trying to run Spark applications with the driver running locally and
interacting with a firewalled remote cluster via a SOCKS proxy. 

I have to modify the hadoop configuration on the *local machine* to try to
make this work, adding 

<property>
   <name>hadoop.rpc.socket.factory.class.default</name>
   <value>org.apache.hadoop.net.SocksSocketFactory</value>
</property>
<property>
   <name>hadoop.socks.server</name>
   <value>localhost:9998</value>
</property>

and on the *remote cluster* side

<property>
<name>hadoop.rpc.socket.factory.class.default</name>
<value>org.apache.hadoop.net.StandardSocketFactory</value>
<final>true</final>
</property>

With this setup, and running ssh -D 9998 gateway.host to start the proxy
connection, MapReduce jobs started on the local machine execute fine on the
remote cluster. However, trying to launch a Spark job fails with the nodes
of the cluster apparently unable to communicate with one another: 

java.io.IOException: Failed on local exception: java.net.SocketException:
Connection refused; Host Details : local host is: node3/10.211.55.103;
destination host is: node1:8030;

Looking at the packets being sent to node1 from node3, it's clear that no
requests are made on port 8030, hinting that the connection is somehow being
proxied. 

Is it possible that the Spark job is not honoring the socket.factory
settings on the *cluster* side for some reason? 

Note that SPARK-5004 (https://issues.apache.org/jira/browse/SPARK-5004)
addresses a similar problem, though it looks like they are actually not the
same (since in that case it sounds like a standalone cluster is being used).






Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Rok Roskar
$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:38)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

On Tue, May 19, 2015 at 3:38 AM, Imran Rashid iras...@cloudera.com wrote:

 Hi,

 can you take a look at the logs and see what the first error you are
 getting is?  Its possible that the file doesn't exist when that error is
 produced, but it shows up later -- I've seen similar things happen, but
 only after there have already been some errors.  But, if you see that in
 the very first error, then Im not sure what the cause is.  Would be
 helpful for you to send the logs.

 Imran

 On Fri, May 15, 2015 at 10:07 AM, rok rokros...@gmail.com wrote:

 I am trying to sort a collection of key,value pairs (between several
 hundred
 million to a few billion) and have recently been getting lots of
 FetchFailedException errors that seem to originate when one of the
 executors doesn't seem to find a temporary shuffle file on disk. E.g.:

 org.apache.spark.shuffle.FetchFailedException:

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
 (No such file or directory)

 This file actually exists:

  ls -l
 
 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 This error repeats on several executors and is followed by a number of

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0

 This results on most tasks being lost and executors dying.

 There is plenty of space on all of the appropriate filesystems, so none of
 the executors are running out of disk space. Any idea what might be
 causing
 this? I am running this via YARN on approximately 100 nodes with 2 cores
 per
 node. Any thoughts on what might be causing these errors? Thanks!








FetchFailedException and MetadataFetchFailedException

2015-05-15 Thread rok
I am trying to sort a collection of key,value pairs (between several hundred
million and a few billion) and have recently been getting lots of
FetchFailedException errors that seem to originate when one of the
executors can't find a temporary shuffle file on disk. E.g.:

org.apache.spark.shuffle.FetchFailedException:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
(No such file or directory)

This file actually exists: 

 ls -l
 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

-rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

This error repeats on several executors and is followed by a number of 

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0

This results in most tasks being lost and executors dying.

There is plenty of space on all of the appropriate filesystems, so none of
the executors are running out of disk space. I am running this via YARN on
approximately 100 nodes with 2 cores per node. Any thoughts on what might be
causing these errors? Thanks!






Re: StandardScaler failing with OOM errors in PySpark

2015-04-28 Thread Rok Roskar
That's exactly what I'm saying -- I specify the memory options using Spark
options, but this is not reflected in how the JVM is created. No matter
which memory settings I specify, the JVM for the driver is always launched
with 512 MB of memory. So I'm not sure if this is a feature or a bug?
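
(For context, the submit-time route suggested below would look roughly like
this -- a sketch, with the script name as a placeholder:)

spark-submit --master yarn --deploy-mode client \
  --driver-memory 5g \
  --conf spark.yarn.am.memory=5g \
  --conf spark.yarn.am.memoryOverhead=2000 \
  my_script.py

i.e. setting the driver heap at launch time rather than from inside the
application.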

rok

On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng men...@gmail.com wrote:

 You might need to specify driver memory in spark-submit instead of
 passing JVM options. spark-submit is designed to handle different
 deployments correctly. -Xiangrui

 On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote:
  ok yes, I think I have narrowed it down to being a problem with driver
  memory settings. It looks like the application master/driver is not being
  launched with the settings specified:
 
  For the driver process on the main node I see -XX:MaxPermSize=128m
 -Xms512m
  -Xmx512m as options used to start the JVM, even though I specified
 
  'spark.yarn.am.memory', '5g'
  'spark.yarn.am.memoryOverhead', '2000'
 
  The info shows that these options were read:
 
  15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with
 7120 MB
  memory including 2000 MB overhead
 
  Is there some reason why these options are being ignored and instead
  starting the driver with just 512Mb of heap?
 
  On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:
 
  the feature dimension is 800k.
 
  yes, I believe the driver memory is likely the problem since it doesn't
  crash until the very last part of the tree aggregation.
 
  I'm running it via pyspark through YARN -- I have to run in client mode
 so
  I can't set spark.driver.memory -- I've tried setting the
  spark.yarn.am.memory and overhead parameters but it doesn't seem to
 have an
  effect.
 
  Thanks,
 
  Rok
 
  On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:
 
   What is the feature dimension? Did you set the driver memory?
 -Xiangrui
  
   On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
   I'm trying to use the StandardScaler in pyspark on a relatively small
   (a few
   hundred Mb) dataset of sparse vectors with 800k features. The fit
   method of
   StandardScaler crashes with Java heap space or Direct buffer memory
   errors.
   There should be plenty of memory around -- 10 executors with 2 cores
   each
   and 8 Gb per core. I'm giving the executors 9g of memory and have
 also
   tried
   lots of overhead (3g), thinking it might be the array creation in the
   aggregators that's causing issues.
  
   The bizarre thing is that this isn't always reproducible -- sometimes
   it
   actually works without problems. Should I be setting up executors
   differently?
  
   Thanks,
  
   Rok
  
  
  
  
  
 
 



Re: StandardScaler failing with OOM errors in PySpark

2015-04-23 Thread Rok Roskar
the feature dimension is 800k.

yes, I believe the driver memory is likely the problem since it doesn't crash 
until the very last part of the tree aggregation. 

I'm running it via pyspark through YARN -- I have to run in client mode so I 
can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory 
and overhead parameters but it doesn't seem to have an effect. 

Thanks,

Rok

On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:

 What is the feature dimension? Did you set the driver memory? -Xiangrui
 
 On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
 I'm trying to use the StandardScaler in pyspark on a relatively small (a few
 hundred Mb) dataset of sparse vectors with 800k features. The fit method of
 StandardScaler crashes with Java heap space or Direct buffer memory errors.
 There should be plenty of memory around -- 10 executors with 2 cores each
 and 8 Gb per core. I'm giving the executors 9g of memory and have also tried
 lots of overhead (3g), thinking it might be the array creation in the
 aggregators that's causing issues.
 
 The bizarre thing is that this isn't always reproducible -- sometimes it
 actually works without problems. Should I be setting up executors
 differently?
 
 Thanks,
 
 Rok
 
 
 
 
 





Re: StandardScaler failing with OOM errors in PySpark

2015-04-23 Thread Rok Roskar
ok yes, I think I have narrowed it down to being a problem with driver
memory settings. It looks like the application master/driver is not being
launched with the settings specified:

For the driver process on the main node I see -XX:MaxPermSize=128m
-Xms512m -Xmx512m as options used to start the JVM, even though I
specified

'spark.yarn.am.memory', '5g'
'spark.yarn.am.memoryOverhead', '2000'

The info shows that these options were read:

15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120
MB memory including 2000 MB overhead

Is there some reason why these options are being ignored and instead
starting the driver with just 512Mb of heap?

On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:

 the feature dimension is 800k.

 yes, I believe the driver memory is likely the problem since it doesn't
 crash until the very last part of the tree aggregation.

 I'm running it via pyspark through YARN -- I have to run in client mode so
 I can't set spark.driver.memory -- I've tried setting the
 spark.yarn.am.memory and overhead parameters but it doesn't seem to have an
 effect.

 Thanks,

 Rok

 On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:

  What is the feature dimension? Did you set the driver memory? -Xiangrui
 
  On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
  I'm trying to use the StandardScaler in pyspark on a relatively small
 (a few
  hundred Mb) dataset of sparse vectors with 800k features. The fit
 method of
  StandardScaler crashes with Java heap space or Direct buffer memory
 errors.
  There should be plenty of memory around -- 10 executors with 2 cores
 each
  and 8 Gb per core. I'm giving the executors 9g of memory and have also
 tried
  lots of overhead (3g), thinking it might be the array creation in the
  aggregators that's causing issues.
 
  The bizarre thing is that this isn't always reproducible -- sometimes it
  actually works without problems. Should I be setting up executors
  differently?
 
  Thanks,
 
  Rok
 
 
 
 
 




Re: java.util.NoSuchElementException: key not found:

2015-03-02 Thread Rok Roskar
aha ok, thanks.

If I create different RDDs from a parent RDD and force evaluation
thread-by-thread, then it should presumably be fine, correct? Or do I need
to checkpoint the child RDDs as a precaution in case it needs to be removed
from memory and recomputed?

On Sat, Feb 28, 2015 at 4:28 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 RDD is not thread-safe. You should not use it in multiple threads.

 Best Regards,
 Shixiong Zhu

 2015-02-27 23:14 GMT+08:00 rok rokros...@gmail.com:

 I'm seeing this java.util.NoSuchElementException: key not found: exception
 pop up sometimes when I run operations on an RDD from multiple threads in
 a
 python application. It ends up shutting down the SparkContext so I'm
 assuming this is a bug -- from what I understand, I should be able to run
 operations on the same RDD from multiple threads or is this not
 recommended?

 I can't reproduce it all the time and I've tried eliminating caching
 wherever possible to see if that would have an effect, but it doesn't seem
 to. Each thread first splits the base RDD and then runs the
 LogisticRegressionWithSGD on the subset.

 Is there a workaround to this exception?








java.util.NoSuchElementException: key not found:

2015-02-27 Thread rok
I'm seeing this java.util.NoSuchElementException: key not found: exception
pop up sometimes when I run operations on an RDD from multiple threads in a
python application. It ends up shutting down the SparkContext so I'm
assuming this is a bug -- from what I understand, I should be able to run
operations on the same RDD from multiple threads or is this not recommended? 

I can't reproduce it all the time and I've tried eliminating caching
wherever possible to see if that would have an effect, but it doesn't seem
to. Each thread first splits the base RDD and then runs the
LogisticRegressionWithSGD on the subset.  

Is there a workaround to this exception? 






cannot connect to Spark Application Master in YARN

2015-02-18 Thread rok
)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
at 
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
at org.mortbay.jetty.Server.handle(Server.java:326)
at 
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
at
org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
at
org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)


On the other hand, trying to access the Spark UI via the URL reported by
Spark when the application starts, it gets redirected to the cluster node
but is unable to connect. Are the applications somehow binding to the wrong
ports? Is this a spark setting I need to configure or something within YARN? 

Thanks!

Rok







Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-12 Thread Rok Roskar
]:
[0.39210713836346933,
 0.8636333432012482,
 0.28744831569153617,
 0.663815926356163,
 0.38274814840717364,
 0.6606453820150496,
 0.8610156719813942,
 0.6971353266345091,
 0.9896836700210551,
 0.05789392881996358]

Is there a size limit for objects serialized with Kryo? Or an option that
controls it? The Java serializer works fine.

On Wed, Feb 11, 2015 at 8:04 PM, Rok Roskar rokros...@gmail.com wrote:

 I think the problem was related to the broadcasts being too large -- I've
 now split it up into many smaller operations but it's still not quite there
 -- see
 http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-td21606.html

 Thanks,

 Rok

 On Wed, Feb 11, 2015, 19:59 Davies Liu dav...@databricks.com wrote:

 Could you share a short script to reproduce this problem?

 On Tue, Feb 10, 2015 at 8:55 PM, Rok Roskar rokros...@gmail.com wrote:
  I didn't notice other errors -- I also thought such a large broadcast
 is a
  bad idea but I tried something similar with a much smaller dictionary
 and
  encountered the same problem. I'm not familiar enough with spark
 internals
  to know whether the trace indicates an issue with the broadcast
 variables or
  perhaps something different?
 
  The driver and executors have 50gb of ram so memory should be fine.
 
  Thanks,
 
  Rok
 
  On Feb 11, 2015 12:19 AM, Davies Liu dav...@databricks.com wrote:
 
  It's brave to broadcast 8G pickled data, it will take more than 15G in
  memory for each Python worker,
  how much memory do you have in executor and driver?
  Do you see any other exceptions in driver and executors? Something
  related to serialization in JVM.
 
  On Tue, Feb 10, 2015 at 2:16 PM, Rok Roskar rokros...@gmail.com
 wrote:
   I get this in the driver log:
 
  I think this should happen on executor, or you called first() or
  take() on the RDD?
 
   java.lang.NullPointerException
   at
   org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
   at scala.collection.Iterator$class.foreach(Iterator.scala:
 727)
   at
   scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at
   scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   at scala.collection.AbstractIterable.foreach(
 Iterable.scala:54)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1.apply$mcV$sp(PythonRDD.scala:229)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1.apply(PythonRDD.scala:204)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1.apply(PythonRDD.scala:204)
   at
   org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread.run(Pytho
 nRDD.scala:203)
  
   and on one of the executor's stderr:
  
   15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly
   (crashed)
   org.apache.spark.api.python.PythonException: Traceback (most recent
 call
   last):
 File
   /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
 ark/worker.py,
   line 57, in main
   split_index = read_int(infile)
 File
   /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
 ark/serializers.py,
   line 511, in read_int
   raise EOFError
   EOFError
  
   at
   org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD
 .scala:137)
   at
   org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonR
 DD.scala:174)
   at
   org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
   at
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at
   org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1.apply$mcV$sp(PythonRDD.scala:242)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1.apply(PythonRDD.scala:204)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1.apply(PythonRDD.scala:204)
   at
   org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread.run(Pytho
 nRDD.scala:203)
   Caused by: java.lang.NullPointerException
   at
   org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
   at
   org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
 run$1$$anonfun$apply$mcV$sp$3.apply

Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
Aha great! Thanks for the clarification!
On Feb 11, 2015 8:11 PM, Davies Liu dav...@databricks.com wrote:

 On Wed, Feb 11, 2015 at 10:47 AM, rok rokros...@gmail.com wrote:
  I was having trouble with memory exceptions when broadcasting a large
 lookup
  table, so I've resorted to processing it iteratively -- but how can I
 modify
  an RDD iteratively?
 
  I'm trying something like :
 
  rdd = sc.parallelize(...)
  lookup_tables = {...}
 
  for lookup_table in lookup_tables :
  rdd = rdd.map(lambda x: func(x, lookup_table))
 
  If I leave it as is, then only the last lookup_table is applied instead of
  stringing together all the maps. However, if I add a .cache() to the .map
  then it seems to work fine.

 This is the something related to Python closure implementation, you should
 do it like this:

 def create_func(lookup_table):
  return lambda x: func(x, lookup_table)

 for lookup_table in lookup_tables:
 rdd = rdd.map(create_func(lookup_table))

 The Python closure just remember the variable, not copy the value of it.
 In the loop, `lookup_table` is the same variable. When we serialize the
 final
 rdd, all the closures are referring to the same `lookup_table`, which
 points
 to the last value.

 When we create the closure in a function, Python create a variable for
 each closure, so it works.

  A second problem is that the runtime for each iteration roughly doubles
 at
  each iteration so this clearly doesn't seem to be the way to do it. What
 is
  the preferred way of doing such repeated modifications to an RDD and how
 can
  the accumulation of overhead be minimized?
 
  Thanks!
 
  Rok
 
 
 
 



Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
the runtime for each consecutive iteration is still roughly twice as long as 
for the previous one -- is there a way to reduce whatever overhead is 
accumulating? 

On Feb 11, 2015, at 8:11 PM, Davies Liu dav...@databricks.com wrote:

 On Wed, Feb 11, 2015 at 10:47 AM, rok rokros...@gmail.com wrote:
 I was having trouble with memory exceptions when broadcasting a large lookup
 table, so I've resorted to processing it iteratively -- but how can I modify
 an RDD iteratively?
 
 I'm trying something like :
 
 rdd = sc.parallelize(...)
 lookup_tables = {...}
 
 for lookup_table in lookup_tables :
rdd = rdd.map(lambda x: func(x, lookup_table))
 
 If I leave it as is, then only the last lookup_table is applied instead of
 stringing together all the maps. However, if add a .cache() to the .map then
 it seems to work fine.
 
 This is the something related to Python closure implementation, you should
 do it like this:
 
 def create_func(lookup_table):
 return lambda x: func(x, lookup_table)
 
 for lookup_table in lookup_tables:
rdd = rdd.map(create_func(lookup_table))
 
 The Python closure just remember the variable, not copy the value of it.
 In the loop, `lookup_table` is the same variable. When we serialize the final
 rdd, all the closures are referring to the same `lookup_table`, which points
 to the last value.
 
 When we create the closure in a function, Python create a variable for
 each closure, so it works.
 
 A second problem is that the runtime for each iteration roughly doubles at
 each iteration so this clearly doesn't seem to be the way to do it. What is
 the preferred way of doing such repeated modifications to an RDD and how can
 the accumulation of overhead be minimized?
 
 Thanks!
 
 Rok
 
 
 
 





Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
Yes, sorry I wasn't clear -- I still have to trigger the calculation of the RDD
at the end of each iteration. Otherwise all of the lookup tables are shipped to
the cluster at the same time, resulting in memory errors. Therefore this becomes
several map jobs instead of one, and each consecutive map is slower than the one
before. I'll try the checkpoint, thanks for the suggestion.
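
Something like this is what I'll try (a rough sketch; the checkpoint directory
and the interval are arbitrary, and create_func is the helper from your earlier
reply):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   # placeholder path

for i, lookup_table in enumerate(lookup_tables):
    rdd = rdd.map(create_func(lookup_table))
    if (i + 1) % 5 == 0:
        rdd = rdd.cache()
        rdd.checkpoint()   # cut the lineage every few iterations
        rdd.count()        # force evaluation so the checkpoint materializes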


On Feb 12, 2015, at 12:13 AM, Davies Liu dav...@databricks.com wrote:

 On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar rokros...@gmail.com wrote:
 the runtime for each consecutive iteration is still roughly twice as long as 
 for the previous one -- is there a way to reduce whatever overhead is 
 accumulating?
 
 Sorry, I didn't fully understand you question, which two are you comparing?
 
 PySpark will try to combine the multiple map() together, then you will get
 a task which need all the lookup_tables (the same size as before).
 
 You could add a checkpoint after some of the iterations.
 
 On Feb 11, 2015, at 8:11 PM, Davies Liu dav...@databricks.com wrote:
 
 On Wed, Feb 11, 2015 at 10:47 AM, rok rokros...@gmail.com wrote:
 I was having trouble with memory exceptions when broadcasting a large 
 lookup
 table, so I've resorted to processing it iteratively -- but how can I 
 modify
 an RDD iteratively?
 
 I'm trying something like :
 
 rdd = sc.parallelize(...)
 lookup_tables = {...}
 
 for lookup_table in lookup_tables :
   rdd = rdd.map(lambda x: func(x, lookup_table))
 
 If I leave it as is, then only the last lookup_table is applied instead 
 of
 stringing together all the maps. However, if add a .cache() to the .map 
 then
 it seems to work fine.
 
 This is the something related to Python closure implementation, you should
 do it like this:
 
 def create_func(lookup_table):
return lambda x: func(x, lookup_table)
 
 for lookup_table in lookup_tables:
   rdd = rdd.map(create_func(lookup_table))
 
 The Python closure just remember the variable, not copy the value of it.
 In the loop, `lookup_table` is the same variable. When we serialize the 
 final
 rdd, all the closures are referring to the same `lookup_table`, which points
 to the last value.
 
 When we create the closure in a function, Python create a variable for
 each closure, so it works.
 
 A second problem is that the runtime for each iteration roughly doubles at
 each iteration so this clearly doesn't seem to be the way to do it. What is
 the preferred way of doing such repeated modifications to an RDD and how 
 can
 the accumulation of overhead be minimized?
 
 Thanks!
 
 Rok
 
 
 
 
 





pyspark: Java null pointer exception when accessing broadcast variables

2015-02-10 Thread rok
I'm trying to use a broadcasted dictionary inside a map function and am
consistently getting Java null pointer exceptions. This is inside an IPython
session connected to a standalone spark cluster. I seem to recall being able
to do this before but at the moment I am at a loss as to what to try next.
Is there a limit to the size of broadcast variables? This one is rather
large (a few Gb dict). Thanks!
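
For reference, the usage is essentially the following (a sketch with toy data;
the real dictionary is a few Gb):

lookup = {i: str(i) for i in range(10)}
bc = sc.broadcast(lookup)

vals = sc.parallelize(range(100)) \
         .map(lambda x: bc.value.get(x, None)) \
         .collect()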

Rok






Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-10 Thread Rok Roskar
$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
... 4 more


What I find odd is that when I make the broadcast object, the logs don't show 
any significant amount of memory being allocated in any of the block managers 
-- but the dictionary is large, it's 8 Gb pickled on disk. 


On Feb 10, 2015, at 10:01 PM, Davies Liu dav...@databricks.com wrote:

 Could you paste the NPE stack trace here? It will better to create a
 JIRA for it, thanks!
 
 On Tue, Feb 10, 2015 at 10:42 AM, rok rokros...@gmail.com wrote:
 I'm trying to use a broadcasted dictionary inside a map function and am
 consistently getting Java null pointer exceptions. This is inside an IPython
 session connected to a standalone spark cluster. I seem to recall being able
 to do this before but at the moment I am at a loss as to what to try next.
 Is there a limit to the size of broadcast variables? This one is rather
 large (a few Gb dict). Thanks!
 
 Rok
 
 
 
 





Re: NegativeArraySizeException in pyspark when loading an RDD pickleFile

2015-01-29 Thread Rok Roskar
Thanks for the clarification on the partitioning.

I did what you suggested and tried reading in individual part-* files --
some of them are ~1.7Gb in size and that's where it's failing. When I
increase the number of partitions before writing to disk, it seems to work.
It would be nice if this were somehow automatically corrected!
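
i.e. roughly this (a sketch -- the partition count and paths are placeholders):

# write more, smaller partitions so no single part-* file gets too big
rdd.repartition(1000).saveAsPickleFile("hdfs:///some/path/data.pickle")

# reading a single part file back to isolate a problematic one
part = sc.pickleFile("hdfs:///some/path/data.pickle/part-00000")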

Thanks,

Rok

On Wed, Jan 28, 2015 at 7:01 PM, Davies Liu dav...@databricks.com wrote:

 HadoopRDD will try to split the file into partitions of about 64M, so you
 got 1916+ partitions
 (assuming 100k per row, they are 80G in size).

 I think there is a very small chance that one object or one batch will be
 bigger than 2G.
 Maybe there is a bug when it splits the pickled file -- could you create
 an RDD for each file, then see which file is causing the issue (maybe some of them)?

 On Wed, Jan 28, 2015 at 1:30 AM, Rok Roskar rokros...@gmail.com wrote:
  hi, thanks for the quick answer -- I suppose this is possible, though I
  don't understand how it could come about. The largest individual RDD
  elements are ~ 1 Mb in size (most are smaller) and the RDD is composed of
  800k of them. The file is saved in 134 parts, but is being read in using
  some 1916+ partitions (I don't know why actually -- how does this number
  come about?). How can I check if any objects/batches are exceeding 2Gb?
 
  Thanks,
 
  Rok
 
 
  On Tue, Jan 27, 2015 at 7:55 PM, Davies Liu dav...@databricks.com
 wrote:
 
  Maybe it's caused by integer overflow; is it possible that one object
  or batch is bigger than 2 GB (after pickling)?
 
  On Tue, Jan 27, 2015 at 7:59 AM, rok rokros...@gmail.com wrote:
   I've got a dataset saved with saveAsPickleFile using pyspark -- it
   saves
   without problems. When I try to read it back in, it fails with:
  
   Job aborted due to stage failure: Task 401 in stage 0.0 failed 4
 times,
   most
   recent failure: Lost task 401.3 in stage 0.0 (TID 449,
   e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException:
  
   org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
  
   org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
  
   org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
  
  
  
 org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
  
  
  
 org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
  
  
  
 org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
  
  
  
 org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
  
  
  
 org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
  
  
  
 org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
  
   org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
  
   org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
  
   org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  
  
  
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
   scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  
  
  
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
  
  
  
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
  
  
  
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
  
  
  
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
  
   org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
  
  
  
 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
  
  
   Not really sure where to start looking for the culprit -- any
   suggestions
   most welcome. Thanks!
  
   Rok
  
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 



Re: NegativeArraySizeException in pyspark when loading an RDD pickleFile

2015-01-28 Thread Rok Roskar
hi, thanks for the quick answer -- I suppose this is possible, though I
don't understand how it could come about. The largest individual RDD
elements are ~ 1 Mb in size (most are smaller) and the RDD is composed of
800k of them. The file is saved in 134 parts, but is being read in using
some 1916+ partitions (I don't know why actually -- how does this number
come about?). How can I check if any objects/batches are exceeding 2Gb?
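As a rough check of the per-element pickled sizes, something like the following
should work (it only covers individual objects, not the batches the serializer
builds):

import cPickle

# size in bytes of the largest pickled element (protocol 2, roughly what pyspark uses)
print rdd.map(lambda x: len(cPickle.dumps(x, 2))).reduce(max)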

Thanks,

Rok


On Tue, Jan 27, 2015 at 7:55 PM, Davies Liu dav...@databricks.com wrote:

 Maybe it's caused by integer overflow; is it possible that one object
 or batch is bigger than 2 GB (after pickling)?

 On Tue, Jan 27, 2015 at 7:59 AM, rok rokros...@gmail.com wrote:
  I've got a dataset saved with saveAsPickleFile using pyspark -- it saves
  without problems. When I try to read it back in, it fails with:
 
  Job aborted due to stage failure: Task 401 in stage 0.0 failed 4 times,
 most
  recent failure: Lost task 401.3 in stage 0.0 (TID 449,
  e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException:
 
  org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
  org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
 
  org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
 
 
 org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
 
 
 org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
 
 
 org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
 
 
 org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
 
 
 org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
 
 
 org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
 
  org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
 
  org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
  org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 
 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 
 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
 
 
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
 
 
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
 
 
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
 
  org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
 
 
 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
 
 
  Not really sure where to start looking for the culprit -- any suggestions
  most welcome. Thanks!
 
  Rok
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



NegativeArraySizeException in pyspark when loading an RDD pickleFile

2015-01-27 Thread rok
I've got a dataset saved with saveAsPickleFile using pyspark -- it saves
without problems. When I try to read it back in, it fails with: 

Job aborted due to stage failure: Task 401 in stage 0.0 failed 4 times, most
recent failure: Lost task 401.3 in stage 0.0 (TID 449,
e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException: 
   
org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
   
org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
   
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
   
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
   
org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
   
org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
   
org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
   
org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
   
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
   
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
   
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)


Not really sure where to start looking for the culprit -- any suggestions
most welcome. Thanks!

Rok




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: calculating the mean of SparseVector RDD

2015-01-12 Thread Rok Roskar
This was without using Kryo -- if I use Kryo, I get errors about buffer
overflows (see above):

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

Just calling colStats doesn't actually compute those statistics, does it?
It looks like the computation is only carried out once you call the .mean()
method.
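For reference, the colStats-based version being timed here is essentially (with
vals being the RDD of SparseVectors):

from pyspark.mllib.stat import Statistics

summary = Statistics.colStats(vals)   # vals: the RDD of SparseVectors
means = summary.mean()                # per-column means as a dense numpy array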



On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng men...@gmail.com wrote:

 colStats() computes the mean values along with several other summary
 statistics, which makes it slower. How is the performance if you don't
 use kryo? -Xiangrui

 On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar rokros...@gmail.com wrote:
  thanks for the suggestion -- however, looks like this is even slower.
 With
  the small data set I'm using, my aggregate function takes ~ 9 seconds and
  the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
  the Kryo serializer -- I get the error:
 
  com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
  required: 8
 
  is there an easy/obvious fix?
 
 
  On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote:
 
  There is some serialization overhead. You can try
 
 
 https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
  . -Xiangrui
 
  On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
   I have an RDD of SparseVectors and I'd like to calculate the means
   returning
   a dense vector. I've tried doing this with the following (using
 pyspark,
   spark v1.2.0):
  
   def aggregate_partition_values(vec1, vec2) :
   vec1[vec2.indices] += vec2.values
   return vec1
  
   def aggregate_combined_vectors(vec1, vec2) :
   if all(vec1 == vec2) :
   # then the vector came from only one partition
   return vec1
   else:
   return vec1 + vec2
  
   means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
   aggregate_combined_vectors)
   means = means / nvals
  
   This turns out to be really slow -- and doesn't seem to depend on how
   many
   vectors there are so there seems to be some overhead somewhere that
 I'm
   not
   understanding. Is there a better way of doing this?
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 



Re: calculating the mean of SparseVector RDD

2015-01-09 Thread Rok Roskar
Thanks for the suggestion -- however, it looks like this is even slower. With
the small data set I'm using, my aggregate function takes ~9 seconds and
colStats.mean() takes ~1 minute. I also can't get it to run with
the Kryo serializer -- I get the error:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

is there an easy/obvious fix?


On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote:

 There is some serialization overhead. You can try

 https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
 . -Xiangrui

 On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
  I have an RDD of SparseVectors and I'd like to calculate the means
 returning
  a dense vector. I've tried doing this with the following (using pyspark,
  spark v1.2.0):
 
  def aggregate_partition_values(vec1, vec2) :
  vec1[vec2.indices] += vec2.values
  return vec1
 
  def aggregate_combined_vectors(vec1, vec2) :
  if all(vec1 == vec2) :
  # then the vector came from only one partition
  return vec1
  else:
  return vec1 + vec2
 
  means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
  aggregate_combined_vectors)
  means = means / nvals
 
  This turns out to be really slow -- and doesn't seem to depend on how
 many
  vectors there are so there seems to be some overhead somewhere that I'm
 not
  understanding. Is there a better way of doing this?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



calculating the mean of SparseVector RDD

2015-01-07 Thread rok
I have an RDD of SparseVectors and I'd like to calculate their mean, returning
a dense vector. I've tried doing this with the following (using pyspark,
Spark v1.2.0): 

def aggregate_partition_values(vec1, vec2):
    vec1[vec2.indices] += vec2.values
    return vec1

def aggregate_combined_vectors(vec1, vec2):
    if all(vec1 == vec2):
        # then the vector came from only one partition
        return vec1
    else:
        return vec1 + vec2

means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
                       aggregate_combined_vectors)
means = means / nvals

This turns out to be really slow -- and the time doesn't seem to depend on how many
vectors there are, so there appears to be some overhead somewhere that I'm not
understanding. Is there a better way of doing this? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ensuring RDD indices remain immutable

2014-12-01 Thread rok
I have an RDD that serves as a feature look-up table downstream in my
analysis. I create it using zipWithIndex(), and because I suppose the
elements of the RDD could end up in a different order if it is regenerated
at any point, I cache it to try to ensure that the (feature, index) mapping
remains fixed.

However, I'm having trouble verifying that this is actually robust -- can
someone comment on whether such a mapping should be stable, or is there
another preferred method? zipWithUniqueId() isn't optimal since the maximum
ID generated this way is always greater than the number of features, so I'm
trying to avoid it. 
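For illustration, a minimal sketch of the sort-based construction (assuming the
features can be ordered by some deterministic key; the names are placeholders):

# sort by a deterministic key first so the (feature, index) assignment is the
# same every time the RDD is regenerated; 'features' is a placeholder RDD
feature_index = features.sortBy(lambda f: f).zipWithIndex().cache()

index_of = feature_index.collectAsMap()   # driver-side dict, if it fits in memory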






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ensuring RDD indices remain immutable

2014-12-01 Thread rok
True, though I was hoping to avoid having to sort... maybe there's no way
around it. Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094p20104.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



minimizing disk I/O

2014-11-13 Thread rok
I'm trying to understand the disk I/O patterns for Spark -- specifically, I'd
like to reduce the number of files that are being written during shuffle
operations. A couple of questions: 

* is the amount of file I/O performed independent of the memory I allocate
for the shuffles? 

* if this is the case, what is the purpose of this memory and is there any
way to see how much of it is actually being used?
 
* how can I minimize the number of files being written? With 24 cores per
node, the filesystem can't handle the large amount of simultaneous I/O very
well so it limits the number of cores I can use... 
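For reference, a sketch of the Spark 1.1-era shuffle settings that seem most related
to these questions (the values are only illustrative):

from pyspark import SparkConf, SparkContext

# settings that existed around Spark 1.1; values here are illustrative, not recommendations
conf = (SparkConf()
        .set("spark.shuffle.manager", "sort")            # one data file + one index file per map task
        .set("spark.shuffle.consolidateFiles", "true")   # only affects the hash-based shuffle
        .set("spark.shuffle.memoryFraction", "0.3"))     # heap fraction for shuffle aggregation before spilling
sc = SparkContext(conf=conf)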

Thanks for any insight you might have! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/minimizing-disk-I-O-tp18845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: using LogisticRegressionWithSGD.train in Python crashes with Broken pipe

2014-11-13 Thread rok
Hi, I'm using Spark 1.1.0. There is no error on the executors -- it appears
as if the job never gets properly dispatched -- the only message is the
Broken Pipe message in the driver. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-LogisticRegressionWithSGD-train-in-Python-crashes-with-Broken-pipe-tp18182p18846.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



using LogisticRegressionWithSGD.train in Python crashes with Broken pipe

2014-11-05 Thread rok
I have a dataset comprising ~200k labeled points whose features are
SparseVectors with ~20M features. I take 5% of the data for a training set. 

 model = LogisticRegressionWithSGD.train(training_set)

fails with 

ERROR:py4j.java_gateway:Error while sending or receiving.
Traceback (most recent call last):
  File
/cluster/home/roskarr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 472, in send_command
self.socket.sendall(command.encode('utf-8'))
  File /cluster/home/roskarr/miniconda/lib/python2.7/socket.py, line 224,
in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe

I'm at a loss as to where to begin to debug this... any suggestions? Thanks,

Rok



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-LogisticRegressionWithSGD-train-in-Python-crashes-with-Broken-pipe-tp18182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: using LogisticRegressionWithSGD.train in Python crashes with Broken pipe

2014-11-05 Thread rok
yes, the training set is fine, I've verified it. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-LogisticRegressionWithSGD-train-in-Python-crashes-with-Broken-pipe-tp18182p18195.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



sharing RDDs between PySpark and Scala

2014-10-30 Thread rok
I'm processing some data using PySpark and I'd like to save the RDDs to disk
(they are (k,v) RDDs of strings and SparseVector types) and read them in
using Scala to run them through some other analysis. Is this possible? 
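One language-neutral option would be to write the vectors out as plain text that the
Scala side can parse; a rough sketch (the exact format here is arbitrary, and the
path is a placeholder):

# rdd: (key, SparseVector) pairs; write "key <tab> size <tab> i1:v1,i2:v2,..." lines
def to_line(kv):
    k, v = kv
    pairs = ",".join("%d:%.17g" % (i, x) for i, x in zip(v.indices, v.values))
    return "%s\t%d\t%s" % (k, v.size, pairs)

rdd.map(to_line).saveAsTextFile("hdfs:///path/to/vectors_txt")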

Thanks,

Rok



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sharing-RDDs-between-PySpark-and-Scala-tp17718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



repartitioning an RDD yielding imbalance

2014-08-28 Thread Rok Roskar
I've got an RDD where each element is a long string (a whole document). I'm 
using pyspark so some of the handy partition-handling functions aren't 
available, and I count the number of elements in each partition with: 

def count_partitions(id, iterator):
    c = sum(1 for _ in iterator)
    yield (id, c)

 rdd.mapPartitionsWithSplit(count_partitions).collectAsMap()

This returns the following: 

{0: 866, 1: 1158, 2: 828, 3: 876}

But if I do: 

 rdd.repartition(8).mapPartitionsWithSplit(count_partitions).collectAsMap()

I get

{0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 3594, 7: 134}

Why this strange redistribution of elements? I'm obviously misunderstanding how 
spark does the partitioning -- is it a problem with having a list of strings as 
an RDD? 
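For what it's worth, a quick cross-check of the partition sizes with glom() gives the
same kind of counts:

# glom() returns each partition as a list; mapping to len gives elements per partition
rdd.glom().map(len).collect()
rdd.repartition(8).glom().map(len).collect()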

Help very much appreciated! 

Thanks,

Rok


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



out of memory errors -- per core memory limits?

2014-08-21 Thread Rok Roskar
I am having some issues with processes running out of memory and I'm wondering 
if I'm setting things up incorrectly. 

I am running a job on two nodes with 24 cores and 256Gb of memory each. I start 
the pyspark shell with SPARK_EXECUTOR_MEMORY=210gb. When I run the job with 
anything more than 8 cores, the processes start dying off with out of memory 
errors. But when I watch the memory consumption using top on the two executor 
nodes, the individual processes never seem to exceed the per-core memory and 
the nodes themselves are far from running out of memory. So I'm wondering if 
Spark is setting the per-core memory limit somewhere? 
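If the processes that die are the Python workers rather than the JVM executors, one
potentially relevant setting is spark.python.worker.memory (there is roughly one
Python worker per core); a sketch, with an illustrative value:

from pyspark import SparkConf, SparkContext

# caps the memory each Python worker uses for aggregation before it spills to disk
conf = SparkConf().set("spark.python.worker.memory", "2g")   # illustrative value
sc = SparkContext(conf=conf)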

Thanks,

Rok




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark, numpy arrays and binary data

2014-08-07 Thread Rok Roskar
thanks for the quick answer!

 numpy arrays can only support basic types, so we cannot use them during
 collect() by default.
 

Sure, but if you knew that a numpy array went in on one end, you could safely 
use it on the other end, no? Perhaps it would require an extension of the RDD 
class and overriding the collect() method. 

 Could you give a short example of how numpy arrays are used in your project?
 

Sure -- basically our main data structure is a container class (it acts like a 
dictionary) that holds various arrays representing particle data. Each 
particle has various properties -- position, velocity, mass, etc. You get at these 
individual properties by calling something like 

s['pos']

where 's' is the container object and 'pos' is the name of the array. A really 
common use case then is to select particles based on their properties and do 
some plotting, or take a slice of the particles, e.g. you might do 

r = np.sqrt((s['pos']**2).sum(axis=1))
ind = np.where(r < 5)
plot(s[ind]['x'], s[ind]['y'])

Internally, the various arrays are kept in a dictionary -- I'm hoping to write 
a class that keeps them in an RDD instead. To the user, this would have to be 
transparent, i.e. if the user wants to get at the data for specific particles, 
she would just have to do 

s['pos'][1,5,10] 

for example, and the data would be fetched for her from the RDD just like it 
would be if she were simply using the usual single-machine version. This is why 
writing to/from files when retrieving data from the RDD really is a no-go -- 
can you recommend how this can be circumvented? 


 
 * is there a preferred way to read binary data off a local disk directly
 into an RDD? Our I/O routines are built to read data in chunks and each
 chunk could be read by a different process/RDD, but it's not clear to me how
 to accomplish this with the existing API. Since the idea is to process data
 sets that don't fit into a single node's memory, reading first and then
 distributing via sc.parallelize is obviously not an option.
 
 If you already know how to partition the data, then you could use
 sc.parallelize()
 to distribute the description of your data, then read the data in parallel by
 given descriptions.
 
 For examples, you can partition your data into (path, start, length), then
 
 partitions = [(path1, start1, length), (path1, start2, length), ...]
 
 def read_chunk(path, start, length):
  f = open(path)
  f.seek(start)
  data = f.read(length)
  #processing the data
 
 rdd = sc.parallelize(partitions, len(partitions)).flatMap(read_chunk)
 


right... this is totally obvious in retrospect!  Thanks!
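For the record, a fleshed-out version of that sketch (the path, chunk size, and dtype
are placeholders; it assumes the pyspark shell's sc and that the file is readable at
the same path on every worker):

import os
import numpy as np

path = "/shared/scratch/particles.bin"    # placeholder; must be readable on every worker
chunk_bytes = 64 * 1024 * 1024            # placeholder chunk size
total = os.path.getsize(path)
partitions = [(path, start, min(chunk_bytes, total - start))
              for start in range(0, total, chunk_bytes)]

def read_chunk(part):
    p, start, length = part
    with open(p, "rb") as f:
        f.seek(start)
        raw = f.read(length)
    # interpret the raw bytes as a numpy array and emit it as a single RDD element
    # (assumes the chunk boundaries line up with the dtype's item size)
    yield np.frombuffer(raw, dtype=np.float64)

rdd = sc.parallelize(partitions, len(partitions)).flatMap(read_chunk)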


Rok




 * related to the first question -- when an RDD is created by parallelizing a
 numpy array, the array gets serialized and distributed. I see in the source
 that it actually gets written into a file first (!?) -- but surely the Py4J
 bottleneck for python array types (mentioned in the source comment) doesn't
 really apply to numpy arrays? Is it really necessary to dump the data onto
 disk first? Conversely, the collect() seems really slow and I suspect that
 this is due to the combination of disk I/O and python list creation. Are
 there any ways of getting around this if numpy arrays are being used?
 
 
 I'd be curious about any other best-practices tips anyone might have for
 running pyspark with numpy data...!
 
 Thanks!
 
 
 Rok
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark, numpy arrays and binary data

2014-08-06 Thread Rok Roskar
Hello,

I'm interested in getting started with Spark to scale our scientific analysis 
package (http://pynbody.github.io) to larger data sets. The package is written 
in Python and makes heavy use of numpy/scipy and related frameworks. I've got a 
couple of questions that I have not been able to find easy answers to despite 
some research efforts... I hope someone here can clarify things for me a bit!

* is there a preferred way to read binary data off a local disk directly into 
an RDD? Our I/O routines are built to read data in chunks and each chunk could 
be read by a different process/RDD, but it's not clear to me how to accomplish 
this with the existing API. Since the idea is to process data sets that don't 
fit into a single node's memory, reading first and then distributing via 
sc.parallelize is obviously not an option. 

* related to the first question -- when an RDD is created by parallelizing a 
numpy array, the array gets serialized and distributed. I see in the source 
that it actually gets written into a file first (!?) -- but surely the Py4J 
bottleneck for python array types (mentioned in the source comment) doesn't 
really apply to numpy arrays? Is it really necessary to dump the data onto disk 
first? Conversely, the collect() seems really slow and I suspect that this is 
due to the combination of disk I/O and python list creation. Are there any ways 
of getting around this if numpy arrays are being used? 


I'd be curious about any other best-practices tips anyone might have for 
running pyspark with numpy data...! 

Thanks!


Rok