He is using CSV, and either ORC or Parquet would be fine.
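
For reference, the writer calls are interchangeable in Spark (Scala); a minimal sketch with placeholder paths:

df.write.mode("overwrite").csv("s3a://some-bucket/out-csv")         // CSV
df.write.mode("overwrite").orc("s3a://some-bucket/out-orc")         // ORC
df.write.mode("overwrite").parquet("s3a://some-bucket/out-parquet") // Parquet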

> On 28. Jan 2018, at 06:49, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> 
> Hi,
> 
> There is definitely a parameter, set when creating temporary security credentials, 
> that specifies the number of minutes those credentials will remain active. There is 
> an upper limit of course, which is around 3 days if I remember correctly, 
> and the default, as you can see, is 30 mins.
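> 
> As a rough sketch (and an assumption about your setup, since I have not seen your 
> code), that lifetime is chosen at the moment the temporary credentials are issued, 
> for example via STS with the AWS Java SDK; the 12-hour value below is only an example:
> 
> import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
> import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest
> 
> val sts = AWSSecurityTokenServiceClientBuilder.defaultClient()
> // Request session credentials that stay valid for 12 hours rather than the default.
> val creds = sts
>   .getSessionToken(new GetSessionTokenRequest().withDurationSeconds(12 * 3600))
>   .getCredentials
> // creds.getAccessKeyId / creds.getSecretAccessKey / creds.getSessionToken are what
> // the S3 client or Hadoop configuration then needs to be given.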
> 
> Can you let me know:
> 1. How are you generating the credentials? (the exact code)
> 2. Doing S3 writes from a local network is super suboptimal anyway, given the 
> network latency and the cost associated with it. So why are you doing it?
> 3. When you port your code to EMR, do you still use access keys, or do 
> you have to change your code?
> 4. Is there any particular reason why your partition value has "-" in it? I 
> am trying to understand why the partition value is 2018-01-23 instead of 
> 20180123. Are you treating the partition type as a String?
> 5. Have you heard of and tried using spot instances? The cost is so 
> ridiculously low that there is no need to be running the code locally (I 
> expect that since you can run the code locally, the EMR instance size and 
> node type would be small).
> 6. Why are you using ORC instead of the Parquet format? I think that many 
> more products use Parquet, and only Hive uses the ORC format.
> 
> Regards,
> Gourav Sengupta
> 
>> On Tue, Jan 23, 2018 at 10:58 PM, Vasyl Harasymiv 
>> <vasyl.harasy...@gmail.com> wrote:
>> Hi Spark Community,
>> 
>> Saving a data frame into a file on S3 using:
>> 
>> df.write.csv(s3_location)
>> 
>> If the write runs for longer than 30 mins, the following error occurs:
>> 
>> The provided token has expired. (Service: Amazon S3; Status Code: 400; Error 
>> Code: ExpiredToken;)
>> 
>> Potentially this is because there is a hardcoded session limit in the temporary 
>> S3 connection from Spark.
>> 
>> One can specify the duration as described here:
>> 
>> https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html
>> 
>> One can, of course, chunk the data into sub-30-minute writes. However, is there a 
>> way to change the token expiry parameter directly in Spark before using 
>> "write.csv"?
>> 
>> Thanks a lot for any help!
>> Vasyl
>> 
>> 
>> 
>> 
>> 
>>> On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppani...@gmail.com> wrote:
>>> Thanks. I get this error now that I have switched to s3a://:
>>> 
>>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>>> 
>>>> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com> 
>>>> wrote:
>>>> Spark cannot read from S3 locally without the s3a protocol; you’ll more 
>>>> than likely need a local copy of the data, or you’ll need to use the 
>>>> proper jars to enable S3 communication from the edge to the datacenter.
>>>> 
>>>>  
>>>> 
>>>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>>> 
>>>>  
>>>> 
>>>> Here are the jars: 
>>>> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>>>> 
>>>>  
>>>> 
>>>> Looks like you already have them, in which case you’ll have to make small 
>>>> configuration changes, e.g. s3 -> s3a.
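>>>>
>>>> Roughly speaking (a sketch with a placeholder DataFrame and bucket), only the URI 
>>>> scheme changes, since the scheme is what selects the filesystem implementation:
>>>>
>>>> // old s3:// (Jets3t-based) filesystem
>>>> df.write.mode("append").format("orc").save("s3://my-bucket/orc")
>>>> // s3a:// client from hadoop-aws
>>>> df.write.mode("append").format("orc").save("s3a://my-bucket/orc")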
>>>> 
>>>>  
>>>> 
>>>> Keep in mind: The Amazon JARs have proven very brittle: the version of the 
>>>> Amazon libraries must match the versions against which the Hadoop binaries 
>>>> were built.
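>>>>
>>>> For Hadoop 2.7.x, for example, dependencies along these lines should line up (a 
>>>> sketch, not tested here; 1.7.4 is the SDK version hadoop-aws 2.7.x was built against):
>>>>
>>>>   "org.apache.hadoop" % "hadoop-aws"   % "2.7.3",
>>>>   "com.amazonaws"     % "aws-java-sdk" % "1.7.4"  // matches hadoop-aws 2.7.x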
>>>> 
>>>>  
>>>> 
>>>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
>>>> 
>>>> 
>>>> From: Toy <noppani...@gmail.com>
>>>> Date: Tuesday, January 23, 2018 at 11:33 AM
>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>> Subject: I can't save DataFrame from running Spark locally
>>>> 
>>>>  
>>>> 
>>>> Hi,
>>>> 
>>>>  
>>>> 
>>>> First of all, my Spark application runs fine in AWS EMR. However, I'm 
>>>> trying to run it locally to debug an issue. My application just parses 
>>>> log files, converts them to a DataFrame, then converts that to ORC and 
>>>> saves it to S3. However, when I run it locally I get this error:
>>>> 
>>>>  
>>>> 
>>>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>>>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>>>>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>     at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>>>     at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>>>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>>>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>>>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>>>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>>>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>>>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>>>>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>>>>     at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>>>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>>>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>>>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>>>
>>>> Here's what I have in sbt:
>>>>
>>>> scalaVersion := "2.11.8"
>>>>
>>>> val sparkVersion = "2.1.0"
>>>> val hadoopVersion = "2.7.3"
>>>> val awsVersion = "1.11.155"
>>>>
>>>> lazy val sparkAndDependencies = Seq(
>>>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>>>
>>>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>>>> )
>>>>
>>>> And this is where the code fails:
>>>>
>>>> val sparrowWriter = sparrowCastedDf.write
>>>>   .mode("append")
>>>>   .format("orc")
>>>>   .option("compression", "zlib")
>>>>
>>>> sparrowWriter.save(sparrowOutputPath)
>>>>
>>>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I 
>>>> checked it with the AWS command line.
>>>> 
>>>>  
>>>> 
>>>> I put a breakpoint there, and the full path looks like this: 
>>>> s3://bucket/orc/dt=2018-01-23, which exists.
>>>> 
>>>>  
>>>> 
>>>> I have also set up the credentials like this:
>>>> 
>>>>  
>>>> 
>>>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>>>> 
>>>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
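>>>>
>>>> (If s3a is the way to go here, I assume the equivalent keys would be the ones 
>>>> below — just my guess from the s3a docs, not something I have verified:)
>>>>
>>>> sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
>>>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")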
>>>> 
>>>>  
>>>> 
>>>> My confusion is that this code runs fine on the cluster, but I get this error 
>>>> when running locally.
>>>> 
>> 
> 
