I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
Spark 1.6.0. It seems not to work. I keep getting this error.

Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
stack
Exception Details:
  Location:
    
org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V
 @155: invokevirtual
  Reason:
    Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not 
assignable to 'org/jets3t/service/model/StorageObject'
  Current Frame:
    bci: @155
    flags: { }
    locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
    stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', 
integer }
  Bytecode:
    0x0000000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
    0x0000010: 5659 b701 5713 0192 b601 5b2b b601 5b13
    0x0000020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
    0x0000030: b400 7db6 00e7 b601 5bb6 015e b901 9802
    0x0000040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
    0x0000050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
    0x0000060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
    0x0000070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
    0x0000080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
    0x0000090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
    0x00000a0: 000a 4e2a 2d2b b700 c7b1               
  Exception Handler Table:
    bci [0, 116] => handler: 162
    bci [117, 159] => handler: 162
  Stackmap Table:
    same_frame_extended(@65)
    same_frame(@117)
    same_locals_1_stack_item_frame(@162,Object[#139])
    same_frame(@169)

        at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
        at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
        at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at 
org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
        at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
        at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
        at scala.util.Try$.apply(Try.scala:161)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I passed the access and secret keys using this code. I wonder if it’s incorrect.

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val ssc = new StreamingContext(sc, Seconds(10))

val lines = ssc.textFileStream("s3://amg-events-outs/“)

Please let me know what I’m doing wrong.

Thanks,
Ben


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to