sc.parallelize() to distribute a list of data into numbers of partitions, but
generator can not be cut and serialized automatically.

If you can partition your generator, then you can try this:

sc.parallelize(range(N), N).flatMap(lambda x: generate_partiton(x))

such as you want to generate xrange(M), M is huge, so
sc.parallelize(range(N), N).flatMap(lambda x: xrange(M/N*x, M / N * (x+1))

On Mon, Oct 6, 2014 at 7:16 AM,  <jan.zi...@centrum.cz> wrote:
> Hi,
>
>
> I would like to ask if it is possible to use generator, that generates data
> bigger than size of RAM across all the machines as the input for sc =
> SparkContext(), sc.paralelize(generator). I would like to create RDD this
> way. When I am trying to create RDD by sc.TextFile(file) where file has even
> bigger size than data generated by the generator everything works fine, but
> unfortunately I need to use sc.parallelize(generator) and it makes my OS to
> kill the spark job.
>
>
>
> I'm getting only this log and then the job is killed:
>
>
>
> 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,
>
> 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root,
>
> 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(root, ); users with modify permissions: Set(root, )
>
> 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
>
> 14/10/06 13:34:16 INFO Remoting: Starting remoting
>
> 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
>
> 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
>
> 14/10/06 13:34:17 INFO util.Utils: Successfully started service
> 'sparkDriver' on port 41016.
>
> 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker
>
> 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster
>
> 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at
> /mnt/spark/spark-local-20141006133417-821e
>
> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection
> manager for block manager' on port 42438.
>
> 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438
> with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)
>
> 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with
> capacity 267.3 MB
>
> 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
>
> 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block
> manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM
>
> 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager
>
> 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is
> /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523
>
> 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server
>
> 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
>
> 14/10/06 13:34:17 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:44768
>
> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file
> server' on port 44768.
>
> 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
>
> 14/10/06 13:34:17 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
>
> 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on
> port 4040.
>
> 14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at
> http://ec2-54-164-72-236.compute-1.amazonaws.com:4040
>
> 14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to
> /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py
>
> 14/10/06 13:34:18 INFO spark.SparkContext: Added file
> file:/root/generator_test.py at
> http://172.31.25.197:44768/files/generator_test.py with timestamp
> 1412602458065
>
> 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master
> spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077...
>
> 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend
> is ready for scheduling beginning after reached minRegisteredResourcesRatio:
> 0.0
>
> 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to
> Spark cluster with app ID app-20141006133418-0046
>
> 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added:
> app-20141006133418-0046/0 on
> worker-20141005074620-ip-172-31-30-40.ec2.internal-49979
> (ip-172-31-30-40.ec2.internal:49979) with 1 cores
>
> 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor
> ID app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979
> with 1 cores, 512.0 MB RAM
>
> 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated:
> app-20141006133418-0046/0 is now RUNNING
>
> 14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852]
> with ID 0
>
> 14/10/06 13:34:21 INFO storage.BlockManagerMasterActor: Registering block
> manager ip-172-31-30-40.ec2.internal:34460 with 267.3 MB RAM
>
>
> Thank you in advance for any advice or sugestion.
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to