I have a Spark job that uses an S3 client inside `mapPartitions`, and I get
this error:
Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 3.0 (TID 74,
ip-10-90-78-177.ec2.internal, executor 11): java.lang.NoClassDefFoundError:
Could not initialize class com.amazonaws.services.s3.AmazonS3ClientBuilder
+details
Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 3.0 (TID 74,
ip-10-90-78-177.ec2.internal, executor 11): java.lang.NoClassDefFoundError:
Could not initialize class com.amazonaws.services.s3.AmazonS3ClientBuilder
at SparrowOrc$$anonfun$1.apply(sparrowOrc.scala:49)
at SparrowOrc$$anonfun$1.apply(sparrowOrc.scala:46)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This is my code:
// Parse SQS S3-event notifications, fetch each referenced S3 object, and
// convert every log line to a JSON string. The S3 client is built inside
// mapPartitions so it is constructed once per partition on the executor
// and never serialized from the driver.
val jsonRows = sqs.mapPartitions(partitions => {
  val s3Client = AmazonS3ClientBuilder.standard()
    .withCredentials(new DefaultCredentialsProvider)
    .build()
  val txfm = new LogLine2Json
  val log = Logger.getLogger("parseLog")

  partitions.flatMap(message => {
    val sqsMsg = Json.parse(message)
    // S3 event notification shape: Records[0].s3 holds the bucket/key pair.
    // Json.stringify returns a quoted string, so strip the quotes.
    val bucketName =
      Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
    val key =
      Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")

    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
    val stream = obj.getObjectContent()
    try {
      // Materialize the lines eagerly so the S3 stream can be closed here.
      // Leaving the lazy iterator open until Spark drains it leaks HTTP
      // connections from the S3 client's connection pool.
      scala.io.Source.fromInputStream(stream).getLines().map(line => {
        // Unparseable lines are logged and dropped (sentinel "{}" filtered out).
        try txfm.parseLine(line)
        catch {
          // NonFatal instead of Throwable: let OutOfMemoryError,
          // InterruptedException, etc. propagate and fail the task.
          case scala.util.control.NonFatal(e) =>
            log.info(line)
            "{}"
        }
      }).filter(_ != "{}").toList
    } finally {
      stream.close()
    }
  })
})
This is my `build.sbt`:
name := "sparrow-to-orc"
version := "0.1"
scalaVersion := "2.11.8"
// Spark/Hadoop artifacts are "provided": the cluster supplies them at
// runtime, so they are excluded from the assembly (fat) jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.1.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0" %
"provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3" %
"provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3" %
"provided"
libraryDependencies += "com.cn" %% "sparrow-clf-parser" % "1.1-SNAPSHOT"
// NOTE(review): "aws-java-sdk" is the full SDK bundle and already contains
// the s3/core modules listed below; the three lines plus awscala may pull
// conflicting SDK versions into the assembly — verify which one wins.
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.155"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.155"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-core" % "1.11.155"
libraryDependencies += "com.github.seratch" %% "awscala" % "0.6.+"
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.6.0"
// Pin jackson-databind: the AWS SDK and Spark/play-json pull different
// jackson versions; a mismatch at runtime can make AmazonS3ClientBuilder's
// static initializer fail (seen as NoClassDefFoundError on executors).
dependencyOverrides ++= Set("com.fasterxml.jackson.core" %
"jackson-databind" % "2.6.0")
// sbt-assembly: resolve duplicate classpath entries when building the fat
// jar. Case order matters — first matching rule wins.
assemblyMergeStrategy in assembly := {
case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last
case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
case PathList("org", "apache", xs @ _*) => MergeStrategy.last
case PathList("com", "google", xs @ _*) => MergeStrategy.last
case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
case PathList("com", "amazonaws", xs @ _*) => MergeStrategy.last
case PathList("com", "typesafe", xs @ _*) => MergeStrategy.last
case "about.html" => MergeStrategy.rename
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "plugin.properties" => MergeStrategy.last
case "log4j.properties" => MergeStrategy.last
case "overview.html" => MergeStrategy.last
// Fall back to the plugin's default strategy for everything else.
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}