He is using CSV, and either ORC or Parquet would be fine.
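The switch is only a matter of the writer format. A minimal sketch, assuming a DataFrame `df` already in scope and a placeholder s3a bucket:

    // Same DataFrame, three interchangeable on-disk formats.
    val base = "s3a://some-bucket/logs"   // placeholder bucket/prefix

    df.write.mode("append").option("compression", "zlib").orc(s"$base/orc")           // ORC, as written on EMR
    df.write.mode("append").option("compression", "snappy").parquet(s"$base/parquet") // Parquet
    df.write.mode("append").csv(s"$base/csv")                                         // CSV, as in the original post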
> On 28. Jan 2018, at 06:49, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
> There is definitely a parameter, when creating temporary security credentials, for the number of minutes those credentials stay active. There is an upper limit of course, which is around 3 days if I remember correctly, and the default, as you can see, is 30 mins.
>
> Can you let me know:
> 1. How are you generating the credentials? (the exact code)
> 2. Doing S3 writes from a local network is super suboptimal anyway, given the network latency and cost involved. So why are you doing it?
> 3. When you port your code to EMR, do you still use access keys, or do you have to change your code?
> 4. Any particular reason why your partition value has "-" in it? I am trying to understand why the partition value is 2018-01-23 instead of 20180123. Are you treating the partition type as String?
> 5. Have you heard of and tried using spot instances? The cost is so ridiculously low that there is no need to run the code locally (I am assuming that, since you can run the code locally, the EMR instance size and node type would be small).
> 6. Why are you using ORC instead of Parquet? I think many more products use Parquet, and only Hive uses the ORC format.
>
> Regards,
> Gourav Sengupta
>
>> On Tue, Jan 23, 2018 at 10:58 PM, Vasyl Harasymiv <vasyl.harasy...@gmail.com> wrote:
>> Hi Spark Community,
>>
>> I am saving a data frame to a file on S3 using:
>>
>> df.write.csv(s3_location)
>>
>> If it runs for longer than 30 mins, the following error persists:
>>
>> The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken;)
>>
>> Potentially this is because there is a hardcoded session limit in the temporary S3 connection from Spark.
>>
>> One can specify the duration as described here:
>>
>> https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html
>>
>> One can, of course, chunk the data into sub-30-min writes. However, is there a way to change the token expiry parameter directly in Spark before calling "write.csv"?
>>
>> Thanks a lot for any help!
>> Vasyl
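On the expiry question: the lifetime is a property of the temporary credentials themselves, not something Spark controls, so the place to change it is wherever the STS credentials are requested. A rough sketch with the AWS Java SDK and the s3a connector — the 12-hour duration, bucket name, and stand-in DataFrame are placeholders, and it assumes a Hadoop build recent enough to ship TemporaryAWSCredentialsProvider (2.8+):

    import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
    import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("s3a-write").getOrCreate()

    // Request session credentials with an explicit lifetime (capped by your IAM limits).
    val sts = AWSSecurityTokenServiceClientBuilder.defaultClient()
    val creds = sts
      .getSessionToken(new GetSessionTokenRequest().withDurationSeconds(12 * 3600))
      .getCredentials

    // Hand the temporary credentials to the s3a connector before writing.
    val hc = spark.sparkContext.hadoopConfiguration
    hc.set("fs.s3a.aws.credentials.provider",
      "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    hc.set("fs.s3a.access.key", creds.getAccessKeyId)
    hc.set("fs.s3a.secret.key", creds.getSecretAccessKey)
    hc.set("fs.s3a.session.token", creds.getSessionToken)

    val df = spark.range(10).toDF("id")        // stand-in for the real DataFrame
    df.write.csv("s3a://some-bucket/output/")  // bucket name is a placeholder

However the credentials are obtained, the point is the same: the expiry is decided at credential-creation time, not by write.csv.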
>>> On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppani...@gmail.com> wrote:
>>> Thanks, I get this error when I switched to s3a://
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>>>
>>>> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com> wrote:
>>>> Spark cannot read locally from S3 without the s3a protocol; you'll more than likely need a local copy of the data, or you'll need to use the proper jars to enable S3 communication from the edge to the datacenter.
>>>>
>>>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>>>
>>>> Here are the jars: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>>>>
>>>> Looks like you already have them, in which case you'll have to make small configuration changes, e.g. s3 -> s3a.
>>>>
>>>> Keep in mind: the Amazon JARs have proven very brittle; the version of the Amazon libraries must match the versions against which the Hadoop binaries were built.
>>>>
>>>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
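On the version-matching point: the NoSuchMethodError on TransferManager above is the usual symptom of pairing hadoop-aws 2.7.x with a newer aws-java-sdk. A sketch of sbt lines that are normally consistent — hadoop-aws 2.7.3 was built against aws-java-sdk 1.7.4, but verify the exact version against the hadoop-aws POM for your Hadoop release:

    // build.sbt fragment: pin the AWS SDK to the version hadoop-aws was compiled against.
    val hadoopVersion = "2.7.3"

    libraryDependencies ++= Seq(
      "org.apache.hadoop" % "hadoop-aws"    % hadoopVersion,
      "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
      "com.amazonaws"     % "aws-java-sdk"  % "1.7.4"  // not 1.11.x alongside Hadoop 2.7.x
    )

Not declaring the SDK at all and letting hadoop-aws pull in its own transitive version usually achieves the same thing, as long as nothing else on the classpath drags in a newer one.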
>>>> From: Toy <noppani...@gmail.com>
>>>> Date: Tuesday, January 23, 2018 at 11:33 AM
>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>> Subject: I can't save DataFrame from running Spark locally
>>>>
>>>> Hi,
>>>>
>>>> First of all, my Spark application runs fine in AWS EMR. However, I'm trying to run it locally to debug an issue. The application just parses log files, converts them to a DataFrame, then converts that to ORC and saves it to S3. However, when I run it locally I get this error:
>>>>
>>>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>>>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>>>>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>     at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>>>     at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>>>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>>>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>>>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>>>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>>>>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>>>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>>>>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>>>>     at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>>>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>>>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>>>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>>>
>>>> Here's what I have in sbt:
>>>>
>>>> scalaVersion := "2.11.8"
>>>>
>>>> val sparkVersion = "2.1.0"
>>>> val hadoopVersion = "2.7.3"
>>>> val awsVersion = "1.11.155"
>>>>
>>>> lazy val sparkAndDependencies = Seq(
>>>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>>>
>>>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>>>> )
>>>>
>>>> And this is where the code fails:
>>>>
>>>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>>>> sparrowWriter.save(sparrowOutputPath)
>>>>
>>>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I checked it with the aws command line.
>>>>
>>>> I put a breakpoint there and the full path looks like s3://bucket/orc/dt=2018-01-23, which exists.
>>>>
>>>> I have also set up the credentials like this:
>>>>
>>>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>>>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>>>>
>>>> My confusion is that this code runs fine in the cluster, but I get this error running locally.
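To tie this back to Patrick's point: the Jets3tFileSystemStore frames in that stack trace show the old s3:// (S3FileSystem) client being used locally, whereas on EMR the s3:// scheme is served by EMRFS, which is why the same path behaves differently in the two environments. A minimal sketch of the same snippet on s3a instead — the key values and bucket are placeholders:

    // s3a uses its own property names; fs.s3.awsAccessKeyId only feeds the old s3:// filesystem.
    sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")

    val sparrowOutputPath = "s3a://bucket/orc"   // scheme changed from s3:// to s3a://

    val sparrowWriter = sparrowCastedDf.write
      .mode("append")
      .format("orc")
      .option("compression", "zlib")
    sparrowWriter.save(sparrowOutputPath)

With a matching hadoop-aws/aws-java-sdk pair on the classpath (see the sbt sketch above), the NoSuchMethodError from the earlier s3a attempt should go away as well.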