Hi Marco,

thanks a ton, I will surely use those alternatives.


Regards,
Gourav Sengupta

On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Sengupta
>  further to this, if you try the following notebook in databricks cloud,
> it will read a .csv file , write to a parquet file and read it again (just
> to count the number of rows stored)
> Please note that the path to the csv file might differ for you.....
> So, what you will need todo is
> 1 - create an account to community.cloud.databricks.com
> 2 - upload the .csv file onto the Data of your databricks private cluster
> 3  - run the script. that will store the data on the distrubuted
> filesystem of the databricks cloudn (dbfs)
>
> It's worth investing in this free databricks cloud as it can create a
> cluster for you with minimal effort, and it's  a very easy way to test your
> spark scripts on a real cluster
>
> hope this helps
> kr
>
> ##################################
> from pyspark.sql import SQLContext
>
> from random import randint
> from time import sleep
> from pyspark.sql.session import SparkSession
> import logging
> logger = logging.getLogger(__name__)
> logger.setLevel(logging.INFO)
> ch = logging.StreamHandler()
> logger.addHandler(ch)
>
>
> import sys
>
> def read_parquet_file(parquetFileName):
>   logger.info('Reading now the parquet files we just created...:%s',
> parquetFileName)
>   parquet_data = sqlContext.read.parquet(parquetFileName)
>   logger.info('Parquet file has %s', parquet_data.count())
>
> def dataprocessing(filePath, count, sqlContext):
>     logger.info( 'Iter count is:%s' , count)
>     if count == 0:
>         print 'exiting'
>     else:
>         df_traffic_tmp = sqlContext.read.format("csv").
> option("header",'true').load(filePath)
>         logger.info( '#############################DataSet has:%s' ,
> df_traffic_tmp.count())
>         logger.info('WRting to a parquet file')
>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>         df_traffic_tmp.write.parquet(parquetFileName)
>         sleepInterval = randint(10,100)
>         logger.info( '#############################Sleeping for %s' ,
> sleepInterval)
>         sleep(sleepInterval)
>         read_parquet_file(parquetFileName)
>         dataprocessing(filePath, count-1, sqlContext)
>
> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'#This
> path might differ for you
> iterations = 1
> logger.info('----------------------')
> logger.info('Filename:%s', filename)
> logger.info('Iterations:%s', iterations )
> logger.info('----------------------')
>
> logger.info ('Initializing sqlContext')
> logger.info( '........Starting spark..........Loading from%s for %s
> iterations' , filename, iterations)
> logger.info(  'Starting up....')
> sc = SparkSession.builder.appName("Data Processsing").getOrCreate()
> logger.info ('Initializing sqlContext')
> sqlContext = SQLContext(sc)
> dataprocessing(filename, iterations, sqlContext)
> logger.info('Out of here..')
> ######################################
>
>
> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Uh believe me there are lots of ppl on this list who will send u code
>> snippets if u ask... 😀
>>
>> Yes that is what Steve pointed out, suggesting also that for that simple
>> exercise you should perform all operations on a spark standalone instead
>> (or alt. Use an nfs on the cluster)
>> I'd agree with his suggestion....
>> I suggest u another alternative:
>> https://community.cloud.databricks.com/
>>
>> That's a ready made cluster and you can run your spark app as well store
>> data on the cluster (well I haven't tried myself but I assume it's
>> possible).   Try that out... I will try ur script there as I have an
>> account there (though I guess I'll get there before me.....)
>>
>> Try that out and let me know if u get stuck....
>> Kr
>>
>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com>
>> wrote:
>>
>>> Hi Marco,
>>>
>>> For the first time in several years FOR THE VERY FIRST TIME. I am seeing
>>> someone actually executing code and providing response. It feel wonderful
>>> that at least someone considered to respond back by executing code and just
>>> did not filter out each and every technical details to brood only on my
>>> superb social skills, while claiming the reason for ignoring technical
>>> details is that it elementary. I think that Steve also is the first person
>>> who could answer the WHY of an elementary question instead of saying that
>>> is how it is and pointed out to the correct documentation.
>>>
>>> That code works fantastically. But the problem which I have tried to
>>> find out is while writing out the data and not reading it.
>>>
>>>
>>> So if you see try to read the data from the same folder which has the
>>> same file across all the nodes then it will work fine. In fact that is what
>>> should work.
>>>
>>> What does not work is that if you try to write back the file and then
>>> read it once again from the location you have written that is when the
>>> issue starts happening.
>>>
>>> Therefore if in my code you were to save the pandas dataframe as a CSV
>>> file and then read it then you will find the following observations:
>>>
>>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ---------------------------
>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>> columns=list('ABCD'))
>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>> header=True, sep=",", index=0)
>>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/spa
>>> rkdata/testdir/")
>>> testdf.cache()
>>> testdf.count()
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ---------------------------
>>>
>>>
>>> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN
>>> WHICH THE DATA DOES NOT EXISTS
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ---------------------------
>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>> columns=list('ABCD'))
>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>> header=True, sep=",", index=0)
>>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/sp
>>> ark/sparkdata/testdir/")
>>> testdf.cache()
>>> testdf.count()
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ------------------------------------------------------------
>>> ---------------------------
>>>
>>>
>>> if you execute my code then also you will surprisingly see that the
>>> writes in the nodes which is not the master node does not complete moving
>>> the files from the _temporary folder to the main one.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>>
>>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com>
>>> wrote:
>>>
>>>> Hello
>>>>  please have a look at this. it'sa simple script that just read a
>>>> dataframe for n time, sleeping at random interval. i used it to test memory
>>>> issues that another user was experiencing on a spark cluster
>>>>
>>>> you should run it like this e.g
>>>> spark-submit dataprocessing_Sample.-2py <path to tree_addhealth.csv>
>>>> <num of iterations>
>>>>
>>>> i ran it on the cluster like this
>>>>
>>>> ./spark-submit --master spark://ec2-54-218-113-119.us-
>>>> west-2.compute.amazonaws.com:7077   
>>>> /root/pyscripts/dataprocessing_Sample-2.py
>>>> file:///root/pyscripts/tree_addhealth.csv
>>>>
>>>> hth, ping me back if you have issues
>>>> i do agree with Steve's comments.... if you want to test your  spark
>>>> script s just for playing, do it on  a standaone server on your localhost.
>>>> Moving to a c luster is just a matter of deploying your script and mke sure
>>>> you have a common place where to read and store the data..... SysAdmin
>>>> should give you this when they setup the cluster...
>>>>
>>>> kr
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> I am sincerely obliged for your kind time and response. Can you please
>>>>> try the solution that you have so kindly suggested?
>>>>>
>>>>> It will be a lot of help if you could kindly execute the code that I
>>>>> have given. I dont think that anyone has yet.
>>>>>
>>>>> There are lots of fine responses to my question here, but if you read
>>>>> the last response from Simon, it comes the closest to being satisfactory. 
>>>>> I
>>>>> am sure even he did not execute the code, but at least he came quite close
>>>>> to understanding what the problem is.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello
>>>>>>  my 2 cents here, hope it helps
>>>>>> If you want to just to play around with Spark, i'd leave Hadoop out,
>>>>>> it's an unnecessary dependency that you dont need for just running a 
>>>>>> python
>>>>>> script
>>>>>> Instead do the following:
>>>>>> - got to the root of our master / slave node. create a directory
>>>>>> /root/pyscripts
>>>>>> - place your csv file there as well as the python script
>>>>>> - run the script to replicate the whole directory  across the cluster
>>>>>> (i believe it's called copy-script.sh)
>>>>>> - then run your spark-submit , it will be something lke
>>>>>>     ./spark-submit /root/pyscripts/mysparkscripts.py
>>>>>> file:///root/pyscripts/tree_addhealth.csv 10 --master
>>>>>> spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>>>> - in your python script, as part of your processing, write the
>>>>>> parquet file in directory /root/pyscripts
>>>>>>
>>>>>> If you have an AWS account and you are versatile with that - you need
>>>>>> to setup bucket permissions etc - , you can just
>>>>>> - store your file in one of your S3 bucket
>>>>>> - create an EMR cluster
>>>>>> - connect to master or slave
>>>>>> - run your  scritp that reads from the s3 bucket and write to the
>>>>>> same s3 bucket
>>>>>>
>>>>>>
>>>>>> Feel free to mail me privately, i have a working script i have used
>>>>>> to test some code on spark standalone cluster
>>>>>> hth
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Steve,
>>>>>>>
>>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>>
>>>>>>> I am now going through the documentation (
>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP
>>>>>>> -13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/t
>>>>>>> ools/hadoop-aws/s3a_committer_architecture.md) and it makes much
>>>>>>> much more sense now.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gourav Sengupta
>>>>>>>
>>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>>>> ste...@hortonworks.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Steve,
>>>>>>>>
>>>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>>>> email. I sincerely request your kind forgiveness before hand if 
>>>>>>>> anything
>>>>>>>> does sound impolite in my emails, in advance.
>>>>>>>>
>>>>>>>> Let me first start by thanking you.
>>>>>>>>
>>>>>>>> I know it looks like I formed all my opinion based on that
>>>>>>>> document, but that is not the case at all. If you or anyone tries to
>>>>>>>> execute the code that I have given then they will see what I mean. Code
>>>>>>>> speaks louder and better than words for me.
>>>>>>>>
>>>>>>>> So I am not saying you are wrong. I am asking verify and expecting
>>>>>>>> someone will be able to correct  a set of understanding that a moron 
>>>>>>>> like
>>>>>>>> me has gained after long hours of not having anything better to do.
>>>>>>>>
>>>>>>>>
>>>>>>>> SCENARIO: there are two files file1.csv and file2.csv stored in
>>>>>>>> HDFS with replication 2 and there is a HADOOP cluster of three nodes. 
>>>>>>>> All
>>>>>>>> these nodes have SPARK workers (executors) running in them.  Both are
>>>>>>>> stored in the following way:
>>>>>>>> -----------------------------------------------------
>>>>>>>> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
>>>>>>>> | (worker1)   |  (worker2)    |  (worker3)   |
>>>>>>>> | (master)     |                     |                    |
>>>>>>>> -----------------------------------------------------
>>>>>>>> | file1.csv      |                     | file1.csv     |
>>>>>>>> -----------------------------------------------------
>>>>>>>> |                    |  file2.csv      | file2.csv     |
>>>>>>>> -----------------------------------------------------
>>>>>>>> | file3.csv      |  file3.csv      |                   |
>>>>>>>> -----------------------------------------------------
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>>>> HDFS replication does not store the same file in all the nodes in
>>>>>>>> the cluster. So if I have three nodes and the replication is two then 
>>>>>>>> the
>>>>>>>> same file will be stored physically in two nodes in the cluster. Does 
>>>>>>>> that
>>>>>>>> sound right?
>>>>>>>>
>>>>>>>>
>>>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file
>>>>>>>> is > 128 then it will be broken up into blocks
>>>>>>>>
>>>>>>>> file1.cvs -> [block0001, block002, block0003]
>>>>>>>>
>>>>>>>> and each block will be replicated. With replication = 2 there will
>>>>>>>> be two copies of each block, but the file itself can span > 2 hosts.
>>>>>>>>
>>>>>>>>
>>>>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>>>>> If SPARK is trying to process to the records then I am expecting
>>>>>>>> that WORKER2 should not be processing file1.csv, and similary WORKER 1
>>>>>>>> should not be processing file2.csv and WORKER3 should not be processing
>>>>>>>> file3.csv. Because in case WORKER2 was trying to process file1.csv 
>>>>>>>> then it
>>>>>>>> will actually causing network transmission of the file unnecessarily.
>>>>>>>>
>>>>>>>>
>>>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>>>> traffic, but it schedules for execution time over waiting for workers 
>>>>>>>> free
>>>>>>>> on the node with the data. IF a block is on nodes 2 and 3 but there is 
>>>>>>>> only
>>>>>>>> a free thread on node 1, then node 1 gets the work
>>>>>>>>
>>>>>>>> There's details on whether/how work across blocks takes place which
>>>>>>>> I'm avoiding. For now know those formats which are "splittable" will 
>>>>>>>> have
>>>>>>>> work scheduled by block. If you use Parquet/ORC/avro for your data and
>>>>>>>> compress with snappy, it will be split. This gives you maximum 
>>>>>>>> performance
>>>>>>>> as >1 thread can work on different blocks. That is, if file1 is split 
>>>>>>>> into
>>>>>>>> three blocks, three worker threads can process it.
>>>>>>>>
>>>>>>>>
>>>>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE
>>>>>>>> CLARIFY THIS):
>>>>>>>> if WORKER 2 is not processing file1.csv then how does it matter
>>>>>>>> whether the file is there or not at all in the system? Should not SPARK
>>>>>>>> just ask the workers to process the files which are avialable in the 
>>>>>>>> worker
>>>>>>>> nodes? In case both WORKER2 and WORKER3 fails and are not available 
>>>>>>>> then
>>>>>>>> file2.csv will not be processed at all.
>>>>>>>>
>>>>>>>>
>>>>>>>> locality is best-effort, not guaranteed.
>>>>>>>>
>>>>>>>>
>>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>>>> BE EXECUTED (Its been pointed out that I am learning SPARK, and even I 
>>>>>>>> did
>>>>>>>> not take more than 13 mins to set up the cluster and run the code).
>>>>>>>>
>>>>>>>> Once you execute the code then you will find that:
>>>>>>>> 1.  if the path starts with file:/// while reading back then there
>>>>>>>> is no error reported, but the number of records reported back are only
>>>>>>>> those records in the worker which also has the server.
>>>>>>>> 2. also you will notice that once you cache the file before writing
>>>>>>>> the partitions are ditributed nicely across the workers, and while 
>>>>>>>> writing
>>>>>>>> back, the dataframe partitions does write properly to the worker node 
>>>>>>>> in
>>>>>>>> the Master, but the workers in the other system have the files written 
>>>>>>>> in
>>>>>>>> _temporary folder which does not get copied back to the main folder.
>>>>>>>> Inspite of this the job is not reported as failed in SPARK.
>>>>>>>>
>>>>>>>>
>>>>>>>> This gets into the "commit protocol". You don't want to know all
>>>>>>>> the dirty details (*) but essentially its this
>>>>>>>>
>>>>>>>> 1. Every worker writes its output to a directory under the
>>>>>>>> destination directory, something like '$dest/_temporary/$appAtt
>>>>>>>> emptId/_temporary/$taskAttemptID'
>>>>>>>> 2. it is the spark driver which "commits" the job by moving the
>>>>>>>> output from the individual workers from the temporary directories into
>>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>>> 3. For which it needs to be able to list all the output in
>>>>>>>> $dest/_temporary
>>>>>>>>
>>>>>>>> In your case, only the output on the same node of the driver is
>>>>>>>> being committed, because only those files can be listed and moved. The
>>>>>>>> output on the other nodes isn't seen, so isn't committed, nor cleaned 
>>>>>>>> up.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Now in my own world, if I see, the following things are happening,
>>>>>>>> something is going wrong (with me):
>>>>>>>> 1. SPARK transfers files from different systems to process, instead
>>>>>>>> of processing them locally (I do not have code to prove this, and 
>>>>>>>> therefore
>>>>>>>> its just an assumption)
>>>>>>>> 2. SPARK cannot determine when the writes are failing in standalone
>>>>>>>> clusters workers and reports success (code is there for this)
>>>>>>>> 3. SPARK reports back number of records in the worker running in
>>>>>>>> the master node when count() is given without reporting an error while
>>>>>>>> using file:/// and reports an error when I mention the path
>>>>>>>> without file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> s everyone's been saying, file:// requires a shared filestore, with
>>>>>>>> uniform paths everywhere. That's needed to list the files to process, 
>>>>>>>> read
>>>>>>>> the files in the workers and commit the final output. NFS 
>>>>>>>> cross-mounting is
>>>>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>>>>> overkill: more services to keep running, no real fault tolerance. 
>>>>>>>> Export a
>>>>>>>> directory tree from one of the servers, give the rest access to it, 
>>>>>>>> don't
>>>>>>>> worry about bandwidth use as the shared disk itself will become the
>>>>>>>> bottleneck
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I very sincerely hope with your genuine help the bar of language
>>>>>>>> and social skills will be lowered for me. And everyone will find a way 
>>>>>>>> to
>>>>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>>>>> versatile and amazingly vivid social skills. It will be a lot of help 
>>>>>>>> to
>>>>>>>> just focus on the facts related to machines, data, error and (the 
>>>>>>>> language
>>>>>>>> that I somehow understand better) code.
>>>>>>>>
>>>>>>>>
>>>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>>>> meet the required social and language skills.
>>>>>>>>
>>>>>>>> Thanks a ton once again for your kindness, patience and
>>>>>>>> understanding.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Gourav Sengupta
>>>>>>>>
>>>>>>>>
>>>>>>>> * for the curious, the details of the v1 and v2 commit protocols are
>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-
>>>>>>>> 13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/to
>>>>>>>> ols/hadoop-aws/s3a_committer_architecture.md
>>>>>>>>
>>>>>>>> Like I said: you don't want to know the details, and you really
>>>>>>>> don't want to step through Hadoop's FileOutputCommitter to see what's 
>>>>>>>> going
>>>>>>>> on. The Spark side is much easier to follow.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Reply via email to