I've got a simple PySpark program that generates two CSV files and then
carries out a leftOuterJoin (a fact RDD joined to a dimension RDD). The
program works fine for smaller volumes of records, but once the fact
dataset goes beyond about 3 million records I get the error below. I'm
running PySpark via PyCharm, and my environment is:

OS: Windows 7
Python version: 2.7.9
Spark version: 1.1.1
Java version: 1.8
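
For reference, the join itself is nothing exotic. Here's a stripped-down
sketch of the same keyed leftOuterJoin with hand-built pairs instead of
the CSV files (at this tiny size it runs without any problem):

    from pyspark import SparkConf, SparkContext

    # Same shape as the attached script: facts keyed by dim_id,
    # dimensions keyed by their id, then a leftOuterJoin.
    conf = SparkConf().setAppName("join_sketch").setMaster("local")
    sc = SparkContext(conf=conf)

    facts = sc.parallelize([(0, ['0', '0', 'Fact Item #0']),
                            (1, ['1', '1', 'Fact Item #1'])])
    dimensions = sc.parallelize([(0, ['0', 'Dimension Item #0']),
                                 (1, ['1', 'Dimension Item #1'])])

    joined = facts.leftOuterJoin(dimensions)
    print("Joined fact count: %d" % joined.count())   # 2
    print(joined.first())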

I've also included the .py file I'm using at the end of this message. I'd
appreciate any help you can give me,

MJ.


----------------------------ERROR MESSAGE----------------------------
C:\Python27\python.exe "C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py"
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/12/16 12:48:26 INFO SecurityManager: Changing view acls to: Mark Jones,
14/12/16 12:48:26 INFO SecurityManager: Changing modify acls to: Mark Jones,
14/12/16 12:48:26 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(Mark Jones, );
users with modify permissions: Set(Mark Jones, )
14/12/16 12:48:26 INFO Slf4jLogger: Slf4jLogger started
14/12/16 12:48:27 INFO Remoting: Starting remoting
14/12/16 12:48:27 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@192.168.19.83:51387]
14/12/16 12:48:27 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@192.168.19.83:51387]
14/12/16 12:48:27 INFO Utils: Successfully started service 'sparkDriver' on
port 51387.
14/12/16 12:48:27 INFO SparkEnv: Registering MapOutputTracker
14/12/16 12:48:27 INFO SparkEnv: Registering BlockManagerMaster
14/12/16 12:48:27 INFO DiskBlockManager: Created local directory at
C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141216124827-11ef
14/12/16 12:48:27 INFO Utils: Successfully started service 'Connection
manager for block manager' on port 51390.
14/12/16 12:48:27 INFO ConnectionManager: Bound socket to port 51390 with id
= ConnectionManagerId(192.168.19.83,51390)
14/12/16 12:48:27 INFO MemoryStore: MemoryStore started with capacity 265.1
MB
14/12/16 12:48:27 INFO BlockManagerMaster: Trying to register BlockManager
14/12/16 12:48:27 INFO BlockManagerMasterActor: Registering block manager
192.168.19.83:51390 with 265.1 MB RAM
14/12/16 12:48:27 INFO BlockManagerMaster: Registered BlockManager
14/12/16 12:48:27 INFO HttpFileServer: HTTP File server directory is
C:\Users\MARKJO~1\AppData\Local\Temp\spark-3b772ca1-dbf7-4eaa-b62c-be5e73036f5d
14/12/16 12:48:27 INFO HttpServer: Starting HTTP Server
14/12/16 12:48:27 INFO Utils: Successfully started service 'HTTP file
server' on port 51391.
14/12/16 12:48:27 INFO Utils: Successfully started service 'SparkUI' on port
4040.
14/12/16 12:48:27 INFO SparkUI: Started SparkUI at http://192.168.19.83:4040
14/12/16 12:48:27 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the
hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in
the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
        at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
        at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
        at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
        at 
org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
        at
org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
        at 
org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:228)
        at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:53)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
14/12/16 12:48:28 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@192.168.19.83:51387/user/HeartbeatReceiver
14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(159118) called with
curMem=0, maxMem=278019440
14/12/16 12:48:28 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 155.4 KB, free 265.0 MB)
14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(159118) called with
curMem=159118, maxMem=278019440
14/12/16 12:48:28 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 155.4 KB, free 264.8 MB)
14/12/16 12:48:28 INFO FileInputFormat: Total input paths to process : 1
14/12/16 12:48:28 INFO FileInputFormat: Total input paths to process : 1
14/12/16 12:48:28 INFO SparkContext: Starting job: count at C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py:79
14/12/16 12:48:28 INFO DAGScheduler: Registering RDD 8 (RDD at
PythonRDD.scala:261)
14/12/16 12:48:28 INFO DAGScheduler: Got job 0 (count at C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py:79) with 9 output
partitions (allowLocal=false)
14/12/16 12:48:28 INFO DAGScheduler: Final stage: Stage 0(count at
C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py:79)
14/12/16 12:48:28 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/12/16 12:48:28 INFO DAGScheduler: Missing parents: List(Stage 1)
14/12/16 12:48:28 INFO DAGScheduler: Submitting Stage 1 (PairwiseRDD[8] at
RDD at PythonRDD.scala:261), which has no missing parents
14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(11672) called with
curMem=318236, maxMem=278019440
14/12/16 12:48:28 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 11.4 KB, free 264.8 MB)
14/12/16 12:48:28 INFO DAGScheduler: Submitting 9 missing tasks from Stage 1
(PairwiseRDD[8] at RDD at PythonRDD.scala:261)
14/12/16 12:48:28 INFO TaskSchedulerImpl: Adding task set 1.0 with 9 tasks
14/12/16 12:48:28 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
0, localhost, PROCESS_LOCAL, 1275 bytes)
14/12/16 12:48:28 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
14/12/16 12:48:29 INFO HadoopRDD: Input split:
file:/c:/tmp/fact.csv:0+33554432
14/12/16 12:48:29 INFO deprecation: mapred.tip.id is deprecated. Instead,
use mapreduce.task.id
14/12/16 12:48:29 INFO deprecation: mapred.task.id is deprecated. Instead,
use mapreduce.task.attempt.id
14/12/16 12:48:29 INFO deprecation: mapred.task.is.map is deprecated.
Instead, use mapreduce.task.ismap
14/12/16 12:48:29 INFO deprecation: mapred.task.partition is deprecated.
Instead, use mapreduce.task.partition
14/12/16 12:48:29 INFO deprecation: mapred.job.id is deprecated. Instead,
use mapreduce.job.id
14/12/16 12:48:51 INFO PythonRDD: Times: total = 22350, boot = 271, init =
59, finish = 22020
14/12/16 12:48:52 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\spark\python\pyspark\serializers.py", line 127, in
dump_stream
    for obj in iterator:
  File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
    for (k, v) in iterator:
  File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in
_external_items
    False)
  File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in
mergeCombiners
    for k, v in iterator:
  File "C:\apps\spark\python\pyspark\serializers.py", line 133, in
load_stream
    yield self._read_with_length(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 146, in
_read_with_length
    length = read_int(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4

        at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        at
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
14/12/16 12:48:52 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
1, localhost, PROCESS_LOCAL, 1275 bytes)
14/12/16 12:48:52 INFO Executor: Running task 1.0 in stage 1.0 (TID 1)
14/12/16 12:48:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 0,
localhost): org.apache.spark.api.python.PythonException: Traceback (most
recent call last):
  File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\spark\python\pyspark\serializers.py", line 127, in
dump_stream
    for obj in iterator:
  File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
    for (k, v) in iterator:
  File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in
_external_items
    False)
  File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in
mergeCombiners
    for k, v in iterator:
  File "C:\apps\spark\python\pyspark\serializers.py", line 133, in
load_stream
    yield self._read_with_length(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 146, in
_read_with_length
    length = read_int(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4

       
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
       
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
       
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
       
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
       
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
       
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
14/12/16 12:48:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times;
aborting job
14/12/16 12:48:52 INFO TaskSchedulerImpl: Cancelling stage 1
14/12/16 12:48:52 INFO TaskSchedulerImpl: Stage 1 was cancelled
14/12/16 12:48:52 INFO Executor: Executor is trying to kill task 1.0 in
stage 1.0 (TID 1)
14/12/16 12:48:52 INFO DAGScheduler: Failed to run count at C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py:79
Traceback (most recent call last):
  File "C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py", line 79, in
<module>
    print("Joined fact count: %d" % joined_facts.count())
  File "C:\apps\spark\python\pyspark\rdd.py", line 847, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\apps\spark\python\pyspark\rdd.py", line 838, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "C:\apps\spark\python\pyspark\rdd.py", line 759, in reduce
    vals = self.mapPartitions(func).collect()
  File "C:\apps\spark\python\pyspark\rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "C:\apps\spark\python\build\py4j\java_gateway.py", line 538, in
__call__
    self.target_id, self.name)
  File "C:\apps\spark\python\build\py4j\protocol.py", line 300, in
get_return_value
    format(target_id, '.', name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0
(TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback
(most recent call last):
  File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\spark\python\pyspark\serializers.py", line 127, in
dump_stream
    for obj in iterator:
  File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
    for (k, v) in iterator:
  File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in
_external_items
    False)
  File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in
mergeCombiners
    for k, v in iterator:
  File "C:\apps\spark\python\pyspark\serializers.py", line 133, in
load_stream
    yield self._read_with_length(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 146, in
_read_with_length
    length = read_int(stream)
  File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
    return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4

       
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
       
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
       
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
       
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
       
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
       
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Process finished with exit code 1
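
In case it helps narrow things down: the last Python frame is just
read_int() in serializers.py, which unpacks a 4-byte big-endian int
("!i"), so the struct error presumably means the stream coming back from
the worker ended partway through a record. A standalone Python 2.7
illustration of that exact error (nothing Spark-specific, just struct):

    import struct

    print(struct.unpack("!i", "\x00\x00\x00\x2a"))  # (42,) -- a full 4-byte buffer is fine
    struct.unpack("!i", "\x00\x00")  # raises: unpack requires a string argument of length 4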



----------------------------PYTHON FILE----------------------------
import random
from pyspark import SparkContext, SparkConf, StorageLevel

DIM_FILE_NAME="c:/tmp/dim.csv"
FACT_FILE_NAME="c:/tmp/fact.csv"
MIN_PARTITIONS=4

class DataGenerator():

    def __init__(self):
        pass

    def generate_dimension_file(self, file_name, num_items=100):

        with open(file_name, 'w') as dim_file:

            for i in range(0, num_items):
                id = i
                name = "Dimension Item #%d" % id

                dim_file.write("%d,%s\n" % (id, name))

            dim_file.flush()

    def generate_fact_file(self, file_name, num_items=100):

        with open(file_name, 'w') as fact_file:

            for i in range(0, num_items):
                fact_id = i
                name = "Fact Item #%d" % fact_id
                dim_id = random.randint(0, 99)

                fact_file.write("%d,%d,%s\n" % (fact_id, dim_id, name))

            fact_file.flush()

class CsvConverter():

    def __init__(self):
        pass

    def read_dimension(self, line):
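        # Key each dimension line by its integer id (first column) for the join.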

        items = line.split(',')

        return (int(items[0]), items)

    def read_fact(self, line):
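        # Key each fact line by its dimension id (second column) so it joins to the dimensions.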

        items = line.split(',')

        return (int(items[1]), items)

if __name__ == "__main__":

    # Generate data
    generator = DataGenerator()
    generator.generate_dimension_file(file_name=DIM_FILE_NAME, num_items=100)
    generator.generate_fact_file(file_name=FACT_FILE_NAME, num_items=5000000)


    # Load files into spark
    appName = "spark_error_sample"
    master = "local"
    conf = (SparkConf()
            .setAppName(appName)
            .setMaster(master)
            .set('spark.executor.memory', '4g'))
    sc = SparkContext(conf=conf)

    dimension_lines = sc.textFile(DIM_FILE_NAME, minPartitions=MIN_PARTITIONS)
    fact_lines = sc.textFile(FACT_FILE_NAME, minPartitions=MIN_PARTITIONS)

    converter = CsvConverter()

    dimensions = dimension_lines.map(converter.read_dimension)
    facts = fact_lines.map(converter.read_fact)

    joined_facts = facts.leftOuterJoin(dimensions)

    print("Joined fact count: %d" % joined_facts.count())
    print(joined_facts.first())





