Hi Steve,

I could get the application working by setting "spark.hadoop.fs.default.name".
Thank you!!

And thank you for your input on using S3 for checkpointing. I am still working
on the PoC, so I will consider using HDFS for the final implementation.
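
For anyone who finds this thread in the archives: as far as I understand it,
the trace comes from FileSystem.get(sc.hadoopConfiguration) returning the
filesystem for the configured default (file:/// unless overridden), which then
rejects a path with a different scheme. Below is a minimal, stdlib-only Java
sketch of that kind of scheme check. It is a simplified stand-in, not Hadoop's
actual code; the class name DefaultFsCheck, this version of checkPath, and the
bucket name are made up for illustration.

```java
import java.net.URI;

public class DefaultFsCheck {

    // Simplified stand-in for the scheme comparison behind "Wrong FS".
    // Assumption: only the URI scheme is compared here; the real Hadoop
    // check is more involved.
    static void checkPath(URI defaultFs, URI path) {
        String scheme = path.getScheme();
        if (scheme != null && !scheme.equalsIgnoreCase(defaultFs.getScheme())) {
            throw new IllegalArgumentException(
                "Wrong FS: " + path + ", expected: " + defaultFs);
        }
    }

    public static void main(String[] args) {
        // Hypothetical checkpoint location, for illustration only.
        URI checkpoint = URI.create("s3n://example-bucket/checkpoints/3");

        // Default filesystem left at file:/// -> the s3n path is rejected.
        try {
            checkPath(URI.create("file:///"), checkpoint);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // Default filesystem pointed at s3n -> the same path is accepted.
        checkPath(URI.create("s3n://example-bucket"), checkpoint);
        System.out.println("accepted once the default filesystem is s3n");
    }
}
```

In the real job, the equivalent change was pointing
spark.hadoop.fs.default.name at the s3n:// location, as mentioned above.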

Thanks
Ankur

On Fri, Jan 6, 2017 at 9:57 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 5 Jan 2017, at 21:10, Ankur Srivastava <ankur.srivast...@gmail.com>
> wrote:
>
> Yes, I did try it out, and it chooses the local file system even though my
> checkpoint location starts with s3n://
>
> I am not sure how I can make it load the S3FileSystem.
>
>
> Set fs.default.name to s3n://whatever or, in the Spark context,
> spark.hadoop.fs.default.name.
>
> However:
>
> 1. You should really use s3a if you have the Hadoop 2.7 JARs on your
> classpath.
> 2. Neither s3n nor s3a is a real filesystem, and certain assumptions that
> checkpointing code tends to make (such as renames being O(1) atomic calls)
> do not hold. Checkpointing to S3 may not be as robust as you'd like.
>
>
>
>
> On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung <felixcheun...@hotmail.com>
> wrote:
>
>> Right, I'd agree, it seems to happen only with delete.
>>
>> Could you by chance run just the delete to see if it fails?
>>
>> FileSystem.get(sc.hadoopConfiguration)
>> .delete(new Path(somepath), true)
>> ------------------------------
>> *From:* Ankur Srivastava <ankur.srivast...@gmail.com>
>> *Sent:* Thursday, January 5, 2017 10:05:03 AM
>> *To:* Felix Cheung
>> *Cc:* user@spark.apache.org
>>
>> *Subject:* Re: Spark GraphFrame ConnectedComponents
>>
>> Yes, reading the vertex and edge data from the S3 location works, and it is
>> also able to write the checkpoint files to S3. It only fails when deleting
>> the data, because the delete goes through the default file system. I tried
>> looking up how to change the default file system but could not find
>> anything in that regard.
>>
>> Thanks
>> Ankur
>>
>> On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <felixcheun...@hotmail.com>
>> wrote:
>>
>>> From the stack it looks to be an error from the explicit call to
>>> hadoop.fs.FileSystem.
>>>
>>> Is the URL scheme for s3n registered?
>>> Does it work when you try to read from s3 from Spark?
>>>
>>> _____________________________
>>> From: Ankur Srivastava <ankur.srivast...@gmail.com>
>>> Sent: Wednesday, January 4, 2017 9:23 PM
>>> Subject: Re: Spark GraphFrame ConnectedComponents
>>> To: Felix Cheung <felixcheun...@hotmail.com>
>>> Cc: <user@spark.apache.org>
>>>
>>>
>>>
>>> This is the exact trace from the driver logs
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<checkpoint-folder>/8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3, expected: file:///
>>>     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
>>>     at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
>>>     at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
>>>     at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
>>>     at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
>>>     at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
>>>     at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
>>>     at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
>>>     at GraphTest.main(GraphTest.java:31) ----------- Application Class
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> I am running Spark v1.6.2 and GraphFrames v0.3.0-spark1.6-s_2.10.
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <
>>> ankur.srivast...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I am rerunning the pipeline to generate the exact trace. In the meantime,
>>>> here is the part of the trace I have from the last run:
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<folder-path>, expected: file:///
>>>>     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>>>>     at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
>>>>     at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
>>>>     at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)
>>>>
>>>>
>>>> Also, I think the error is happening at "ConnectedComponents.scala:339".
>>>> I am referring to the code at
>>>> https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala
>>>>
>>>>       if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
>>>>         // TODO: remove this after DataFrame.checkpoint is implemented
>>>>         val out = s"${checkpointDir.get}/$iteration"
>>>>         ee.write.parquet(out)
>>>>         // may hit S3 eventually consistent issue
>>>>         ee = sqlContext.read.parquet(out)
>>>>
>>>>         // remove previous checkpoint
>>>>         if (iteration > checkpointInterval) {
>>>>           FileSystem.get(sc.hadoopConfiguration)
>>>>             .delete(new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}"), true)
>>>>         }
>>>>
>>>>         System.gc() // hint Spark to clean shuffle directories
>>>>       }
>>>>
>>>>
>>>> Thanks
>>>> Ankur
>>>>
>>>> On Wed, Jan 4, 2017 at 5:19 PM, Felix Cheung <felixcheun...@hotmail.com
>>>> > wrote:
>>>>
>>>>> Do you have more of the exception stack?
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* Ankur Srivastava <ankur.srivast...@gmail.com>
>>>>> *Sent:* Wednesday, January 4, 2017 4:40:02 PM
>>>>> *To:* user@spark.apache.org
>>>>> *Subject:* Spark GraphFrame ConnectedComponents
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to use the ConnectedComponents algorithm of GraphFrames, but
>>>>> by default it needs a checkpoint directory. As I am running my Spark
>>>>> cluster with S3 as the DFS and do not have access to an HDFS file system,
>>>>> I tried using an S3 directory as the checkpoint directory, but I ran into
>>>>> the exception below:
>>>>>
>>>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3n://<folder-path>, expected: file:///
>>>>>     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>>>>>     at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
>>>>>
>>>>> If I set the checkpoint interval to -1 to avoid checkpointing, the driver
>>>>> just hangs after 3 or 4 iterations.
>>>>>
>>>>> Is there some way I can set the default FileSystem to S3 for Spark, or is
>>>>> there any other option?
>>>>>
>>>>> Thanks
>>>>> Ankur
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>
>
>
