Why does an executor throw OutOfMemoryError: Java heap space?

2015-03-26 Thread sergunok
Hi all,

sometimes you can see an executor fail with OutOfMemoryError: Java heap space in
Spark. There are many ideas about how to work around it.

My question is: how does an executor execute tasks from the point of view of
memory usage and parallelism?

The picture in my mind is:
An executor is a JVM instance. The number of tasks that can be executed in
parallel threads inside a single executor is controlled by the --executor-cores
parameter of spark-submit in the case of YARN. Each executor owns --executor-memory
of memory, which is divided into memory for the RDD cache and memory for task
execution. I am not considering caching here.
What really interests me is how the memory for task execution is used while the
executor is working.
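For concreteness, this is the kind of submission I have in mind (an illustrative
command only; the application name and values are made up):

spark-submit --master yarn \
    --num-executors 4 \
    --executor-cores 2 \
    --executor-memory 4g \
    my_app.py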

Let's consider an example where there are only map-like operations, no joins /
group / reduce and no caching.

# split each line into tokens, parse each token as an int and add 10, save the result
sc.textFile('test.txt') \
    .flatMap(lambda line: line.split()) \
    .map(lambda item: int(item) + 10) \
    .saveAsTextFile('out.txt')

How will the input RDD be processed in this case? I know RDDs are divided into
P partitions by some rule (for example, by the HDFS block size). So we will
have P partitions, P tasks and 1 stage (am I right?). Let --executor-cores
be 2. In this case the executor will process two partitions in parallel. Will it
try to load entire partitions into memory, or will it just apply the chain of
map functions to each element of a partition? What can cause OutOfMemoryError:
Java heap space in this case: a large partition, or a large amount of memory
consumed while processing a single element of the RDD?
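For concreteness, here is an illustrative sketch of the two things I would like
to understand (file names are made up):

from pyspark import SparkContext

sc = SparkContext(appName="PartitionInspection")

rdd = sc.textFile('test.txt')
# P: how many partitions (and therefore tasks) does the input produce?
print(rdd.getNumPartitions())

# element-at-a-time processing: each partition is exposed as an iterator,
# so a generator like this never has to hold a whole partition in memory
def add_ten(items):
    for item in items:
        yield int(item) + 10

rdd.mapPartitions(add_ten).saveAsTextFile('out.txt')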

Please correct me and advise.

Serg.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-executor-encourage-OutOfMemoryException-Java-heap-space-tp22238.html



Which RDD operations preserve ordering?

2015-03-26 Thread sergunok
Hi guys,

I don't have a clear picture of how the ordering of the elements of an RDD is
preserved after operations are executed.

Which operations preserve it? (A small sketch of what I mean is below.)
1) map (yes?)
2) zipWithIndex (yes, or only sometimes?)
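The sketch (the data is made up):

from pyspark import SparkContext

sc = SparkContext(appName="OrderingCheck")

nums = sc.parallelize(range(100), 4)

# does map() give results back in the same order as the input?
doubled = nums.map(lambda x: x * 2)
print(doubled.collect()[:5])             # [0, 2, 4, 6, 8] if ordering is preserved

# does zipWithIndex() assign indices following the original order?
print(nums.zipWithIndex().collect()[:3]) # [(0, 0), (1, 1), (2, 2)] if it does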

Serg.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Which-RDD-operations-preserve-ordering-tp22239.html



Spark UI tunneling

2015-03-23 Thread sergunok
Is there a way to tunnel the Spark UI?

I tried to tunnel client-node:4040, but my browser was redirected from
localhost to some domain name that is only visible inside the cluster.

Maybe there is some startup option to make the Spark UI fully accessible
through a single endpoint (address:port)?
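For reference, the tunnel I tried looks roughly like this (host and user names
are placeholders):

ssh -L 4040:localhost:4040 user@client-node
# then open http://localhost:4040 locally, which is where the redirect
# to the cluster-internal host name happens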

Serg.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html



log files of failed task

2015-03-23 Thread sergunok
Hi,

I executed a job on Spark on YARN and it failed.
I see just an "executor lost" message from YARNClientScheduler, with no further
details.
(I have read that this error can be related to the spark.yarn.executor.memoryOverhead
setting and have already played with this parameter.)

How can I dig deeper into the log files and find the exact reason? How can the
log of a failed task be examined? (Is the command sketched below the right way?)

Unfortunately I don't have access to the Spark UI; I can only use the command line.
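For example, is something like this the expected way to get at the executor
logs (the application id is a placeholder)?

yarn logs -applicationId application_1234567890123_0001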

Thanks!

Serg.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html



calculating TF-IDF for large 100GB dataset problems

2015-03-19 Thread sergunok
Hi,

I am trying to vectorize, on a YARN cluster, a corpus of texts (about 500K texts
in 13 files, 100GB in total) located in HDFS.

This process has already taken about 20 hours on a 3-node cluster with 6 cores
and 20GB RAM on each node.

In my opinion that is too long :-)

I started the job with the following command:
spark-submit --master yarn --num-executors 9 --executor-memory 5g --executor-cores 2 --driver-memory 5g weight.py

weight.py:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import Normalizer

conf = SparkConf() \
    .set("spark.hadoop.validateOutputSpecs", "false") \
    .set("spark.yarn.executor.memoryOverhead", 900)
sc = SparkContext(conf=conf)


# reading files from directory 'in/texts.txt' in HDFS
texts = sc.textFile('in/texts.txt') \
    .map(lambda line: line.split())

hashingTF = HashingTF()
tf = hashingTF.transform(texts)

tf.cache()
idf = IDF(minDocFreq=100).fit(tf)
tfidf = idf.transform(tf)

n = Normalizer()

normalized = n.transform(tfidf)

def x2(pair):
    vec, num = pair
    triples = []
    for id, weight in zip(vec.indices, vec.values):
        triples.append((num, id, weight))
    return triples

# I use zipWithIndex to enumerate documents
normalized.zipWithIndex() \
    .flatMap(x2) \
    .map(lambda t: '{}\t{}\t{}'.format(t[0], t[1], t[2])) \
    .saveAsTextFile('out/weights.txt')

1) What could be the bottleneck?
Unfortunately I don't have access to the web UI.
In the log file I see stages 0, 1, 2, 3:
Stage 0 MapPartitionsRDD[6] at mapPartitionsWithIndex at RDDFunctions.scala:108 with 584 tasks - completed very quickly
Stage 1 MappedRDD[8] at values at RDDFunctions.scala:110 (23 tasks) - quick too
Stage 2 zipWithIndex (584 tasks) was long (17 hours)
Stage 3 saveAsTextFile (584 tasks) - also long (still executing after about 2 hours)

I don't understand the boundaries of stages 0 and 1, and I don't understand why
I see numbers like 584 or 23 tasks for these stages.


2) On a previous run of this job I saw a lot of "executor lost" errors from the
YARN scheduler. Later I added the .set("spark.yarn.executor.memoryOverhead", 900)
setting in the code, and now I see only a few such messages. Could this be a
reason for the poor performance? (One guess about the zipWithIndex stage is
sketched below.)
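The guess (not verified, just a sketch): zipWithIndex() triggers an extra job to
compute per-partition offsets, so without persisting the normalized RDD the
whole TF-IDF pipeline might be computed once for that job and again for
saveAsTextFile. Something like:

from pyspark import StorageLevel

# persist the normalized vectors so that the extra job launched by
# zipWithIndex() and the final saveAsTextFile() do not both recompute
# the whole hashing/IDF/normalization pipeline
normalized = n.transform(tfidf).persist(StorageLevel.MEMORY_AND_DISK)
indexed = normalized.zipWithIndex()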

Please advise!

Any explanations appreciated!

Serg.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/calculating-TF-IDF-for-large-100GB-dataset-problems-tp22144.html



RDD ordering after map

2015-03-18 Thread sergunok
Does map(...) preserve the ordering of the original RDD?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-ordering-after-map-tp22129.html



MEMORY_ONLY vs MEMORY_AND_DISK

2015-03-18 Thread sergunok
Which persistence level is better if the RDD to be cached is expensive to
recalculate?
Am I right that it is MEMORY_AND_DISK, as in the sketch below?
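Just to be explicit about the call I mean (the input path is a placeholder):

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="PersistExample")

rdd = sc.textFile('expensive_input.txt')  # stands in for an expensive-to-recompute RDD
# keep partitions in memory and spill the rest to disk instead of
# recomputing them when memory runs out
rdd.persist(StorageLevel.MEMORY_AND_DISK)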



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MEMORY-ONLY-vs-MEMORY-AND-DISK-tp22130.html



Processing of text file in large gzip archive

2015-03-16 Thread sergunok
I have a 30GB gzip file (originally a text file where each line represents a
text document) in HDFS, and Spark 1.2.0 on a YARN cluster with 3 worker nodes,
each with 64GB RAM and 4 cores.
The replication factor for my file is 3.

I tried to implement a simple pyspark script to parse this file and represent
it as tf-idf:

Something like:

from pyspark.mllib.feature import HashingTF, IDF

lines = sc.textFile('file.gz')
docs = lines.map(lambda line: line.split(' '))

hashingTF = HashingTF()
tf = hashingTF.transform(docs)

tf.cache()

idf = IDF().fit(tf)
tfidf = idf.transform(tf)

tfidf.map(lambda v: ' '.join([u'{}:{}'.format(i, x)
                              for i, x in zip(v.indices, v.values)])) \
    .saveAsTextFile('tfidf.txt')

I started the script with:
spark-submit --master yarn --num-executors 24 script.py

No comments about why I selected 24 executors; that was just a first try.


I saw in the output that all 24 executors and the corresponding block managers
with 0.5 GB each were started on the 3 nodes, but the output stops at these messages:
INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on node3:36765 (size: 49.7 KB, free: 530.0 MB)
INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node3:36765 (size: 21.6 KB, free: 529.9 MB)

I have already been waiting for about 1 hour and don't see any changes.
(Unfortunately I cannot monitor the cluster via the web UI.)

My main question: generally speaking, is this a normal processing time for such
a volume of data on such a cluster?
Is it OK that the output stops at "Added broadcast..."?
Is it OK to read a gzip archive via sc.textFile(..) in the code, or is it better
to unpack it beforehand (for performance reasons)? (See also the sketch below.)
How can a Spark job be monitored via the command line?
Please advise about some tuning.
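The sketch I mentioned (a guess, not verified): since a single .gz file is not
splittable, sc.textFile('file.gz') presumably comes back as one partition, so a
single task does all the parsing; repartitioning right after reading would
spread the work across executors at the cost of one shuffle:

lines = sc.textFile('file.gz').repartition(24)  # 24 is an arbitrary choice
docs = lines.map(lambda line: line.split(' '))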

Thanks!

Sergey.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Processing-of-text-file-in-large-gzip-archive-tp22073.html



SVD transform of large matrix with MLlib

2015-03-11 Thread sergunok
Has anybody used SVD from MLlib for a very large (like 10^6 x 10^7) sparse
matrix?
How long did it take?
What implementation of SVD is used in MLlib?
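For reference, the call I have in mind is roughly the following (the Python API
for this only exists in Spark versions newer than the one discussed here, so
treat it as a sketch; the data is a placeholder):

from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

sc = SparkContext(appName="SVDExample")

# placeholder rows: in reality ~10^6 sparse vectors of dimension ~10^7
rows = sc.parallelize([
    Vectors.sparse(5, {0: 1.0, 3: 7.0}),
    Vectors.sparse(5, {1: 2.0, 4: 5.0}),
])
mat = RowMatrix(rows)

# compute only the top k singular values/vectors
svd = mat.computeSVD(k=2, computeU=False)
print(svd.s)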



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SVD-transform-of-large-matrix-with-MLlib-tp22005.html



can not submit job to spark in windows

2015-02-26 Thread sergunok
Hi!

I downloaded and extracted Spark to a local folder under Windows 7 and have
successfully played with it in the pyspark interactive shell.

BUT

when I try to use spark-submit (for example: spark-submit pi.py) I get:

C:\spark-1.2.1-bin-hadoop2.4\bin>spark-submit.cmd pi.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/02/26 18:21:37 INFO SecurityManager: Changing view acls to: sergun
15/02/26 18:21:37 INFO SecurityManager: Changing modify acls to: sergun
15/02/26 18:21:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sergun); users with modify permissions: Set(user)
15/02/26 18:21:38 INFO Slf4jLogger: Slf4jLogger started
15/02/26 18:21:38 INFO Remoting: Starting remoting
15/02/26 18:21:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@mypc:56640]
15/02/26 18:21:39 INFO Utils: Successfully started service 'sparkDriver' on port 56640.
15/02/26 18:21:39 INFO SparkEnv: Registering MapOutputTracker
15/02/26 18:21:39 INFO SparkEnv: Registering BlockManagerMaster
15/02/26 18:21:39 INFO DiskBlockManager: Created local directory at C:\Users\sergun\AppData\Local\Temp\spark-adddeb0b-d6c8-4720-92e3-05255d46ea66\spark-c65cd406-28a4-486d-a1ad-92e4814df6fa
15/02/26 18:21:39 INFO MemoryStore: MemoryStore started with capacity 265.0 MB
15/02/26 18:21:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/26 18:21:40 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable C:\\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
        at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
        at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:44)
        at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:214)
        at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
        at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1873)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:240)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.lang.reflect.Constructor.newInstance(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)
15/02/26 18:21:41 INFO HttpFileServer: HTTP File server directory is C:\Users\sergun\AppData\Local\Temp\spark-79f2a924-4fff-432c-abc8-ac9c6c4ee0c7\spark-1f295e28-f0db-4daf-b877-2a47990b6e88
15/02/26 18:21:41 INFO HttpServer: Starting HTTP Server
15/02/26 18:21:41 INFO Utils: Successfully started service 'HTTP file server' on port 56641.
15/02/26 18:21:41 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/26 18:21:41 INFO SparkUI: Started SparkUI at http://mypc:4040
15/02/26 18:21:42 INFO Utils: Copying C:\spark-1.2.1-bin-hadoop2.4\bin\pi.py to C:\Users\sergun\AppData\Local\Temp\spark-76a21028-ccce-4308-9e70-09c3cfa76477\spark-56b32155-2779-4345-9597-2bfa6a87a51d\pi.py
Traceback (most recent call last):
  File "C:/spark-1.2.1-bin-hadoop2.4/bin/pi.py", line 29, in <module>
    sc = SparkContext(appName="PythonPi")
  File "C:\spark-1.2.1-bin-hadoop2.4\python\pyspark\context.py", line 105, in __init__
    conf, jsc)
  File