[ https://issues.apache.org/jira/browse/SPARK-21074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056309#comment-16056309 ]
Steve Loughran commented on SPARK-21074:
----------------------------------------

Given this is an s3:// URL, the client may be amplifying the problem, as in HADOOP-11570 and HADOOP-12376: HTTP/1.1 clients prematurely optimise for connection re-use by reading all the way to the end of the stream. An open(0) followed by a seek(end - a bit) causes them to read all the way through the file first, then re-issue a new GET. Assuming this is the Amazon EMR S3 client, I can't say whether it has the same issue. Two things to try:
# Test with local files to see if the problem goes away. If it doesn't, the problem is in Parquet.
# Move to Hadoop 2.7+ JARs and s3a:// URLs. Better yet, drop in the Hadoop 2.8 binaries & the matching AWS SDK to get the high-performance seek code, including the metrics you get to see when you call toString() on the FS instance (see the sketches below).
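As a rough illustration of that access pattern, here is a minimal sketch against Hadoop's FileSystem API. The bucket and object name are made up, and this is not the EMR client's actual code path, just the shape of what a Parquet footer read asks the stream to do:
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FooterSeekPattern {
  def main(args: Array[String]): Unit = {
    // Hypothetical object; substitute a real Parquet file.
    val path = new Path("s3a://my-bucket/test/part-00000.snappy.parquet")
    val fs   = FileSystem.get(path.toUri, new Configuration())
    val len  = fs.getFileStatus(path).getLen

    val in = fs.open(path)   // typically issues a GET starting at byte 0
    try {
      // Parquet keeps its footer length + magic in the last 8 bytes, so the
      // reader seeks to the tail almost immediately. A client that drains
      // the stream to EOF before re-using the connection turns this seek
      // into a full read of the file, followed by a second GET.
      in.seek(len - 8)
      val tail = new Array[Byte](8)
      in.readFully(tail)
    } finally {
      in.close()
    }
  }
}
{code}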
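And a sketch of the Hadoop 2.8 setup in suggestion 2. The fs.s3a.experimental.input.fadvise key and its "random" policy are the 2.8 lazy-seek/random-IO work (HADOOP-13203); treat the exact key and values as something to verify against the Hadoop version actually on the classpath:
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object S3aRandomIO {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Optimise the input stream for seek-heavy (Parquet/ORC) access rather
    // than one long sequential scan; "sequential" and "normal" are the other
    // policies. In Spark, set it with the "spark.hadoop." prefix instead.
    conf.set("fs.s3a.experimental.input.fadvise", "random")

    val fs = FileSystem.get(new Path("s3a://my-bucket/").toUri, conf)
    // ... run the reads against s3a:// paths here ...

    // The 2.8+ instrumented filesystem includes its IO statistics
    // (bytes read/skipped, streams aborted and re-opened) in toString().
    println(fs)
  }
}
{code}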
> Parquet files are read fully even though only count() is requested
> ------------------------------------------------------------------
>
>                 Key: SPARK-21074
>                 URL: https://issues.apache.org/jira/browse/SPARK-21074
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 2.1.0
>            Reporter: Michael Spector
>
> I have the following sample code that creates parquet files:
> {code:java}
> val spark = SparkSession.builder()
>   .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
>   .config("spark.hadoop.parquet.metadata.read.parallelism", "50")
>   .appName("Test Write").getOrCreate()
> val sqc = spark.sqlContext
> import sqc.implicits._
> val random = new scala.util.Random(31L)
> (1465720077 to 1465720077+10000000).map(x => Event(x, random.nextString(2)))
>   .toDS()
>   .write
>   .mode(SaveMode.Overwrite)
>   .parquet("s3://my-bucket/test")
> {code}
> Afterwards, I'm trying to read these files with the following code:
> {code:java}
> val spark = SparkSession.builder()
>   .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
>   .config("spark.hadoop.parquet.metadata.read.parallelism", "50")
>   .config("spark.sql.parquet.filterPushdown", "true")
>   .appName("Test Read").getOrCreate()
> spark.sqlContext.read
>   .option("mergeSchema", "false")
>   .parquet("s3://my-bucket/test")
>   .count()
> {code}
> I've enabled the DEBUG log level to see what requests are actually sent through the S3 API, and I've found that in addition to the Parquet "footer" retrieval there are requests that ask for the whole file content.
> For example, this is a full-content request:
> {noformat}
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]"
> ....
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]"
> ....
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"
> {noformat}
> And this is a partial request for the footer only:
> {noformat}
> 17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1
> ....
> 17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> Range: bytes=7472086-7472094
> ...
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-2 << "Content-Length: 8[\r][\n]"
> ....
> {noformat}
> Here's what FileScanRDD prints:
> {noformat}
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00004-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7473020, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00011-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472503, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00006-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472501, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00007-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7473104, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00003-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472458, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00012-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472594, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00001-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472984, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00014-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472720, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00008-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472339, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00015-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472437, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00013-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472312, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00002-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472191, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00005-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472239, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7472094, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00010-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7471960, partition values: [empty row]
> 17/06/13 05:46:53 INFO FileScanRDD: Reading File path: s3://my-bucket/test/part-00009-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, range: 0-7471520, partition values: [empty row]
> {noformat}
> Parquet tool info (on one of the files):
> {noformat}
> ~$ parquet-tools meta part-00009-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet
> file:        file:/Users/michael/part-00009-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet
> creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> extra:       org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"time","type":"long","nullable":false,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}}]} > file schema: spark_schema > -------------------------------------------------------------------------------- > time: REQUIRED INT64 R:0 D:0 > country: OPTIONAL BINARY O:UTF8 R:0 D:1 > row group 1: RC:625000 TS:11201229 OFFSET:4 > -------------------------------------------------------------------------------- > time: INT64 SNAPPY DO:0 FPO:4 SZ:2501723/5000239/2.00 VC:625000 > ENC:PLAIN,BIT_PACKED > country: BINARY SNAPPY DO:0 FPO:2501727 SZ:4969317/6200990/1.25 > VC:625000 ENC:PLAIN,BIT_PACKED,RLE > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org