Re: I can't save DataFrame from running Spark locally

2018-01-23 Thread Toy
Thanks, I get this error when I switched to s3a://

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError:
com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
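
For context, this NoSuchMethodError is the usual symptom of an AWS SDK / hadoop-aws
version mismatch: hadoop-aws 2.7.x calls a TransferManager constructor that exists in
the aws-java-sdk it was compiled against (1.7.4) but not in the 1.11.x SDK named in the
build further down this thread. A minimal sbt sketch that keeps the pair aligned (treat
the exact versions as a starting point, not a prescription):

// hadoop-aws 2.7.3 was published against aws-java-sdk 1.7.4; mixing it with
// aws-java-sdk 1.11.x is what produces the TransferManager.<init> error above.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws" % "2.7.3",
  "com.amazonaws" % "aws-java-sdk" % "1.7.4"
)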

On Tue, 23 Jan 2018 at 15:05 Patrick Alwell  wrote:

> Spark cannot read from S3 locally without the s3a protocol; you’ll more
> than likely need a local copy of the data, or you’ll need to use the
> proper jars to enable S3 communication from the edge to the datacenter.
>
>
>
>
> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>
>
>
> Here are the jars:
> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>
>
>
> Looks like you already have them, in which case you’ll have to make small
> configuration changes, e.g. s3 → s3a
>
>
>
> Keep in mind: *The Amazon JARs have proven very brittle: the version of
> the Amazon libraries must match the versions against which the Hadoop
> binaries were built.*
>
>
>
>
> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client

I can't save DataFrame from running Spark locally

2018-01-23 Thread Toy
Hi,

First of all, my Spark application runs fine in AWS EMR. However, I'm
trying to run it locally to debug an issue. The application just parses
log files, converts them to a DataFrame, and saves them to S3 as ORC.
When I run it locally I get this error:

java.io.IOException: /orc/dt=2018-01-23 doesn't exist
at
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at
org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)

Here's what I have in sbt

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"
val hadoopVersion = "2.7.3"
val awsVersion = "1.11.155"

lazy val sparkAndDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,

  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion
)
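
One thing to double-check in this build: hadoop-aws 2.7.3 was compiled against
aws-java-sdk 1.7.4, so if the awsVersion = "1.11.155" artifact ends up on the
classpath, the s3a client can fail with the TransferManager NoSuchMethodError
shown in the reply above. A hedged sbt sketch of keeping the transitive SDK
version pinned (adjust to your build):

// Pin the AWS SDK to the version hadoop-aws 2.7.3 expects; drop any explicit
// 1.11.x aws-java-sdk dependency as well.
dependencyOverrides += "com.amazonaws" % "aws-java-sdk" % "1.7.4"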

And this is where the code failed

val sparrowWriter =
sparrowCastedDf.write.mode("append").format("orc").option("compression",
"zlib")
sparrowWriter.save(sparrowOutputPath)

sparrowOutputPath is something like s3://bucket/folder, and it exists; I
checked it with the aws command line.

I put a breakpoint there, and the full path looks like this:
s3://bucket/orc/dt=2018-01-23, which exists.
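
For what it's worth, the stack trace points at org.apache.hadoop.fs.s3.S3FileSystem
(the old Jets3t-backed block filesystem), which is what a plain s3:// URI maps to in
stock Hadoop; it stores data in its own block format and cannot see ordinary objects
under s3://bucket/orc/, which would explain the "doesn't exist" error locally even
though the path is visible to the aws CLI. On EMR the s3:// scheme is backed by EMRFS
instead, which is why the same code works there. A minimal sketch of the change, using
the s3a connector and the same path as above:

// Same write as before, but through the s3a connector (requires hadoop-aws and a
// matching aws-java-sdk on the classpath).
val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
sparrowWriter.save("s3a://bucket/orc/dt=2018-01-23")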

I have also set up the credentials like this

sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")

My confusion is that this code runs fine on the cluster, but I get this
error when running locally.


Re: [EXT] How do I extract a value in foreachRDD operation

2018-01-22 Thread Toy
Thanks Michael,

Can you give me an example? I'm new to Spark

On Mon, 22 Jan 2018 at 12:25 Michael Mansour 
wrote:

> Toy,
>
>
>
> I suggest you partition your data according to date, and use the
> foreachPartition function, using the partition as the bucket location.
>
> This would require you to define a custom hash partitioner function, but
> that is not too difficult.
>
>
>
> --
>
> Michael Mansour
>
> Data Scientist
>
> Symantec


How do I extract a value in foreachRDD operation

2018-01-22 Thread Toy
Hi,

We have a Spark application that parses log files and saves them to S3 in
ORC format. During the foreachRDD operation we need to extract a date
field to determine the bucket location; we partition by date. Currently we
just hardcode it to the current date, but we have a requirement to
determine it for each record.

Here's the current code.

jsonRows.foreachRDD(r => {
  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" +
parsedDate

  val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

  val writer =
jsonDf.write.mode("append").format("orc").option("compression", "zlib")

  if (environment.equals("local")) {
writer.save("/tmp/sparrow")
  } else {
writer.save(OutputPath)
  }
})

The column in jsonRow that we want is `_ts`.
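
For reference, one way to avoid hardcoding the date without writing a custom
partitioner is to derive a dt column from _ts and let the DataFrameWriter
partition by it. A rough sketch built on the snippet above, assuming _ts holds
an epoch timestamp in milliseconds (adjust the conversion if it is a string or
already in seconds):

import org.apache.spark.sql.functions.{col, from_unixtime, to_date}

jsonRows.foreachRDD { r =>
  val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

  // Derive the partition date from each record's _ts instead of the wall clock.
  val withDate = jsonDf.withColumn(
    "dt", to_date(from_unixtime((col("_ts") / 1000).cast("long"))))

  val writer = withDate.write
    .mode("append")
    .format("orc")
    .option("compression", "zlib")
    .partitionBy("dt") // writes .../dt=2018-01-22/... per record date

  if (environment.equals("local")) {
    writer.save("/tmp/sparrow")
  } else {
    writer.save(destinationBucket + "/parsed_logs/orc")
  }
}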

Thanks.


How to stop streaming before the application gets killed

2017-12-22 Thread Toy
I'm trying to write a deployment job for a Spark application. Basically, the
job sends yarn application -kill app_id to the cluster, but after the
application receives the signal it dies without finishing whatever it is
processing or stopping the stream.

I'm using Spark Streaming. What's the best way to stop a Spark application so
we won't lose any data?
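
There is no single built-in answer, but two approaches that are commonly combined
are (a) enabling spark.streaming.stopGracefullyOnShutdown so the driver drains
already-received batches when it gets SIGTERM, and (b) polling an external stop
marker and calling ssc.stop(stopGracefully = true) yourself, which avoids depending
on how quickly YARN escalates from SIGTERM to SIGKILL. A rough sketch (the marker
path and batch interval are made-up placeholders):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("graceful-stop-sketch")
      // Drain already-received batches when the driver JVM receives SIGTERM.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // ... build your DStreams and output operations here ...
    ssc.start()

    // Instead of relying on `yarn application -kill`, watch for a stop marker and
    // shut down gracefully from inside the driver.
    val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
    val marker = new Path("/tmp/stop-streaming-job") // hypothetical path
    var stopped = false
    while (!stopped) {
      stopped = ssc.awaitTerminationOrTimeout(10000L)
      if (!stopped && fs.exists(marker)) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}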


Re: Why do I see five attempts on my Spark application

2017-12-13 Thread Toy
Hi,

Can you point me to the config for that please?

On Wed, 13 Dec 2017 at 14:23 Marcelo Vanzin  wrote:

> On Wed, Dec 13, 2017 at 11:21 AM, Toy  wrote:
> > I'm wondering why I'm seeing 5 attempts for my Spark application? Does
> > the Spark application restart itself?
>
> It restarts itself if it fails (up to a limit that can be configured
> either per Spark application or globally in YARN).
>
>
> --
> Marcelo
>
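
For reference, the settings Marcelo is most likely referring to are
spark.yarn.maxAppAttempts (per application) and YARN's
yarn.resourcemanager.am.max-attempts (the cluster-wide ceiling); the effective
limit is whichever is lower. A minimal sketch of capping it per job (values are
examples):

// Equivalent to passing --conf spark.yarn.maxAppAttempts=1 to spark-submit.
val conf = new org.apache.spark.SparkConf()
  .setAppName("my-app")
  .set("spark.yarn.maxAppAttempts", "1")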


Why do I see five attempts on my Spark application

2017-12-13 Thread Toy
Hi,

I'm wondering why I'm seeing 5 attempts for my Spark application? Does the
Spark application restart itself?

[image: Screen Shot 2017-12-13 at 2.18.03 PM.png]