On 2 Aug 2017, at 14:25, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

I am quite sure that, at this point, everyone who has kindly cared 
to respond to my query needs to go and check this link: 
https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode.

I see. Well, we shall have to edit that document to make clear something which 
had been omitted:

In order for multiple spark workers to process data, they must have a shared 
store for that data, one with read/write access for all workers. This must 
be provided by a shared filesystem (HDFS, network-mounted NFS, GlusterFS), 
by an object store (S3, Azure WASB, ...), or by an alternative 
datastore implementing the Hadoop Filesystem API (example: Apache Cassandra).
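
As a rough illustration of the point above, the sketch below classifies an 
output path by its URI scheme. The function name storage_kind, the scheme list, 
and the example host and bucket names are assumptions made for this example and 
are not part of Spark. A path with no scheme, or with file:, is only shared if 
every machine mounts the same network filesystem at that location.

```python
from urllib.parse import urlparse

# Schemes that normally point at a store reachable from every worker.
# This set is illustrative, not exhaustive.
SHARED_SCHEMES = {"hdfs", "s3a", "wasb", "wasbs"}


def storage_kind(path):
    """Return 'shared', 'local', or 'unknown' for a path or URI."""
    scheme = urlparse(path).scheme.lower()
    if scheme in SHARED_SCHEMES:
        return "shared"
    if scheme in ("", "file"):
        # Resolved against each machine's own disk unless the directory
        # is a network mount (for example NFS) present on all machines.
        return "local"
    return "unknown"


for p in ["/Users/gouravsengupta/Development/spark/sparkdata/test2",
          "hdfs://namenode:8020/sparkdata/test2",   # hypothetical host
          "s3a://some-bucket/sparkdata/test2"]:     # hypothetical bucket
    print(p, "->", storage_kind(p))
```

A "local" result is not proof of a problem, only a prompt to confirm that the 
directory really is the same storage on every worker.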

In your case, for a small cluster of 1-3 machines, especially if you are just 
learning to play with spark, I'd start with an NFS mounted disk accessible on 
the same path on all machines. If you aren't willing to set that up, stick to 
spark standalone on a single machine first. You don't need a shared cluster to 
use spark standalone.

Personally, I'd recommend downloading apache zeppelin and running it locally as 
the simplest out-of-the-box experience.


It does mention that SPARK standalone cluster can have multiple machines 
running as slaves.


Clearly it omits the small detail about the requirement for a shared store.

The general idea of writing to the user group is that people who know should 
answer, and not those who do not know.

Agreed, but if the answer doesn't appear to be correct to you, do consider that 
there may be some detail that hasn't been mentioned, rather than immediately 
concluding that the person replying is wrong.

-Steve





Regards,
Gourav Sengupta

On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <mahesh_sawai...@persistent.com> wrote:
Gourav,
Riccardo’s answer is spot on.
What is happening is that one node of spark is writing to its own directory and 
telling a slave to read the data from there. When the slave goes to read it, 
the part is not found.

Check for the file 
/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
on the slave.
The reason it ran on spark 1.5 may have been that the executor ran on the 
driver itself. There is not much use for a setup where you don’t have some kind 
of distributed file system, so I would encourage you to use hdfs, or a mounted 
file system shared by all nodes.
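
The failure mode described above can be imitated without Spark at all. The 
sketch below is only a simulation under stated assumptions: two temporary 
directories stand in for the local disks of two machines, and the helper names 
write_part and read_part are made up for this example.

```python
import os
import tempfile


def write_part(disk_root, out_dir, part_name):
    """Simulate one executor writing a part file to its own local disk."""
    target = os.path.join(disk_root, out_dir)
    os.makedirs(target, exist_ok=True)
    with open(os.path.join(target, part_name), "w") as handle:
        handle.write("data")


def read_part(disk_root, out_dir, part_name):
    """Simulate one executor reading a part file from its own local disk."""
    with open(os.path.join(disk_root, out_dir, part_name)) as handle:
        return handle.read()


with tempfile.TemporaryDirectory() as base:
    # Two directories stand in for the local disks of two machines.
    disk_a = os.path.join(base, "machine_a")
    disk_b = os.path.join(base, "machine_b")
    out_dir = os.path.join("sparkdata", "test1")

    # Each "executor" writes its partition under the same path, but locally.
    write_part(disk_a, out_dir, "part-00000")
    write_part(disk_b, out_dir, "part-00001")

    # The "driver" lists the output directory on its own machine (A)...
    listed = sorted(os.listdir(os.path.join(disk_a, out_dir)))

    # ...and a task scheduled on machine B is asked to read what was listed.
    try:
        read_part(disk_b, out_dir, listed[0])
        outcome = "read ok"
    except FileNotFoundError:
        outcome = "file not found"
    print(outcome)
```

With a genuinely shared directory both "machines" would use the same root, and 
the read would succeed.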

Regards,
Mahesh


From: Gourav Sengupta [mailto:gourav.sengu...@gmail.com]
Sent: Monday, July 31, 2017 9:54 PM
To: Riccardo Ferrari
Cc: user
Subject: Re: SPARK Issue in Standalone cluster

Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and erroneous. SPARK must 
have a method so that different executors do not pick up the same files to 
process. You also did not answer the question of why the processing was 
successful in SPARK 1.5 and not in SPARK 2.2.

Also the exact same directory is present on both the nodes.

I feel quite fascinated when individuals respond before even understanding the 
issue, or trying out the code.

It will be of great help if someone could kindly read my email and help me 
figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <ferra...@gmail.com> wrote:
Hi Gourav,

The issue here is the location you're trying to write to and read from: 
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all the paths and resources should be available to 
all executors (and the driver), and that is the reason why you generally use 
HDFS, S3, NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not try 
to pick up the data from a selected node; rather, it tries to write/read in 
parallel from the executor nodes. Also, given its control logic, there is no way 
(read: you should not care) to know which executor is doing which task.
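
One way to see whether a path really is visible from the executors is to ask 
them. The sketch below is a best-effort check, not an official Spark facility: 
the function names probe_path and check_path_on_executors and the num_probes 
parameter are assumptions for this example, and spark is taken to be an already 
created SparkSession. Because the scheduler decides where tasks run, there is no 
guarantee that every executor gets probed.

```python
import os
import socket


def probe_path(path):
    """Report whether `path` exists on the machine running this function."""
    return (socket.gethostname(), os.path.exists(path))


def check_path_on_executors(spark, path, num_probes=32):
    """Run probe_path in several tasks and collect the distinct results.

    `spark` is an existing SparkSession. Task placement is up to the
    scheduler, so this is a best-effort check only.
    """
    rdd = spark.sparkContext.parallelize(range(num_probes), num_probes)
    return sorted(rdd.map(lambda _: probe_path(path)).distinct().collect())


# Local sanity check that needs no cluster:
print(probe_path(os.getcwd()))
```

If the returned list contains a host paired with False, that machine does not 
see the directory, which would be consistent with the behaviour described above.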

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
Hi,

I am working with a native SPARK standalone cluster 
(https://spark.apache.org/docs/2.2.0/spark-standalone.html).

Therefore I do not have HDFS.


EXERCISE:
It's the most fundamental and simple exercise: create a sample SPARK dataframe, 
write it to a location, and then read it back.

SETTINGS:
I have installed SPARK on two physical systems with the same:
1. SPARK version,
2. JAVA version,
3. PYTHON_PATH
4. SPARK_HOME
5. PYSPARK_PYTHON
The user on both systems is the root user, therefore there are no permission 
issues anywhere.

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate computers)

After that I can see in the spark UI (at port 8080) two workers.


CODE:
Then I run the following code:

======================================================
import os

import findspark

# Point findspark at the local Spark installation before importing pyspark.
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()

import numpy
import pandas
import pyspark
from pyspark.sql import SparkSession

# Connect to the standalone master.
spark = (SparkSession.builder
        .master("spark://mastersystem.local:7077")
        .appName("gouravtest")
        .enableHiveSupport()
        .getOrCreate())

# Build a 10000 x 4 DataFrame of random numbers and materialise it.
testdf = spark.createDataFrame(
    pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
testdf.cache()
testdf.count()

# Write to a path on the local filesystem, then read it back.
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
======================================================


ERROR I (in above code):
The error relates to the line: 
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
This line does not fail or report any error. But when I look at the stage 
in the spark Application UI, the error mentioned below is reported for the 
slave node which is not on the same system as the master node. The writing on 
the slave node which is on the same physical system as the master happens 
correctly. (NOTE: here the slave node is basically the worker, and the master 
node the driver.)
----------------------------------------------------------------------------------------------------------------------------------

0 (TID 41). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000006_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000006_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000028_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000028_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000021_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000021_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000018_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000018_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000029_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000029_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000027_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000027_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000010_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000010_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000030_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000030_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000016_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000016_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000024_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000024_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000023_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000023_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 2060 bytes result sent to driver
17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 24.9 KB, free 365.9 MB)
17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 70.3 KB, free 365.9 MB)

17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
java.io.FileNotFoundException: File file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet does not exist
          at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
          at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
          at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
          at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
          at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
          at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
          at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
          at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
          at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
          at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
          at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
          at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
          at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
          at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
          at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
          at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
          at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
          at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
          at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
          at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
          at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
          at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
          at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
          at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
          at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
          at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
          at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
          at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
          at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
          at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
          at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66

----------------------------------------------------------------------------------------------------------------------------------
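
When an executor log is this long, it can help to pull out only the files it 
reports as missing. The following is a small sketch under stated assumptions: 
the helper name missing_files is made up for this example, and the pattern 
assumes the "File ... does not exist" wording seen in the log above.

```python
import re

# Matches the message printed for java.io.FileNotFoundException in the log;
# \s+ also tolerates the message being wrapped across lines.
MISSING = re.compile(r"FileNotFoundException: File\s+(\S+)\s+does not exist")


def missing_files(log_text):
    """Return the distinct paths reported as missing, in order of appearance."""
    seen = []
    for path in MISSING.findall(log_text):
        if path not in seen:
            seen.append(path)
    return seen


sample = """17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
java.io.FileNotFoundException: File
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
 does not exist
"""
print(missing_files(sample))
```

Each reported path can then be checked by hand on the machine that produced 
the log.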


ERROR II (in above code):
While trying to read the files back, a distinct error is now thrown, which 
again says that the files do not exist.

Also, why is SPARK trying to search for the same files on both the systems? If 
the same path on two systems has different files, should SPARK not combine and 
work on them?



NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
I started spark using the same method but now using SPARK 1.5 and this does not 
give any error:
======================================================
import os

import findspark

# Point findspark at the local Spark installation before importing pyspark.
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()

import numpy
import pandas
import pyspark

# Spark 1.5 has no SparkSession, so use SparkContext and SQLContext.
sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
sqlContext = pyspark.SQLContext(sc)

testdf = sqlContext.createDataFrame(
    pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
testdf.cache()
testdf.count()

# Write to a path on the local filesystem, then read it back.
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
======================================================

I will be sincerely obliged if someone could kindly help me out with this issue 
and point out my mistakes/ assumptions.




Regards,
Gourav Sengupta

