FYI,

Parquet schema output:

message pig_schema {
  optional binary cust_id (UTF8);
  optional int32 part_num;
  optional group ip_list (LIST) {
    repeated group ip_t {
      optional binary ip (UTF8);
    }
  }
  optional group vid_list (LIST) {
    repeated group vid_t {
      optional binary vid (UTF8);
    }
  }
  optional group fso_list (LIST) {
    repeated group fso_t {
      optional binary fso (UTF8);
    }
  }
}


And Parquet meta output:

creator:     [parquet-mr (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7),
parquet-mr version 1.6.0rc7 (build
ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr]
extra:       pig.schema = cust_id: chararray,part_num: int,ip_list: {ip_t:
(ip: chararray)},vid_list: {vid_t: (vid: chararray)},fso_list: {fso_t:
(fso: chararray)}

file schema: pig_schema
--------------------------------------------------------------------------------
cust_id:     OPTIONAL BINARY O:UTF8 R:0 D:1
part_num:    OPTIONAL INT32 R:0 D:1
ip_list:     OPTIONAL F:1
.ip_t:       REPEATED F:1
..ip:        OPTIONAL BINARY O:UTF8 R:1 D:3
vid_list:    OPTIONAL F:1
.vid_t:      REPEATED F:1
..vid:       OPTIONAL BINARY O:UTF8 R:1 D:3
fso_list:    OPTIONAL F:1
.fso_t:      REPEATED F:1
..fso:       OPTIONAL BINARY O:UTF8 R:1 D:3

row group 1: RC:1201092 TS:537930256 OFFSET:4
--------------------------------------------------------------------------------
cust_id:      BINARY GZIP DO:0 FPO:4 SZ:10629422/27627221/2.60 VC:1201092
ENC:PLAIN,RLE,BIT_PACKED
part_num:     INT32 GZIP DO:0 FPO:10629426 SZ:358/252/0.70 VC:1201092
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip:         BINARY GZIP DO:0 FPO:10629784 SZ:41331065/180501686/4.37
VC:10540378 ENC:PLAIN,RLE
vid_list:
.vid_t:
..vid:        BINARY GZIP DO:0 FPO:51960849 SZ:58820404/254819721/4.33
VC:11011894 ENC:PLAIN,RLE
fso_list:
.fso_t:
..fso:        BINARY GZIP DO:0 FPO:110781253 SZ:21363255/74981376/3.51
VC:5612655 ENC:PLAIN,RLE

row group 2: RC:1830769 TS:1045506907 OFFSET:132144508
--------------------------------------------------------------------------------
cust_id:      BINARY GZIP DO:0 FPO:132144508 SZ:17720131/42110882/2.38
VC:1830769 ENC:PLAIN,RLE,BIT_PACKED
part_num:     INT32 GZIP DO:0 FPO:149864639 SZ:486/346/0.71 VC:1830769
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip:         BINARY GZIP DO:0 FPO:149865125 SZ:37687630/342050955/9.08
VC:20061916 ENC:PLAIN,PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid:        BINARY GZIP DO:0 FPO:187552755 SZ:56498124/516700215/9.15
VC:22410351 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso:        BINARY GZIP DO:0 FPO:244050879 SZ:20110276/144644509/7.19
VC:10739272 ENC:PLAIN,PLAIN_DICTIONARY,RLE

row group 3: RC:22445 TS:4304290 OFFSET:264161155
--------------------------------------------------------------------------------
cust_id:      BINARY GZIP DO:0 FPO:264161155 SZ:221527/516312/2.33 VC:22445
ENC:PLAIN,RLE,BIT_PACKED
part_num:     INT32 GZIP DO:0 FPO:264382682 SZ:102/64/0.63 VC:22445
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip:         BINARY GZIP DO:0 FPO:264382784 SZ:483962/1204312/2.49
VC:123097 ENC:PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid:        BINARY GZIP DO:0 FPO:264866746 SZ:622977/2122080/3.41
VC:133136 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso:        BINARY GZIP DO:0 FPO:265489723 SZ:240588/461522/1.92 VC:62173
ENC:PLAIN_DICTIONARY,RLE

Jianshi


On Mon, Apr 27, 2015 at 12:40 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

>  Had an offline discussion with Jianshi, the dataset was generated by Pig.
>
> Jianshi - Could you please attach the output of "parquet-schema
> <path-to-parquet-file>"? I guess this is a Parquet format
> backwards-compatibility issue. Parquet hadn't standardized representation
> of LIST and MAP until recently, thus many systems made their own choice and
> are not easily inter-operatable. In earlier days, Spark SQL used LIST and
> MAP formats similar to Avro, which was unfortunately not chosen as the
> current standard format. Details can be found here:
> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md This
> document also defines backwards-compatibility rules to handle legacy
> Parquet data written by old Parquet implementations in various systems.
>
> So ideally, now Spark SQL should always write data following the standard,
> and implement all backwards-compatibility rules to read legacy data. JIRA
> issue for this is https://issues.apache.org/jira/browse/SPARK-6774
>
> I'm working on a PR https://github.com/apache/spark/pull/5422 for this.
> To fix SPARK-6774, we need to implement backwards-compatibility rules in
> both record converter and schema converter together. This PR has fixed the
> former, but I still need some time to finish the latter part and add tests.
>
> Cheng
>
> On 4/25/15 2:22 AM, Yin Huai wrote:
>
> oh, I missed that. It is fixed in 1.3.0.
>
>  Also, Jianshi, the dataset was not generated by Spark SQL, right?
>
> On Fri, Apr 24, 2015 at 11:09 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Yin:
>> Fix Version of SPARK-4520 is not set.
>> I assume it was fixed in 1.3.0
>>
>>  Cheers
>> Fix Version
>>
>> On Fri, Apr 24, 2015 at 11:00 AM, Yin Huai <yh...@databricks.com> wrote:
>>
>>> The exception looks like the one mentioned in
>>> https://issues.apache.org/jira/browse/SPARK-4520. What is the version
>>> of Spark?
>>>
>>> On Fri, Apr 24, 2015 at 2:40 AM, Jianshi Huang <jianshi.hu...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>  My data looks like this:
>>>>
>>>>  +-----------+----------------------------+----------+
>>>> | col_name  |         data_type          | comment  |
>>>> +-----------+----------------------------+----------+
>>>> | cust_id   | string                     |          |
>>>> | part_num  | int                        |          |
>>>> | ip_list   | array<struct<ip:string>>   |          |
>>>> | vid_list  | array<struct<vid:string>>  |          |
>>>> | fso_list  | array<struct<fso:string>>  |          |
>>>> | src       | string                     |          |
>>>> | date      | int                        |          |
>>>> +-----------+----------------------------+----------+
>>>> And I did select *, it reports ParquetDecodingException.
>>>> Is this type not supported in SparkSQL?
>>>> Detailed error message here:
>>>>
>>>> Error: org.apache.spark.SparkException: Job aborted due to stage failure: 
>>>> Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in 
>>>> stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): 
>>>> parquet.io.ParquetDecodingException:
>>>> Can not read value at 0 in block -1 in file 
>>>> hdfs://xxx/part-m-00000.gz.parquet
>>>>         at 
>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
>>>>         at 
>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>>>         at 
>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
>>>>         at 
>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>         at 
>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>         at 
>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>         at 
>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>         at 
>>>> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>         at 
>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>         at 
>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>         at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
>>>>         at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
>>>>         at 
>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
>>>>         at 
>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
>>>>         at 
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>         at 
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>         at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:724)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at java.util.ArrayList.elementData(ArrayList.java:400)
>>>>         at java.util.ArrayList.get(ArrayList.java:413)
>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>         at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
>>>>         at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
>>>>         at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
>>>>         at 
>>>> parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:290)
>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>>>>         at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>>>>         at 
>>>> parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>>>>         at 
>>>> parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>>>>         at 
>>>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>>>>         at 
>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>>>
>>>>
>>>>  --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Reply via email to