Re: Spark Streaming S3 Error

2016-05-21 Thread Benjamin Kim
I found my answer. The way to access S3 has changed: use the s3a:// filesystem 
(with its credential properties) instead of the old s3n:// connector that was 
failing in the stack trace below.

// Pass AWS credentials to the S3A filesystem via the Hadoop configuration
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)

// Monitor the bucket using the s3a:// scheme
val lines = ssc.textFileStream("s3a://amg-events-out/")

This worked.
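
For completeness: the same s3a credential properties can also be set 
cluster-wide in Hadoop's core-site.xml instead of per-application, so every job 
picks them up without code changes. A sketch using the property names from the 
snippet above; the values here are placeholders:

```xml
<!-- core-site.xml: S3A credentials for all jobs (placeholder values) -->
<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_SECRET_KEY</value>
</property>
```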

Cheers,
Ben


> On May 21, 2016, at 4:18 AM, Ted Yu wrote:
> 
> Maybe more than one version of jets3t-xx.jar was on the classpath.
> 
> FYI
> 
> On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim wrote:
> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
> Spark 1.6.0. It seems not to work. I keep getting this error.
> 
> Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
> stack
> [stack trace elided; identical to the one in the original message below]


Re: Spark Streaming S3 Error

2016-05-21 Thread Benjamin Kim
Ted,

I see only one jets3t jar (jets3t-0.9.0) on the classpath after running this 
to list the jars:

// Print every URL on the system classloader's classpath
val cl = ClassLoader.getSystemClassLoader
cl.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(println)

/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/jets3t-0.9.0.jar

I don’t know what else could be wrong.
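
Even when the system classloader shows a single copy, a second version of the 
same artifact can hide elsewhere on a long classpath. One way to spot version 
collisions, given any list of jar paths such as the output above, is to group 
them by artifact name and flag artifacts that appear more than once. This 
helper is a sketch of my own, not something from the thread:

```scala
// Group jar paths by artifact name (the part of the filename before the
// version) and keep only artifacts that occur more than once -- a quick
// check for the duplicate-jets3t situation suspected here.
def duplicateArtifacts(jarPaths: Seq[String]): Map[String, Seq[String]] = {
  val artifact = "^(.*?)-\\d.*\\.jar$".r
  jarPaths
    .groupBy { path =>
      val name = path.substring(path.lastIndexOf('/') + 1)
      name match {
        case artifact(base) => base // e.g. "jets3t-0.9.0.jar" -> "jets3t"
        case _              => name // no version suffix: use the filename
      }
    }
    .filter { case (_, paths) => paths.size > 1 }
}

// Hypothetical classpath with two jets3t versions
val cp = Seq(
  "/opt/cloudera/parcels/CDH/jars/jets3t-0.9.0.jar",
  "/some/app/lib/jets3t-0.7.1.jar",
  "/opt/cloudera/parcels/CDH/jars/guava-14.0.1.jar"
)
println(duplicateArtifacts(cp))
```

Feeding it the `getURLs` output from above (as strings) would show at a glance 
whether two versions of any artifact, not just jets3t, are competing.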

Thanks,
Ben

> On May 21, 2016, at 4:18 AM, Ted Yu wrote:
> 
> Maybe more than one version of jets3t-xx.jar was on the classpath.
> 
> FYI
> 
> On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim wrote:
> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
> Spark 1.6.0. It seems not to work. I keep getting this error.
> 
> Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
> stack
> [stack trace elided; identical to the one in the original message below]


Re: Spark Streaming S3 Error

2016-05-21 Thread Ted Yu
Maybe more than one version of jets3t-xx.jar was on the classpath.

FYI

On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim wrote:

> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of
> Spark 1.6.0. It seems not to work. I keep getting this error.
>
> Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on
> operand stack
> [stack trace elided; identical to the one in the original message below]


Spark Streaming S3 Error

2016-05-20 Thread Benjamin Kim
I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
Spark 1.6.0, but it does not work. I keep getting this error:

Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
stack
Exception Details:
  Location:

org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V
 @155: invokevirtual
  Reason:
Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not 
assignable to 'org/jets3t/service/model/StorageObject'
  Current Frame:
bci: @155
flags: { }
locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', 
integer }
  Bytecode:
0x000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
0x010: 5659 b701 5713 0192 b601 5b2b b601 5b13
0x020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
0x030: b400 7db6 00e7 b601 5bb6 015e b901 9802
0x040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
0x050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
0x060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
0x070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
0x080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
0x090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
0x0a0: 000a 4e2a 2d2b b700 c7b1
  Exception Handler Table:
bci [0, 116] => handler: 162
bci [117, 159] => handler: 162
  Stackmap Table:
same_frame_extended(@65)
same_frame(@117)
same_locals_1_stack_item_frame(@162,Object[#139])
same_frame(@169)

at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at 
org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at