Yes, reading the vertices and edges data from the S3 location works, and
the checkpoint files are also written to S3 successfully. It fails only
when deleting the previous checkpoint data, because the delete call uses
the default file system. I tried looking up how to change the default file
system but could not find anything.
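
For reference, a minimal sketch of how the delete could avoid the default file system, assuming Hadoop 2.x client libraries on the classpath (the line numbers in the trace suggest Hadoop 2.x, but that is an assumption). `CheckpointCleanup` and `deleteRecursively` are hypothetical names used only for illustration; they are not part of GraphFrames, and this has not been tested against S3.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckpointCleanup {
  // Hypothetical helper, not part of GraphFrames: delete a directory using the
  // FileSystem that matches the path's own URI scheme.
  def deleteRecursively(dir: String, conf: Configuration): Boolean = {
    val path = new Path(dir)
    // Path.getFileSystem resolves the implementation from the path's scheme
    // (s3n, hdfs, file, ...). FileSystem.get(conf) instead returns the file
    // system configured as the default, which is file:/// in the trace.
    val fs: FileSystem = path.getFileSystem(conf)
    fs.delete(path, true) // true = recursive
  }
}
```

On the application side, a possible workaround (again an assumption, not verified on this setup) is to point the default file system at the checkpoint bucket before running the algorithm, for example `sc.hadoopConfiguration.set("fs.defaultFS", "s3n://<bucket>")`. Here `<bucket>` is a placeholder and would have to match the bucket of the checkpoint directory, since Hadoop checks both scheme and authority.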

Thanks
Ankur

On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> From the stack it looks to be an error from the explicit call to
> hadoop.fs.FileSystem.
>
> Is the URL scheme for s3n registered?
> Does it work when you try to read from s3 from Spark?
>
> _____________________________
> From: Ankur Srivastava <ankur.srivast...@gmail.com>
> Sent: Wednesday, January 4, 2017 9:23 PM
> Subject: Re: Spark GraphFrame ConnectedComponents
> To: Felix Cheung <felixcheun...@hotmail.com>
> Cc: <user@spark.apache.org>
>
>
>
> This is the exact trace from the driver logs
>
> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS:
> s3n://<checkpoint-folder>/8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3,
> expected: file:///
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
> at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
> at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
> at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
> at GraphTest.main(GraphTest.java:31) ----------- Application Class
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> I am running Spark v1.6.2 and GraphFrames v0.3.0-spark1.6-s_2.10.
>
> Thanks
> Ankur
>
> On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi
>>
>> I am rerunning the pipeline to generate the exact trace. In the meantime,
>> here is part of the trace from the last run:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS:
>> s3n://<folder-path>, expected: file:///
>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
>> at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)
>>
>>
>> I also think the error occurs in this part of the code, at
>> "ConnectedComponents.scala:339". I am referring to the code at
>> https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala
>>
>>       if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
>>         // TODO: remove this after DataFrame.checkpoint is implemented
>>         val out = s"${checkpointDir.get}/$iteration"
>>         ee.write.parquet(out)
>>         // may hit S3 eventually consistent issue
>>         ee = sqlContext.read.parquet(out)
>>
>>         // remove previous checkpoint
>>         if (iteration > checkpointInterval) {
>>           // emphasized in the original message: this delete uses the default FileSystem
>>           FileSystem.get(sc.hadoopConfiguration)
>>             .delete(new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}"), true)
>>         }
>>
>>         System.gc() // hint Spark to clean shuffle directories
>>       }
>>
>>
>> Thanks
>> Ankur
>>
>> On Wed, Jan 4, 2017 at 5:19 PM, Felix Cheung <felixcheun...@hotmail.com>
>> wrote:
>>
>>> Do you have more of the exception stack?
>>>
>>>
>>> ------------------------------
>>> *From:* Ankur Srivastava <ankur.srivast...@gmail.com>
>>> *Sent:* Wednesday, January 4, 2017 4:40:02 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* Spark GraphFrame ConnectedComponents
>>>
>>> Hi,
>>>
>>> I am trying to use the ConnectedComponents algorithm of GraphFrames, but
>>> by default it needs a checkpoint directory. As I am running my Spark
>>> cluster with S3 as the DFS and do not have access to an HDFS file system,
>>> I tried using an S3 directory as the checkpoint directory, but I run into
>>> the exception below:
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS:
>>> s3n://<folder-path>, expected: file:///
>>>
>>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>>>
>>> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
>>>
>>> If I set the checkpoint interval to -1 to avoid checkpointing, the driver
>>> just hangs after 3 or 4 iterations.
>>>
>>> Is there some way to set the default FileSystem to S3 for Spark, or is
>>> there any other option?
>>>
>>> Thanks
>>> Ankur
>>>
>>>
>>
>
>
>
