Hi freedafeng,

The following works for me; df will be a DataFrame, and fullPath is a list of the various part files stored in S3.
fullPath = ['s3n://xxxx/json/StreamingKafkaCollector/s1/2016-07-10/1468173040000/part-r-00000-a2121800-fa5b-44b1-a994-67795']

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = sqlContext.read.format('json').load(fullPath).select("key1")  # .cache()

My file is raw JSON; you might have to tweak the statement above to work with gz files.

Andy

From: freedafeng <freedaf...@yahoo.com>
Date: Wednesday, July 27, 2016 at 10:36 AM
To: "user @spark" <user@spark.apache.org>
Subject: spark 1.6.0 read s3 files error.

> cdh 5.7.1, pyspark.
>
> Code:
> ===================
> from pyspark import SparkContext, SparkConf
>
> conf = SparkConf().setAppName('s3 ---')
> sc = SparkContext(conf=conf)
>
> myRdd = sc.textFile("s3n://****/y=2016/m=5/d=26/h=20/2016.5.26.21.9.52.6d53180a-28b9-4e65-b749-b4a2694b9199.json.gz")
>
> count = myRdd.count()
> print "The count is", count
>
> =======================
> Standalone mode, command line:
>
> AWS_ACCESS_KEY_ID=??? AWS_SECRET_ACCESS_KEY=??? ./bin/spark-submit --driver-memory 4G --master spark://master:7077 --conf "spark.default.parallelism=70" /root/workspace/test/s3.py
>
> Error:
>
> 16/07/27 17:27:26 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
> Traceback (most recent call last):
>   File "/root/workspace/test/s3.py", line 12, in <module>
>     count = myRdd.count()
>   File "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
>   File "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
>   File "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
>   File "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
>   File "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
>   File "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : java.lang.VerifyError: Bad type on operand stack
> Exception Details:
>   Location:
>     org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @155: invokevirtual
>   Reason:
>     Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
>   Current Frame:
>     bci: @155
>     flags: { }
>     locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
>     stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
>   Bytecode:
>     0000000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
>     0000010: 5659 b701 5713 0192 b601 5b2b b601 5b13
>     0000020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
>     0000030: b400 7db6 00e7 b601 5bb6 015e b901 9802
>     0000040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
>     0000050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
>     0000060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
>     0000070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
>     0000080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
>     0000090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
>     00000a0: 000a 4e2a 2d2b b700 c7b1
>   Exception Handler Table:
>     bci [0, 116] => handler: 162
>     bci [117, 159] => handler: 162
>   Stackmap Table:
>     same_frame_extended(@65)
>     same_frame(@117)
>     same_locals_1_stack_item_frame(@162,Object[#139])
>     same_frame(@169)
>
>   at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
>   at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
>
> .....
>
> TIA
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-0-read-s3-files-error-tp27417.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
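A side note on the "tweak for gz files" point: a .json.gz part file is just gzip-compressed JSON lines, which Spark's text and JSON readers decompress transparently via the Hadoop gzip codec. The sketch below illustrates that layout with only the Python standard library (no Spark needed); the file name and record keys are made up for illustration:

```python
import gzip
import json

# Hypothetical gzipped JSON-lines part file, written locally here for
# illustration -- the same layout as the .json.gz objects in the thread.
records = [{"key1": "a", "key2": 1}, {"key1": "b", "key2": 2}]
path = "part-00000.json.gz"

# One JSON document per line, gzip-compressed on the way out.
with gzip.open(path, "wt") as f:
    for rec in records:
        f.write(json.dumps(rec) + "\n")

# Reading it back: gzip.open decompresses transparently, and each line
# parses as an independent JSON document.
with gzip.open(path, "rt") as f:
    loaded = [json.loads(line) for line in f]

print(loaded == records)  # -> True
```

Because each line is self-contained JSON, the same sqlContext.read.format('json').load(...) call should work on the gzipped files without changes; the compression is handled below the JSON layer.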