Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
Hi, I would like to ask whether it is possible to use a generator that produces more data than fits in RAM across all the machines as the input to sc.parallelize(generator), after sc = SparkContext(). I would like to create an RDD this way. When I create an RDD with sc.textFile(file), where the file is even bigger than the data produced by the generator, everything works fine; but unfortunately I need to use sc.parallelize(generator), and that makes the OS kill the Spark job. I get only this log before the job is killed:

14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,
14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root,
14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )
14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/06 13:34:16 INFO Remoting: Starting remoting
14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' on port 41016.
14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker
14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster
14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20141006133417-821e
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 42438.
14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)
14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM
14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager
14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523
14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server
14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/06 13:34:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44768
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file server' on port 44768.
14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/06 13:34:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at http://ec2-54-164-72-236.compute-1.amazonaws.com:4040
14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py
14/10/06 13:34:18 INFO spark.SparkContext: Added file file:/root/generator_test.py at http://172.31.25.197:44768/files/generator_test.py with timestamp 1412602458065
14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077...
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141006133418-0046
14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added: app-20141006133418-0046/0 on worker-20141005074620-ip-172-31-30-40.ec2.internal-49979 (ip-172-31-30-40.ec2.internal:49979) with 1 cores
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979 with 1 cores, 512.0 MB RAM
14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated: app-20141006133418-0046/0 is now RUNNING
14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852] with ID 0
14/10/06 13:34:21 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-30-40.ec2.internal:34460 with 267.3 MB RAM

Thank you in advance for any advice or suggestion.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
sc.parallelize() distributes a list of data into a number of partitions, but a generator cannot be split up and serialized automatically. If you can partition your generator, then you can try this:

sc.parallelize(range(N), N).flatMap(lambda x: generate_partition(x))

For example, if you want to generate xrange(M), where M is huge:

sc.parallelize(range(N), N).flatMap(lambda x: xrange(M / N * x, M / N * (x + 1)))

On Mon, Oct 6, 2014 at 7:16 AM, jan.zi...@centrum.cz wrote:
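The suggestion above can be fleshed out into a small sketch. The `gen_partition` helper below is hypothetical (not code from the thread); the idea is that each task receives only a partition index and lazily rebuilds its own slice of the data, so the driver never serializes the full generator. The Spark call is shown only as a comment, since it needs a running cluster:

```python
def gen_partition(i, M, N):
    """Lazily generate the i-th of N roughly equal slices of range(M).

    Each Spark task calls this with its own index i, so neither any
    single task nor the driver ever materializes the whole range.
    """
    start = M // N * i
    # The last partition absorbs the remainder when N does not divide M.
    end = M if i == N - 1 else M // N * (i + 1)
    for x in range(start, end):
        yield x

# With a SparkContext this would be used roughly as (untested sketch):
#   rdd = sc.parallelize(range(N), N).flatMap(lambda i: gen_partition(i, M, N))

# Local check that the N slices cover range(M) exactly once:
M, N = 10, 3
items = [x for i in range(N) for x in gen_partition(i, M, N)]
print(items)  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Only the tiny list `range(N)` of partition indices crosses the driver-to-worker boundary; the actual data is generated worker-side.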
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
Hi, thank you for your advice. It really might work, but to specify my problem a bit more: think of my data as one generated item per parsed Wikipedia page. I am getting this generator from the parser and I don't want to save it to storage; I want to apply parallelize directly and create the RDD. Based on your advice, I'm now thinking that something like batching, creating several RDDs, and then applying union on them might be the way to go. Originally I was thinking of calling the parsing function in flatMap on the RDD loaded from the XML file, but then I unfortunately hit this problem: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim so now I am trying to parse the XML on the master node and put it directly into the RDD.

__
From: Davies Liu dav...@databricks.com
To: jan.zi...@centrum.cz
Date: 06.10.2014 18:09
Subject: Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
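The batching-plus-union idea mentioned above could be sketched as follows. The `chunks` helper and the chunk size are assumptions for illustration, not code from the thread: the driver consumes the parser's generator in fixed-size batches, so it only ever holds one batch in memory, and the per-batch RDDs are combined with a single union.

```python
from itertools import islice

def chunks(gen, size):
    """Yield lists of up to `size` items drawn lazily from iterator `gen`."""
    while True:
        batch = list(islice(gen, size))
        if not batch:
            return
        yield batch

# With a SparkContext this could be used roughly as (untested sketch):
#   rdds = [sc.parallelize(batch) for batch in chunks(page_generator, 10000)]
#   rdd = sc.union(rdds)

# Local check of the batching logic:
print(list(chunks(iter(range(7)), 3)))  # → [[0, 1, 2], [3, 4, 5], [6]]
```

Note that each sc.parallelize() call still ships its batch through the driver, so this bounds driver memory but not driver work; parsing on the workers (as suggested later in the thread) avoids that bottleneck entirely.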
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
On Mon, Oct 6, 2014 at 1:08 PM, jan.zi...@centrum.cz wrote:

gensim.corpora.wikicorpus.extract_pages should be called in flatMap() for better performance (otherwise it will be the bottleneck on the master). Because the function used in flatMap() is executed on the workers, you should make sure that the files accessed by extract_pages are accessible to the workers, for example by putting them in a DFS or on NFS in cluster mode. In local mode, you should probably use absolute paths for the files.

Davies
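The "parse on the workers" advice could look roughly like the sketch below. `parse_file` is a hypothetical stand-in for gensim.corpora.wikicorpus.extract_pages, and the `/nfs/...` paths are invented for illustration; the point is that only a small list of file paths is distributed, and each worker parses the files in its own partition.

```python
def parse_file(path):
    """Stand-in for gensim.corpora.wikicorpus.extract_pages: lazily
    yield parsed pages from one dump file. Hypothetical implementation
    that fakes two pages per file so the logic can be checked locally."""
    for i in range(2):
        yield (path, f"page-{i}")

# With a SparkContext, the distribution would look roughly like (untested):
#   paths = ["/nfs/dumps/part-000.xml", "/nfs/dumps/part-001.xml"]
#   pages = sc.parallelize(paths, len(paths)).flatMap(parse_file)
# Every path must be readable from every worker (DFS/NFS, absolute paths).

# Local simulation of what the flatMap would produce:
paths = ["a.xml", "b.xml"]
pages = [p for path in paths for p in parse_file(path)]
print(len(pages))  # → 4
```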
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
Try a Hadoop custom InputFormat - I can give you some samples. While I have not tried this, an input split has only a length (which could be ignored if the format treats the input as non-splittable) and a String for a location. If the location is a URL into Wikipedia, the whole thing should work. Hadoop InputFormats seem to be the best way to get large (say, multi-gigabyte) files into RDDs.
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
@Davies I know that gensim.corpora.wikicorpus.extract_pages will for sure be the bottleneck on the master node. Unfortunately I am using Spark on EC2 and I don't have enough space on my nodes to store the whole dataset that needs to be parsed by extract_pages. I have my data on S3, and I had hoped that after reading the data from S3 into an RDD (sc.textFile(file_on_s3)) it would be possible to pass the RDD to extract_pages; unfortunately this does not work for me. If it did, it would be by far the best way to go for me.

@Steve I can try a Hadoop custom InputFormat. It'd be great if you could send me some samples. But if I understand it correctly, I'm afraid it won't work for me, because I don't actually have any URL into Wikipedia; I only have a file that is opened, parsed, and returned as a generator that yields the parsed page name and text from Wikipedia (it can also be some non-public Wikipedia-like site).

__
From: Steve Lewis lordjoe2...@gmail.com
To: Davies Liu dav...@databricks.com
Date: 06.10.2014 22:39
Subject: Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
CC: user
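One reason the sc.textFile() route fails here is that extract_pages expects a file (or file-like object), while textFile yields an RDD of individual lines. A possible workaround, sketched below with hypothetical names (the `parse_key` helper, the `download` hook, and the S3 keys are not from the thread): distribute only the object keys, and let each worker fetch and parse whole files itself, so no node needs space for the full dataset.

```python
def parse_key(key, download, parse):
    """Fetch one object and lazily yield its parsed pages.

    `download` maps a key to a local path; `parse` maps a path to an
    iterator of pages. Both are injected so the logic is testable
    without S3 or Spark."""
    local_path = download(key)
    for page in parse(local_path):
        yield page

# With a SparkContext this would run roughly as (untested sketch):
#   keys = ["dumps/part-000.xml.bz2", "dumps/part-001.xml.bz2"]
#   pages = sc.parallelize(keys, len(keys)).flatMap(
#       lambda k: parse_key(k, download_from_s3, extract_pages))
# where download_from_s3 is a hypothetical helper that pulls one object
# to worker-local scratch space (only one file at a time per worker).

# Local check with stub functions:
fake_download = lambda key: "/tmp/" + key
fake_parse = lambda path: iter([path + "#p1", path + "#p2"])
print(list(parse_key("a.xml", fake_download, fake_parse)))
```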