Hello,

My 2 cents here, hope it helps.

If you just want to play around with Spark, I'd leave Hadoop out; it's an
unnecessary dependency that you don't need just for running a Python
script.
Instead, do the following:
- go to the root of your master / slave node and create a directory
/root/pyscripts
- place your CSV file there, as well as the Python script
- run the script that replicates the whole directory across the cluster (I
believe it's called copy-script.sh)
- then run your spark-submit; it will be something like
    ./spark-submit /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10 --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077
- in your Python script, as part of your processing, write the Parquet file
to the directory /root/pyscripts (a sketch follows below)
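
For illustration, a minimal sketch of what mysparkscripts.py could look
like - the file names and the output directory are just the ones from the
steps above, adjust to taste:

    from pyspark.sql import SparkSession

    # master and app name can also be set here instead of on spark-submit
    spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()

    # read the CSV that was replicated to every node
    df = spark.read.csv("file:///root/pyscripts/tree_addhealth.csv",
                        header=True, inferSchema=True)

    # ... your processing here ...

    # write the result as Parquet into the same replicated directory
    df.write.mode("overwrite").parquet("file:///root/pyscripts/output.parquet")

    spark.stop()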

If you have an AWS account and you are versatile with that - you need to
set up bucket permissions etc. - you can just:
- store your file in one of your S3 buckets
- create an EMR cluster
- connect to the master or a slave
- run your script so that it reads from the S3 bucket and writes back to
the same S3 bucket (again, a sketch below)
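
Again just a sketch, with a hypothetical bucket name - the read/write is
the same as above, only the paths change (on EMR the s3:// scheme works
out of the box):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("s3-roundtrip").getOrCreate()

    # read from and write back to the same (hypothetical) bucket
    df = spark.read.csv("s3://my-bucket/tree_addhealth.csv", header=True)
    df.write.mode("overwrite").parquet("s3://my-bucket/output.parquet")

    spark.stop()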


Feel free to mail me privately; I have a working script I have used to
test some code on a Spark standalone cluster.
hth

On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Steve,
>
> I love you, mate, thanks a ton once again for ACTUALLY RESPONDING.
>
> I am now going through the documentation
> (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
> and it makes much, much more sense now.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>>
>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>> Hi Steve,
>>
>> I have written a sincere note of apology to everyone in a separate email.
>> I sincerely request your forgiveness in advance if anything in my emails
>> sounds impolite.
>>
>> Let me first start by thanking you.
>>
>> I know it looks like I formed all my opinion based on that document, but
>> that is not the case at all. If you or anyone tries to execute the code
>> that I have given then they will see what I mean. Code speaks louder and
>> better than words for me.
>>
>> So I am not saying you are wrong. I am asking you to verify, and
>> expecting that someone will be able to correct a set of understandings
>> that a moron like me has gained after long hours of not having anything
>> better to do.
>>
>>
>> SCENARIO: there are three files - file1.csv, file2.csv and file3.csv -
>> stored in HDFS with replication 2, and there is a Hadoop cluster of
>> three nodes. All these nodes have Spark workers (executors) running on
>> them. The files are stored in the following way:
>> ---------------------------------------
>> | SYSTEM 1  | SYSTEM 2  | SYSTEM 3  |
>> | (worker1) | (worker2) | (worker3) |
>> | (master)  |           |           |
>> ---------------------------------------
>> | file1.csv |           | file1.csv |
>> ---------------------------------------
>> |           | file2.csv | file2.csv |
>> ---------------------------------------
>> | file3.csv | file3.csv |           |
>> ---------------------------------------
>>
>>
>> CONSIDERATION ON WHICH THE ABOVE SCENARIO IS BASED:
>> HDFS replication does not store the same file on all the nodes in the
>> cluster. So if I have three nodes and the replication is two, then the
>> same file will be stored physically on two nodes in the cluster. Does
>> that sound right?
>>
>>
>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
>> larger than 128MB then it will be broken up into blocks:
>>
>> file1.csv -> [block0001, block0002, block0003]
>>
>> and each block will be replicated. With replication = 2 there will be two
>> copies of each block, but the file itself can span > 2 hosts.
>>
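>> (As an illustration, you can check the actual block placement yourself
>> with fsck, e.g.:
>>
>>     hdfs fsck /path/to/file1.csv -files -blocks -locations
>>
>> which lists each block of the file and the datanodes holding its
>> replicas.)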
>>
>> ASSUMPTION (STEVE, PLEASE CLARIFY THIS):
>> If Spark is trying to process the records, then I am expecting that
>> WORKER2 should not be processing file1.csv, and similarly WORKER1 should
>> not be processing file2.csv and WORKER3 should not be processing
>> file3.csv, because if WORKER2 were trying to process file1.csv then it
>> would actually cause network transmission of the file unnecessarily.
>>
>>
>> Spark prefers to schedule work locally, so as to save on network traffic,
>> but it prioritizes execution time over waiting for free workers on the
>> node with the data. If a block is on nodes 2 and 3 but there is only a
>> free thread on node 1, then node 1 gets the work.
>>
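>> (How long the scheduler will wait for a data-local slot before falling
>> back to a less local one is tunable - e.g., as a sketch:
>>
>>     ./spark-submit --conf spark.locality.wait=10s ...
>>
>> raises the wait from the default of 3s, trading startup latency for
>> locality.)
>>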
>> There are details on whether/how work across blocks takes place which I'm
>> avoiding. For now, know that those formats which are "splittable" will
>> have work scheduled by block. If you use Parquet/ORC/Avro for your data
>> and compress with snappy, it will be split. This gives you maximum
>> performance, as >1 thread can work on different blocks. That is, if
>> file1 is split into three blocks, three worker threads can process it.
>>
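>> (For example, an explicitly snappy-compressed Parquet write from
>> PySpark - just a sketch, with a hypothetical output path:
>>
>>     df.write.option("compression", "snappy").parquet("/data/out.parquet")
>>
>> though in recent Spark versions snappy is the default Parquet codec
>> anyway.)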
>>
>> ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE, ONCE AGAIN, PLEASE
>> CLARIFY THIS):
>> If WORKER2 is not processing file1.csv, then how does it matter whether
>> the file is there or not at all in the system? Should not Spark just ask
>> the workers to process the files which are available on the worker nodes?
>> In case both WORKER2 and WORKER3 fail and are not available, then
>> file2.csv will not be processed at all.
>>
>>
>> Locality is best-effort, not guaranteed.
>>
>>
>> ALSO, I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
>> EXECUTED (it's been pointed out that I am learning Spark, and even I did
>> not take more than 13 mins to set up the cluster and run the code).
>>
>> Once you execute the code you will find that:
>> 1. If the path starts with file:/// while reading back, then there is no
>> error reported, but the records reported back are only those on the
>> worker which also hosts the master.
>> 2. You will also notice that once you cache the file before writing, the
>> partitions are distributed nicely across the workers; while writing back,
>> the dataframe partitions are written properly on the worker node on the
>> master, but the workers on the other systems have their files written to
>> a _temporary folder which does not get copied back to the main folder.
>> In spite of this the job is not reported as failed in Spark.
>>
>>
>> This gets into the "commit protocol". You don't want to know all the
>> dirty details (*) but essentially it's this (layout sketched below):
>>
>> 1. Every worker writes its output to a directory under the destination
>> directory, something like $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID
>> 2. It is the Spark driver which "commits" the job by moving the output of
>> the individual workers from the temporary directories into $dest, then
>> deleting $dest/_temporary
>> 3. For this it needs to be able to list all the output in
>> $dest/_temporary
>>
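>> (Concretely, before the commit the destination looks roughly like this,
>> with the attempt IDs abbreviated:
>>
>>     $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID/part-00000
>>     $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID/part-00001
>>
>> and after a successful commit:
>>
>>     $dest/part-00000
>>     $dest/part-00001
>>     $dest/_SUCCESS
>>
>> where _SUCCESS is the empty marker file written at the end of the job.)
>>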
>> In your case, only the output on the same node as the driver is being
>> committed, because only those files can be listed and moved. The output
>> on the other nodes isn't seen, so it isn't committed, nor cleaned up.
>>
>>
>>
>> Now, in my own world, if I see the following things happening, something
>> is going wrong (with me):
>> 1. Spark transfers files from different systems to process, instead of
>> processing them locally (I do not have code to prove this, and therefore
>> it's just an assumption).
>> 2. Spark cannot determine when the writes are failing in standalone
>> cluster workers, and reports success (code is there for this).
>> 3. Spark reports back the number of records in the worker running on the
>> master node when count() is given, without reporting an error while using
>> file:///, and reports an error when I mention the path without file:///
>> (for Spark 2.1.x onwards; code is there for this).
>>
>>
>>
>> As everyone's been saying, file:// requires a shared filestore, with
>> uniform paths everywhere. That's needed to list the files to process,
>> read the files in the workers, and commit the final output. NFS
>> cross-mounting is the simplest way to do this, especially as HDFS is
>> overkill for three nodes: more services to keep running, no real fault
>> tolerance. Export a directory tree from one of the servers, give the
>> rest access to it, and don't worry about bandwidth use, as the shared
>> disk itself will become the bottleneck.
>>
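>> (A rough sketch of that setup, with hypothetical host and path names:
>>
>>     # on the exporting node, in /etc/exports:
>>     /srv/spark-data  *(rw,sync,no_subtree_check)
>>     # then re-export: exportfs -ra
>>
>>     # on every other node, mount it at the SAME path:
>>     mount -t nfs master-host:/srv/spark-data /srv/spark-data
>>
>> the uniform path on every node is the important part.)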
>>
>>
>> I very sincerely hope that with your genuine help the bar of language and
>> social skills will be lowered for me, and everyone will find a way to
>> excuse me and not treat this email as a means to measure my extremely
>> versatile and amazingly vivid social skills. It will be a lot of help to
>> just focus on the facts related to machines, data, errors and (the
>> language that I somehow understand better) code.
>>
>>
>> My sincere apologies once again, as I am 100% sure that I have not met
>> the required bar of social and language skills.
>>
>> Thanks a ton once again for your kindness, patience and understanding.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> * For the curious, the details of the v1 and v2 commit protocols are at
>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>
>> Like I said: you don't want to know the details, and you really don't
>> want to step through Hadoop's FileOutputCommitter to see what's going on.
>> The Spark side is much easier to follow.
>>
>>
>
