Spark is trying to locate a Hadoop helper binary, "winutils.exe", which you
don't have on your Windows machine:

14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
...


It's a known bug: https://issues.apache.org/jira/browse/SPARK-2356

That issue references this email thread:
http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

And that email thread references this blog post:
https://social.msdn.microsoft.com/forums/azure/en-US/28a57efb-082b-424b-8d9e-731b1fe135de/please-read-if-experiencing-job-failures?forum=hdinsight


I had the same problem before. You may be able to work around it temporarily
by using the pre-built distribution from the Spark Summit training:
https://databricks-training.s3.amazonaws.com/getting-started.html
Or you may want to try the winutils workaround described in those links.
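
If you want to stay on Windows, a minimal sketch of that workaround (as I
understand it from those links) would be something like the following. The
C:/hadoop path is only an example; it assumes you have already downloaded a
winutils.exe that matches your Hadoop build and placed it under C:/hadoop/bin:

    import os

    # Example path only -- adjust to wherever you put winutils.exe
    hadoop_home = "C:/hadoop"  # must contain bin\winutils.exe

    # Hadoop's Shell class resolves %HADOOP_HOME%\bin\winutils.exe, so these
    # variables have to be set before the JVM starts, i.e. before creating
    # the SparkContext.
    os.environ["HADOOP_HOME"] = hadoop_home
    os.environ["PATH"] = os.path.join(hadoop_home, "bin") + os.pathsep + os.environ.get("PATH", "")

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("winutils_check").setMaster("local")
    sc = SparkContext(conf=conf)  # the "null\bin\winutils.exe" error should be gone

The important part is that HADOOP_HOME is visible to the JVM before the
SparkContext is created; otherwise Hadoop keeps looking for
null\bin\winutils.exe.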

...but in my case, I ended up running Spark from a Linux machine in a VM
after I hit other errors.
My impression is that Windows support is not currently a high priority,
since that bug has been open since version 1.0...

I hope that helps.

Best,


*Sebastián Ramírez*
Algorithm Designer

________________
 Tel: (+571) 795 7950 ext: 1012
 Cel: (+57) 300 370 77 10
 Calle 73 No 7 - 06  Piso 4
 Linkedin: co.linkedin.com/in/tiangolo/
 Twitter: @tiangolo <https://twitter.com/tiangolo>
 Email: sebastian.rami...@senseta.com
 www.senseta.com

On Tue, Dec 16, 2014 at 8:04 AM, mj <jone...@gmail.com> wrote:
>
> I've got a simple pyspark program that generates two CSV files and then
> carries out a leftOuterJoin (a fact RDD joined to a dimension RDD). The
> program works fine for smaller volumes of records, but when it goes beyond
> 3 million records for the fact dataset, I get the error below. I'm running
> PySpark via PyCharm and the information for my environment is:
>
> OS: Windows 7
> Python version: 2.7.9
> Spark version: 1.1.1
> Java version: 1.8
>
> I've also included the py file I am using. I'd appreciate any help you can
> give me,
>
> MJ.
>
>
> ----------------------------ERROR MESSAGE----------------------------
> C:\Python27\python.exe "C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py"
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 14/12/16 12:48:26 INFO SecurityManager: Changing view acls to: Mark Jones,
> 14/12/16 12:48:26 INFO SecurityManager: Changing modify acls to: Mark
> Jones,
> 14/12/16 12:48:26 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(Mark Jones, );
> users with modify permissions: Set(Mark Jones, )
> 14/12/16 12:48:26 INFO Slf4jLogger: Slf4jLogger started
> 14/12/16 12:48:27 INFO Remoting: Starting remoting
> 14/12/16 12:48:27 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@192.168.19.83:51387]
> 14/12/16 12:48:27 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://sparkDriver@192.168.19.83:51387]
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'sparkDriver' on
> port 51387.
> 14/12/16 12:48:27 INFO SparkEnv: Registering MapOutputTracker
> 14/12/16 12:48:27 INFO SparkEnv: Registering BlockManagerMaster
> 14/12/16 12:48:27 INFO DiskBlockManager: Created local directory at
> C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141216124827-11ef
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'Connection
> manager for block manager' on port 51390.
> 14/12/16 12:48:27 INFO ConnectionManager: Bound socket to port 51390 with
> id
> = ConnectionManagerId(192.168.19.83,51390)
> 14/12/16 12:48:27 INFO MemoryStore: MemoryStore started with capacity 265.1
> MB
> 14/12/16 12:48:27 INFO BlockManagerMaster: Trying to register BlockManager
> 14/12/16 12:48:27 INFO BlockManagerMasterActor: Registering block manager
> 192.168.19.83:51390 with 265.1 MB RAM
> 14/12/16 12:48:27 INFO BlockManagerMaster: Registered BlockManager
> 14/12/16 12:48:27 INFO HttpFileServer: HTTP File server directory is
>
> C:\Users\MARKJO~1\AppData\Local\Temp\spark-3b772ca1-dbf7-4eaa-b62c-be5e73036f5d
> 14/12/16 12:48:27 INFO HttpServer: Starting HTTP Server
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'HTTP file
> server' on port 51391.
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'SparkUI' on
> port
> 4040.
> 14/12/16 12:48:27 INFO SparkUI: Started SparkUI at
> http://192.168.19.83:4040
> 14/12/16 12:48:27 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the
> hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in
> the Hadoop binaries.
>         at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
>         at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
>         at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
>         at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
>         at
> org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
>         at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
>         at
>
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
>         at
>
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
>         at
>
> org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
>         at
> org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36)
>         at
> org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109)
>         at
> org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
>         at org.apache.spark.SparkContext.<init>(SparkContext.scala:228)
>         at
>
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:53)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
>         at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:214)
>         at
>
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
>         at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
> 14/12/16 12:48:28 INFO AkkaUtils: Connecting to HeartbeatReceiver:
> akka.tcp://sparkDriver@192.168.19.83:51387/user/HeartbeatReceiver
> 14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(159118) called with
> curMem=0, maxMem=278019440
> 14/12/16 12:48:28 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 155.4 KB, free 265.0 MB)
> 14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(159118) called with
> curMem=159118, maxMem=278019440
> 14/12/16 12:48:28 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 155.4 KB, free 264.8 MB)
> 14/12/16 12:48:28 INFO FileInputFormat: Total input paths to process : 1
> 14/12/16 12:48:28 INFO FileInputFormat: Total input paths to process : 1
> 14/12/16 12:48:28 INFO SparkContext: Starting job: count at C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py:79
> 14/12/16 12:48:28 INFO DAGScheduler: Registering RDD 8 (RDD at
> PythonRDD.scala:261)
> 14/12/16 12:48:28 INFO DAGScheduler: Got job 0 (count at C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py:79) with 9 output
> partitions (allowLocal=false)
> 14/12/16 12:48:28 INFO DAGScheduler: Final stage: Stage 0(count at
> C:/Users/Mark Jones/PycharmProjects/spark_test/spark_error_sample.py:79)
> 14/12/16 12:48:28 INFO DAGScheduler: Parents of final stage: List(Stage 1)
> 14/12/16 12:48:28 INFO DAGScheduler: Missing parents: List(Stage 1)
> 14/12/16 12:48:28 INFO DAGScheduler: Submitting Stage 1 (PairwiseRDD[8] at
> RDD at PythonRDD.scala:261), which has no missing parents
> 14/12/16 12:48:28 INFO MemoryStore: ensureFreeSpace(11672) called with
> curMem=318236, maxMem=278019440
> 14/12/16 12:48:28 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 11.4 KB, free 264.8 MB)
> 14/12/16 12:48:28 INFO DAGScheduler: Submitting 9 missing tasks from Stage
> 1
> (PairwiseRDD[8] at RDD at PythonRDD.scala:261)
> 14/12/16 12:48:28 INFO TaskSchedulerImpl: Adding task set 1.0 with 9 tasks
> 14/12/16 12:48:28 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
> 0, localhost, PROCESS_LOCAL, 1275 bytes)
> 14/12/16 12:48:28 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
> 14/12/16 12:48:29 INFO HadoopRDD: Input split:
> file:/c:/tmp/fact.csv:0+33554432
> 14/12/16 12:48:29 INFO deprecation: mapred.tip.id is deprecated. Instead,
> use mapreduce.task.id
> 14/12/16 12:48:29 INFO deprecation: mapred.task.id is deprecated. Instead,
> use mapreduce.task.attempt.id
> 14/12/16 12:48:29 INFO deprecation: mapred.task.is.map is deprecated.
> Instead, use mapreduce.task.ismap
> 14/12/16 12:48:29 INFO deprecation: mapred.task.partition is deprecated.
> Instead, use mapreduce.task.partition
> 14/12/16 12:48:29 INFO deprecation: mapred.job.id is deprecated. Instead,
> use mapreduce.job.id
> 14/12/16 12:48:51 INFO PythonRDD: Times: total = 22350, boot = 271, init =
> 59, finish = 22020
> 14/12/16 12:48:52 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
> 0)
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 127, in
> dump_stream
>     for obj in iterator:
>   File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
>     for (k, v) in iterator:
>   File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in
> _external_items
>     False)
>   File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in
> mergeCombiners
>     for k, v in iterator:
>   File "C:\apps\spark\python\pyspark\serializers.py", line 133, in
> load_stream
>     yield self._read_with_length(stream)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 146, in
> _read_with_length
>     length = read_int(stream)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
>     return struct.unpack("!i", length)[0]
> error: unpack requires a string argument of length 4
>
>         at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>         at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 14/12/16 12:48:52 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
> 1, localhost, PROCESS_LOCAL, 1275 bytes)
> 14/12/16 12:48:52 INFO Executor: Running task 1.0 in stage 1.0 (TID 1)
> 14/12/16 12:48:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 0,
> localhost): org.apache.spark.api.python.PythonException: Traceback (most
> recent call last):
>   File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 127, in
> dump_stream
>     for obj in iterator:
>   File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
>     for (k, v) in iterator:
>   File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in
> _external_items
>     False)
>   File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in
> mergeCombiners
>     for k, v in iterator:
>   File "C:\apps\spark\python\pyspark\serializers.py", line 133, in
> load_stream
>     yield self._read_with_length(stream)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 146, in
> _read_with_length
>     length = read_int(stream)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
>     return struct.unpack("!i", length)[0]
> error: unpack requires a string argument of length 4
>
>
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
> org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> 14/12/16 12:48:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times;
> aborting job
> 14/12/16 12:48:52 INFO TaskSchedulerImpl: Cancelling stage 1
> 14/12/16 12:48:52 INFO TaskSchedulerImpl: Stage 1 was cancelled
> 14/12/16 12:48:52 INFO Executor: Executor is trying to kill task 1.0 in
> stage 1.0 (TID 1)
> 14/12/16 12:48:52 INFO DAGScheduler: Failed to run count at C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py:79
> Traceback (most recent call last):
>   File "C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py", line 79, in
> <module>
>     print("Joined fact count: %d" % joined_facts.count())
>   File "C:\apps\spark\python\pyspark\rdd.py", line 847, in count
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "C:\apps\spark\python\pyspark\rdd.py", line 838, in sum
>     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>   File "C:\apps\spark\python\pyspark\rdd.py", line 759, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "C:\apps\spark\python\pyspark\rdd.py", line 723, in collect
>     bytesInJava = self._jrdd.collect().iterator()
>   File "C:\apps\spark\python\build\py4j\java_gateway.py", line 538, in
> __call__
>     self.target_id, self.name)
>   File "C:\apps\spark\python\build\py4j\protocol.py", line 300, in
> get_return_value
>     format(target_id, '.', name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o55.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 1.0
> (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback
> (most recent call last):
>   File "C:\apps\spark\python\pyspark\worker.py", line 79, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 127, in
> dump_stream
>     for obj in iterator:
>   File "C:\apps\spark\python\pyspark\rdd.py", line 1495, in add_shuffle_key
>     for (k, v) in iterator:
>   File "C:\apps\spark\python\pyspark\shuffle.py", line 371, in
> _external_items
>     False)
>   File "C:\apps\spark\python\pyspark\shuffle.py", line 281, in
> mergeCombiners
>     for k, v in iterator:
>   File "C:\apps\spark\python\pyspark\serializers.py", line 133, in
> load_stream
>     yield self._read_with_length(stream)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 146, in
> _read_with_length
>     length = read_int(stream)
>   File "C:\apps\spark\python\pyspark\serializers.py", line 465, in read_int
>     return struct.unpack("!i", length)[0]
> error: unpack requires a string argument of length 4
>
>
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
> org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>         at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>         at scala.Option.foreach(Option.scala:236)
>         at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>         at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Process finished with exit code 1
>
>
>
> ----------------------------PYTHON FILE----------------------------
> import random
> from pyspark import SparkContext, SparkConf, StorageLevel
>
> DIM_FILE_NAME="c:/tmp/dim.csv"
> FACT_FILE_NAME="c:/tmp/fact.csv"
> MIN_PARTITIONS=4
>
> class DataGenerator():
>
>     def __init__(self):
>         pass
>
>     def generate_dimension_file(self, file_name, num_items=100):
>
>         with open(file_name, 'w') as dim_file:
>
>             for i in range(0, num_items):
>                 id = i
>                 name = "Dimension Item #%d" % id
>
>                 dim_file.write("%d,%s\n" %(id, name))
>
>             dim_file.flush()
>
>     def generate_fact_file(self, file_name, num_items=100):
>
>         with open(file_name, 'w') as fact_file:
>
>             for i in range(0, num_items):
>                 fact_id = i
>                 name = "Fact Item #%d" % fact_id
>                 dim_id=random.randint(0,99)
>
>                 fact_file.write("%d,%d,%s\n" %(fact_id, dim_id, name))
>
>             fact_file.flush()
>
> class CsvConverter():
>
>     def __init__(self):
>         pass
>
>     def read_dimension(self, line):
>
>         items = line.split(',')
>
>         return (int(items[0]), items)
>
>     def read_fact(self, line):
>
>         items = line.split(',')
>
>         return (int(items[1]), items)
>
> if __name__ == "__main__":
>
>     # Generate data
>     generator = DataGenerator()
>     generator.generate_dimension_file(file_name=DIM_FILE_NAME, num_items=100)
>     generator.generate_fact_file(file_name=FACT_FILE_NAME, num_items=5000000)
>
>
>     # Load files into spark
>     appName = "spark_error_sample"
>     master = "local"
>     conf = SparkConf().setAppName(appName).setMaster(master).set('spark.executor.memory', '4g')
>     sc = SparkContext(conf=conf)
>
>     dimension_lines=sc.textFile(DIM_FILE_NAME, minPartitions=MIN_PARTITIONS)
>     fact_lines=sc.textFile(FACT_FILE_NAME, minPartitions=MIN_PARTITIONS)
>
>     converter = CsvConverter()
>
>     dimensions = dimension_lines.map(converter.read_dimension)
>     facts = fact_lines.map(converter.read_fact)
>
>     joined_facts = facts.leftOuterJoin(dimensions)
>
>     print("Joined fact count: %d" % joined_facts.count())
>     print(joined_facts.first())
>
>
>
>
>
>
>
>
