I've got a simple PySpark program that generates two CSV files and then carries out a leftOuterJoin (a fact RDD joined to a dimension RDD). The program works fine for smaller volumes of records, but once the fact dataset grows beyond 3 million records I get the error below. I'm running PySpark via PyCharm.
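For context, the core of the job boils down to the following (a condensed sketch of the attached file, which is included in full at the bottom of this message; the CSV parsing is collapsed into lambdas and the minPartitions / executor-memory settings are omitted for brevity):

from pyspark import SparkConf, SparkContext

# Local test job: key dimension rows by their own id, fact rows by the
# dimension id they reference, then left-outer-join facts to dimensions.
conf = SparkConf().setAppName("spark_error_sample").setMaster("local")
sc = SparkContext(conf=conf)

dimensions = sc.textFile("c:/tmp/dim.csv").map(lambda line: (int(line.split(',')[0]), line.split(',')))
facts = sc.textFile("c:/tmp/fact.csv").map(lambda line: (int(line.split(',')[1]), line.split(',')))

joined_facts = facts.leftOuterJoin(dimensions)
print("Joined fact count: %d" % joined_facts.count())  # this count() is where the job fails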
My environment is:

OS: Windows 7
Python version: 2.7.9
Spark version: 1.1.1
Java version: 1.8

I've also included the .py file I am using (at the bottom of this message). I'd appreciate any help you can give me.

MJ

----------------------------ERROR MESSAGE----------------------------
C:\Python27\python.exe "C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/12/16 12:48:26 INFO SecurityManager: Changing view acls to: Mark Jones,
14/12/16 12:48:26 INFO SecurityManager: Changing modify acls to: Mark Jones,
14/12/16 12:48:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mark Jones, ); users with modify permissions: Set(Mark Jones, )
14/12/16 12:48:26 INFO Slf4jLogger: Slf4jLogger started
14/12/16 12:48:27 INFO Remoting: Starting remoting
14/12/16 12:48:27 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.19.83:51387]
14/12/16 12:48:27 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.19.83:51387]
14/12/16 12:48:27 INFO Utils: Successfully started service 'sparkDriver' on port 51387.
14/12/16 12:48:27 INFO SparkEnv: Registering MapOutputTracker
14/12/16 12:48:27 INFO SparkEnv: Registering BlockManagerMaster
14/12/16 12:48:27 INFO DiskBlockManager: Created local directory at C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141216124827-11ef
14/12/16 12:48:27 INFO Utils: Successfully started service 'Connection manager for block manager' on port 51390.
14/12/16 12:48:27 INFO ConnectionManager: Bound socket to port 51390 with id = ConnectionManagerId(192.168.19.83,51390)
14/12/16 12:48:27 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
14/12/16 12:48:27 INFO BlockManagerMaster: Trying to register BlockManager
14/12/16 12:48:27 INFO BlockManagerMasterActor: Registering block manager 192.168.19.83:51390 with 265.1 MB RAM
14/12/16 12:48:27 INFO BlockManagerMaster: Registered BlockManager
14/12/16 12:48:27 INFO HttpFileServer: HTTP File server directory is C:\Users\MARKJO~1\AppData\Local\Temp\spark-3b772ca1-dbf7-4eaa-b62c-be5e73036f5d
14/12/16 12:48:27 INFO HttpServer: Starting HTTP Server
14/12/16 12:48:27 INFO Utils: Successfully started service 'HTTP file server' on port 51391.
14/12/16 12:48:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/12/16 12:48:27 INFO SparkUI: Started SparkUI at http://192.168.19.83:4040
14/12/16 12:48:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
    at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
    at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
    at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:228)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:53)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
14/12/16 12:48:28 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.19.83:51387/user/HeartbeatReceiver
14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(159118) called with curMem=0, maxMem=278019440
14/12/16 12:48:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 155.4 KB, free 265.0 MB)
14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(159118) called with curMem=159118, maxMem=278019440
14/12/16 12:48:28 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 155.4 KB, free 264.8 MB)
14/12/16 12:48:28 INFO FileInputFormat: Total input paths to process : 1
14/12/16 12:48:28 INFO FileInputFormat: Total input paths to process : 1
14/12/16 12:48:28 INFO SparkContext: Starting job: count at C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py:79
14/12/16 12:48:28 INFO DAGScheduler: Registering RDD 8 (RDD at PythonRDD.scala:261)
14/12/16 12:48:28 INFO DAGScheduler: Got job 0 (count at C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py:79) with 9 output partitions (allowLocal=false)
14/12/16 12:48:28 INFO DAGScheduler: Final stage: Stage 0(count at C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py:79)
14/12/16 12:48:28 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/12/16 12:48:28 INFO DAGScheduler: Missing parents: List(Stage 1)
14/12/16 12:48:28 INFO DAGScheduler: Submitting Stage 1 (PairwiseRDD[8] at RDD at PythonRDD.scala:261), which has no missing parents
14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(11672) called with curMem=318236, maxMem=278019440
14/12/16 12:48:28 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 11.4 KB, free 264.8 MB)
14/12/16 12:48:28 INFO DAGScheduler: Submitting 9 missing tasks from Stage 1 (PairwiseRDD[8] at RDD at PythonRDD.scala:261)
14/12/16 12:48:28 INFO TaskSchedulerImpl: Adding task set 1.0 with 9 tasks
14/12/16 12:48:28 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, PROCESS_LOCAL, 1275 bytes)
14/12/16 12:48:28 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
14/12/16 12:48:29 INFO HadoopRDD: Input split: file:/c:/tmp/fact.csv:0+33554432
14/12/16 12:48:29 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/12/16 12:48:29 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/12/16 12:48:29 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/12/16 12:48:29 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/12/16 12:48:29 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/12/16 12:48:51 INFO PythonRDD: Times: total = 22350, boot = 271, init = 59, finish = 22020
14/12/16 12:48:52 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\spark\python\pyspark\serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
    for (k, v) in iterator:
  File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in _external_items
    False)
  File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in mergeCombiners
    for k, v in iterator:
  File "C:\apps\spark\python\pyspark\serializers.py", line 133, in load_stream
    yield self._read_with_length(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 146, in _read_with_length
    length = read_int(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/12/16 12:48:52 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1275 bytes)
14/12/16 12:48:52 INFO Executor: Running task 1.0 in stage 1.0 (TID 1)
14/12/16 12:48:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\spark\python\pyspark\serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
    for (k, v) in iterator:
  File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in _external_items
    False)
  File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in mergeCombiners
    for k, v in iterator:
  File "C:\apps\spark\python\pyspark\serializers.py", line 133, in load_stream
    yield self._read_with_length(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 146, in _read_with_length
    length = read_int(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4
        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
14/12/16 12:48:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
14/12/16 12:48:52 INFO TaskSchedulerImpl: Cancelling stage 1
14/12/16 12:48:52 INFO TaskSchedulerImpl: Stage 1 was cancelled
14/12/16 12:48:52 INFO Executor: Executor is trying to kill task 1.0 in stage 1.0 (TID 1)
14/12/16 12:48:52 INFO DAGScheduler: Failed to run count at C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py:79
Traceback (most recent call last):
  File "C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py", line 79, in <module>
    print("Joined fact count: %d" % joined_facts.count())
  File "C:\apps\spark\python\pyspark\rdd.py", line 847, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\apps\spark\python\pyspark\rdd.py", line 838, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "C:\apps\spark\python\pyspark\rdd.py", line 759, in reduce
    vals = self.mapPartitions(func).collect()
  File "C:\apps\spark\python\pyspark\rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "C:\apps\spark\python\build\py4j\java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "C:\apps\spark\python\build\py4j\protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\spark\python\pyspark\serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
    for (k, v) in iterator:
  File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in _external_items
    False)
  File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in mergeCombiners
    for k, v in iterator:
  File "C:\apps\spark\python\pyspark\serializers.py", line 133, in load_stream
    yield self._read_with_length(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 146, in _read_with_length
    length = read_int(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4
        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Process finished with exit code 1

----------------------------PYTHON FILE----------------------------
import random
from pyspark import SparkContext, SparkConf, StorageLevel

DIM_FILE_NAME = "c:/tmp/dim.csv"
FACT_FILE_NAME = "c:/tmp/fact.csv"
MIN_PARTITIONS = 4


class DataGenerator():
    def __init__(self):
        pass

    def generate_dimension_file(self, file_name, num_items=100):
        with open(file_name, 'w') as dim_file:
            for i in range(0, num_items):
                id = i
                name = "Dimension Item #%d" % id
                dim_file.write("%d,%s\n" % (id, name))
            dim_file.flush()

    def generate_fact_file(self, file_name, num_items=100):
        with open(file_name, 'w') as fact_file:
            for i in range(0, num_items):
                fact_id = i
                name = "Fact Item #%d" % fact_id
                dim_id = random.randint(0, 99)
                fact_file.write("%d,%d,%s\n" % (fact_id, dim_id, name))
            fact_file.flush()


class CsvConverter():
    def __init__(self):
        pass

    def read_dimension(self, line):
        items = line.split(',')
        return (int(items[0]), items)

    def read_fact(self, line):
        items = line.split(',')
        return (int(items[1]), items)


if __name__ == "__main__":
    # Generate data
    generator = DataGenerator()
    generator.generate_dimension_file(file_name=DIM_FILE_NAME, num_items=100)
    generator.generate_fact_file(file_name=FACT_FILE_NAME, num_items=5000000)

    # Load files into spark
    appName = "spark_error_sample"
    master = "local"
    conf = SparkConf().setAppName(appName).setMaster(master).set('spark.executor.memory', '4g')
    sc = SparkContext(conf=conf)

    dimension_lines = sc.textFile(DIM_FILE_NAME, minPartitions=MIN_PARTITIONS)
    fact_lines = sc.textFile(FACT_FILE_NAME, minPartitions=MIN_PARTITIONS)

    converter = CsvConverter()
    dimensions = dimension_lines.map(converter.read_dimension)
    facts = fact_lines.map(converter.read_fact)

    joined_facts = facts.leftOuterJoin(dimensions)
    print("Joined fact count: %d" % joined_facts.count())
    print(joined_facts.first())
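One thing I noticed while reading the traceback: the read_int that raises the error in serializers.py is just struct.unpack("!i", ...) on what should be a 4-byte length prefix read from the stream, so the message appears to mean the stream ended early (fewer than 4 bytes were left to read). A standalone Python 2.7 snippet, separate from my job, reproduces the exact error text:

import struct

# A well-formed 4-byte big-endian int unpacks fine:
print(struct.unpack("!i", "\x00\x00\x00\x2a")[0])  # prints 42

# A truncated buffer raises the same error as in the traceback above:
# struct.error: unpack requires a string argument of length 4
struct.unpack("!i", "\x00\x00")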