Hi,

I am setting up a native Spark standalone cluster
(https://spark.apache.org/docs/2.2.0/spark-standalone.html).

Therefore I do not have HDFS.


EXERCISE:
It is the most fundamental and simple exercise: create a sample Spark
DataFrame, write it to a location, and then read it back.

SETTINGS:
I have installed Spark on two physical systems with the same:
1. Spark version,
2. Java version,
3. PYTHON_PATH,
4. SPARK_HOME,
5. PYSPARK_PYTHON.
The user on both systems is root, so there are no permission issues
anywhere (a quick way to confirm the settings match is sketched below).
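
For reference, a minimal sketch of how the settings can be compared across
the two machines (variable names as listed above; nothing here changes any
state, it only prints what is set locally):
======================================================
# Run on both machines: print the variables listed above and the Python version
import os, sys
for var in ("SPARK_HOME", "PYSPARK_PYTHON", "PYTHON_PATH"):
    print(var, "=", os.environ.get(var))
print("python =", sys.version)
======================================================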

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh spark://mastersystem.local:7077
   (run from two separate computers)

After that I can see two workers in the Spark UI (at port 8080).
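
As a sanity check, the two registered workers can also be confirmed
programmatically. This is only a sketch: it assumes the standalone master
UI on port 8080 exposes its JSON view at /json (hostname as used in the
code below) and Python 3:
======================================================
# Query the standalone master's JSON status page and list registered workers
import json, urllib.request
status = json.load(urllib.request.urlopen("http://mastersystem.local:8080/json/"))
for w in status["workers"]:
    print(w["host"], w["state"], w["cores"], "cores")
======================================================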


CODE:
Then I run the following code:

======================================================
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark
from pyspark.sql import SparkSession

# Connect to the standalone master and create a session
spark = (SparkSession.builder
        .master("spark://mastersystem.local:7077")
        .appName("gouravtest")
        .enableHiveSupport()
        .getOrCreate())

import pandas, numpy

# Create a small test DataFrame, write it out, then read it back
testdf = spark.createDataFrame(
    pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
======================================================


ERROR I (in the above code):
Error in the line:
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
This line does not fail or report any error. But when I look at the stage
in the Spark application UI, the error below is reported for the slave
node that is not on the same system as the master node. The write on the
slave node that is on the same physical system as the master completes
correctly. (NOTE: by slave node I mean the worker, and by master node the
driver.)
----------------------------------------------------------------------------------------------------------------------------------

0 (TID 41). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000006_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000006_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID
64). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000028_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000028_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000021_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000021_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID
45). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID
37). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID
39). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000018_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000018_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000029_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000029_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000027_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000027_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID
54). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000010_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID
52). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000010_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000030_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID
55). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000030_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID
53). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID
61). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000016_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000016_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID
59). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID
51). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000024_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000024_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000023_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000023_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID
62). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID
43). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID
49). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID
60). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID
63). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID
56). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID
57). 2060 bytes result sent to driver
17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as
bytes in memory (estimated size 24.9 KB, free 365.9 MB)
17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values
in memory (estimated size 70.3 KB, free 365.9 MB)
17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
java.io.FileNotFoundException: File
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
does not exist
        at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
        at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
        at 
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
        at 
org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
        at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
        at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
        at 
scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
        at 
scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
        at 
scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at 
scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
        at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
        at 
scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
        at 
scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
        at 
scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
        at 
scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at 
scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66


----------------------------------------------------------------------------------------------------------------------------------


ERROR II (in the above code):
While trying to read the files back, a distinct error is thrown, again
saying that the files do not exist.

Also, why is Spark looking for the same files on both systems? If the same
path on the two systems holds different files, should Spark not combine
them and work on all of them?
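
A minimal way to see what actually landed on disk on each machine (just a
sketch; run the same snippet on the master and on the other worker, path as
in the code above):
======================================================
# List the contents of the output directory on this machine
import os
out = "/Users/gouravsengupta/Development/spark/sparkdata/test2"
for name in sorted(os.listdir(out)):
    print(name, os.path.getsize(os.path.join(out, name)), "bytes")
======================================================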



NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
I started Spark using the same method, but now with Spark 1.5, and this
does not give any error:
======================================================
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark

# Spark 1.5: use SparkContext + SQLContext instead of SparkSession
sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
sqlContext = pyspark.SQLContext(sc)
import pandas, numpy

# Create the same test DataFrame, write it out, then read it back
testdf = sqlContext.createDataFrame(
    pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
======================================================

I would be sincerely obliged if someone could kindly help me out with this
issue and point out my mistakes/assumptions.




Regards,
Gourav Sengupta
