Thanks for the detailed information!

Now I can confirm that this is a backwards-compatibility issue. The data written by parquet-mr 1.6.0rc7 follows the standard LIST structure, while Spark SQL still uses the old parquet-avro style two-level structure, which is what causes the problem.
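
For illustration, for a list of strings the standard layout defined in parquet-format is a three-level structure:

  optional group my_list (LIST) {
    repeated group list {
      optional binary element (UTF8);
    }
  }

whereas the old parquet-avro style two-level layout that Spark SQL still assumes looks roughly like this:

  optional group my_list (LIST) {
    repeated binary array (UTF8);
  }

(The field names here are only illustrative; the exact names differ from writer to writer.)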

Cheng

On 4/27/15 7:07 PM, Jianshi Huang wrote:
FYI,

Parquet schema output:

message pig_schema {
  optional binary cust_id (UTF8);
  optional int32 part_num;
  optional group ip_list (LIST) {
    repeated group ip_t {
      optional binary ip (UTF8);
    }
  }
  optional group vid_list (LIST) {
    repeated group vid_t {
      optional binary vid (UTF8);
    }
  }
  optional group fso_list (LIST) {
    repeated group fso_t {
      optional binary fso (UTF8);
    }
  }
}


And Parquet meta output:

creator: [parquet-mr (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr version 1.6.0rc7 (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr]
extra:   pig.schema = cust_id: chararray,part_num: int,ip_list: {ip_t: (ip: chararray)},vid_list: {vid_t: (vid: chararray)},fso_list: {fso_t: (fso: chararray)}

file schema: pig_schema
--------------------------------------------------------------------------------
cust_id:     OPTIONAL BINARY O:UTF8 R:0 D:1
part_num:    OPTIONAL INT32 R:0 D:1
ip_list:     OPTIONAL F:1
.ip_t:       REPEATED F:1
..ip:        OPTIONAL BINARY O:UTF8 R:1 D:3
vid_list:    OPTIONAL F:1
.vid_t:      REPEATED F:1
..vid:       OPTIONAL BINARY O:UTF8 R:1 D:3
fso_list:    OPTIONAL F:1
.fso_t:      REPEATED F:1
..fso:       OPTIONAL BINARY O:UTF8 R:1 D:3

row group 1: RC:1201092 TS:537930256 OFFSET:4
--------------------------------------------------------------------------------
cust_id:  BINARY GZIP DO:0 FPO:4 SZ:10629422/27627221/2.60 VC:1201092 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:10629426 SZ:358/252/0.70 VC:1201092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:10629784 SZ:41331065/180501686/4.37 VC:10540378 ENC:PLAIN,RLE
vid_list:
.vid_t:
..vid: BINARY GZIP DO:0 FPO:51960849 SZ:58820404/254819721/4.33 VC:11011894 ENC:PLAIN,RLE
fso_list:
.fso_t:
..fso: BINARY GZIP DO:0 FPO:110781253 SZ:21363255/74981376/3.51 VC:5612655 ENC:PLAIN,RLE

row group 2: RC:1830769 TS:1045506907 OFFSET:132144508
--------------------------------------------------------------------------------
cust_id:  BINARY GZIP DO:0 FPO:132144508 SZ:17720131/42110882/2.38 VC:1830769 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:149864639 SZ:486/346/0.71 VC:1830769 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:149865125 SZ:37687630/342050955/9.08 VC:20061916 ENC:PLAIN,PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid: BINARY GZIP DO:0 FPO:187552755 SZ:56498124/516700215/9.15 VC:22410351 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso: BINARY GZIP DO:0 FPO:244050879 SZ:20110276/144644509/7.19 VC:10739272 ENC:PLAIN,PLAIN_DICTIONARY,RLE

row group 3: RC:22445 TS:4304290 OFFSET:264161155
--------------------------------------------------------------------------------
cust_id:  BINARY GZIP DO:0 FPO:264161155 SZ:221527/516312/2.33 VC:22445 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:264382682 SZ:102/64/0.63 VC:22445 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:264382784 SZ:483962/1204312/2.49 VC:123097 ENC:PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid: BINARY GZIP DO:0 FPO:264866746 SZ:622977/2122080/3.41 VC:133136 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso: BINARY GZIP DO:0 FPO:265489723 SZ:240588/461522/1.92 VC:62173 ENC:PLAIN_DICTIONARY,RLE

Jianshi


On Mon, Apr 27, 2015 at 12:40 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

    Had an offline discussion with Jianshi; the dataset was generated by Pig.

    Jianshi - Could you please attach the output of "parquet-schema
    <path-to-parquet-file>"? I guess this is a Parquet format
    backwards-compatibility issue. Parquet hadn't standardized the
    representation of LIST and MAP until recently, so many systems
    made their own choices and are not easily interoperable. In
    earlier days, Spark SQL used LIST and MAP formats similar to
    Avro's, which were unfortunately not chosen as the current
    standard format. Details can be found here:
    https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
    That document also defines backwards-compatibility rules for
    handling legacy Parquet data written by old Parquet
    implementations in various systems.

    So ideally, Spark SQL should now always write data following the
    standard, and implement all the backwards-compatibility rules to
    read legacy data. The JIRA issue for this is
    https://issues.apache.org/jira/browse/SPARK-6774

    I'm working on a PR for this: https://github.com/apache/spark/pull/5422
    To fix SPARK-6774, we need to implement the backwards-compatibility
    rules in both the record converter and the schema converter. This PR
    has fixed the former, but I still need some time to finish the latter
    and add tests.
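
    To give a concrete idea of the schema-converter side: the LIST
    backwards-compatibility rules in LogicalTypes.md boil down to deciding
    whether the repeated field nested inside a LIST-annotated group is
    itself the list element (the legacy two-level layouts) or only a
    wrapper whose single child is the element (the standard three-level
    layout). A rough Scala sketch of that check against the parquet-mr 1.6
    schema API (parquet.schema.Type) - just an illustration, not the
    actual code in the PR:

        import parquet.schema.Type

        // Is the repeated field inside a LIST-annotated group itself the element type
        // (legacy two-level layouts), or a wrapper whose single child is the element
        // (standard three-level layout)?
        def repeatedFieldIsElement(repeated: Type, listFieldName: String): Boolean =
          repeated.isPrimitive ||                          // e.g. repeated int32 element
          repeated.asGroupType().getFieldCount > 1 ||      // element is a multi-field struct
          repeated.getName == "array" ||                   // legacy parquet-avro style
          repeated.getName == s"${listFieldName}_tuple"    // legacy parquet-thrift style

    When none of these conditions hold, the repeated group is treated as
    the standard wrapper and its single child becomes the element type.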

    Cheng

    On 4/25/15 2:22 AM, Yin Huai wrote:
    Oh, I missed that. It is fixed in 1.3.0.

    Also, Jianshi, the dataset was not generated by Spark SQL, right?

    On Fri, Apr 24, 2015 at 11:09 AM, Ted Yu <yuzhih...@gmail.com> wrote:

        Yin:
        Fix Version of SPARK-4520 is not set.
        I assume it was fixed in 1.3.0.

        Cheers

        On Fri, Apr 24, 2015 at 11:00 AM, Yin Huai <yh...@databricks.com> wrote:

            The exception looks like the one mentioned in
            https://issues.apache.org/jira/browse/SPARK-4520. What is
            the version of Spark?

            On Fri, Apr 24, 2015 at 2:40 AM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:

                Hi,

                My data looks like this:

                +-----------+----------------------------+----------+
                | col_name  | data_type                  | comment  |
                +-----------+----------------------------+----------+
                | cust_id   | string                     |          |
                | part_num  | int                        |          |
                | ip_list   | array<struct<ip:string>>   |          |
                | vid_list  | array<struct<vid:string>>  |          |
                | fso_list  | array<struct<fso:string>>  |          |
                | src       | string                     |          |
                | date      | int                        |          |
                +-----------+----------------------------+----------+
                And when I ran select *, it reported a ParquetDecodingException.
                Is this type not supported in Spark SQL?
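
                In spark-shell terms, the query is roughly this (a minimal
                sketch; the HDFS path and table name are placeholders):

                    val data = sqlContext.parquetFile("hdfs://xxx/")  // directory containing part-m-*.gz.parquet
                    data.registerTempTable("t")
                    sqlContext.sql("SELECT * FROM t").collect()       // throws ParquetDecodingException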
                Detailed error message here:

                Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxx/part-m-00000.gz.parquet
                    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
                    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
                    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
                    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
                    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
                    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
                    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
                    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
                    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
                    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
                    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
                    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
                    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
                    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
                    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
                    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
                    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
                    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
                    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
                    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
                    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
                    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
                    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
                    at org.apache.spark.scheduler.Task.run(Task.scala:64)
                    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
                    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
                    at java.lang.Thread.run(Thread.java:724)
                Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
                    at java.util.ArrayList.elementData(ArrayList.java:400)
                    at java.util.ArrayList.get(ArrayList.java:413)
                    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
                    at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
                    at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
                    at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
                    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:290)
                    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
                    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
                    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
                    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
                    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
                    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)


                --
                Jianshi Huang

                LinkedIn: jianshi
                Twitter: @jshuang
                Github & Blog: http://huangjs.github.com/

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
