[
https://issues.apache.org/jira/browse/PARQUET-964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Constantin Muraru updated PARQUET-964:
--------------------------------------
Description:
Hi folks!
We're working on adding support for ProtoParquet to work with Hive / AWS Athena
(Presto) \[1\]. The problem we've encountered appears whenever we declare a
repeated field (array) or a map in the protobuf schema and then try to convert
it to parquet. The conversion works fine, but when we try to query the data
with Hive/Presto, we get parsing errors.
We've noticed, though, that AvroToParquet works great even when we declare such
fields (arrays, maps)!
Comparing the parquet schemas generated from protobuf vs. avro, we've noticed a
few differences.
Take the simple schema below (protobuf):
{code}
message ListOfList {
  string top_field = 1;
  repeated MyInnerMessage first_array = 2;
}

message MyInnerMessage {
  int32 inner_field = 1;
  repeated int32 second_array = 2;
}
{code}
After using ProtoParquetWriter, the resulting parquet schema is the following:
{code}
message TestProtobuf.ListOfList {
  optional binary top_field (UTF8);
  repeated group first_array {
    optional int32 inner_field;
    repeated int32 second_array;
  }
}
{code}
When we try to query this data, we get parsing errors from Hive/Athena. The
parsing errors are related to the array/map fields.
However, if we create a similar avro schema, the parquet result of the
AvroParquetWriter is the following:
{code}
message TestProtobuf.ListOfList {
  required binary top_field (UTF8);
  required group first_array (LIST) {
    repeated group array {
      required int32 inner_field;
      required group second_array (LIST) {
        repeated int32 array;
      }
    }
  }
}
{code}
This works beautifully with Hive/Athena. Too bad our systems are stuck with
protobuf :-) .
You can see the additional wrappers which are missing from protobuf: {{required
group first_array (LIST)}}.
Our goal is to make the ProtoParquetWriter generate a parquet schema similar to
what Avro is doing. We basically want to add these wrappers around lists/maps.
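To make the wrapping concrete, here is a minimal, self-contained sketch of the rewrite we want ProtoParquetWriter to perform. This is a toy schema model, not the parquet-mr API, and all class and method names are made up for illustration: every repeated field gets hoisted into a required (LIST) group whose single repeated child is named "array", matching AvroParquetWriter's output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a parquet schema field; annotations such as (UTF8) and (LIST)
// are folded into the name string for brevity. This is NOT the parquet-mr API.
class Field {
    final String repetition;   // "required" | "optional" | "repeated"
    final String type;         // "group" or a primitive such as "int32"
    final String name;
    final List<Field> children = new ArrayList<>();

    Field(String repetition, String type, String name, Field... kids) {
        this.repetition = repetition;
        this.type = type;
        this.name = name;
        children.addAll(Arrays.asList(kids));
    }
}

public class ListWrapSketch {
    // Hoist every repeated field into `required group <name> (LIST)` whose
    // single child is the original field renamed to "array".
    static Field wrap(Field f) {
        List<Field> kids = new ArrayList<>();
        for (Field c : f.children) kids.add(wrap(c));
        if (!f.repetition.equals("repeated")) {
            Field copy = new Field(f.repetition, f.type, f.name);
            copy.children.addAll(kids);
            return copy;
        }
        Field element = new Field("repeated", f.type, "array");
        element.children.addAll(kids);
        Field wrapper = new Field("required", "group", f.name + " (LIST)");
        wrapper.children.add(element);
        return wrapper;
    }

    // Pretty-print a field in parquet schema style.
    static String print(Field f, String indent) {
        StringBuilder sb = new StringBuilder(indent)
            .append(f.repetition).append(' ')
            .append(f.type).append(' ').append(f.name);
        if (f.children.isEmpty()) return sb.append(";\n").toString();
        sb.append(" {\n");
        for (Field c : f.children) sb.append(print(c, indent + "  "));
        return sb.append(indent).append("}\n").toString();
    }

    public static void main(String[] args) {
        // The schema ProtoParquetWriter currently derives from ListOfList:
        Field[] fields = {
            new Field("optional", "binary", "top_field (UTF8)"),
            new Field("repeated", "group", "first_array",
                new Field("optional", "int32", "inner_field"),
                new Field("repeated", "int32", "second_array")),
        };
        System.out.println("message TestProtobuf.ListOfList {");
        for (Field f : fields) System.out.print(print(wrap(f), "  "));
        System.out.println("}");
    }
}
```

Running main prints the LIST-wrapped schema with inner_field still optional, which is exactly the proto-derived shape discussed below.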
Everything seemed to work great, until we bumped into an issue. We tuned
ProtoParquetWriter to generate the same parquet schema as AvroParquetWriter.
However, one difference between protobuf and avro is that protobuf schemas can
contain plenty of optional fields.
{code}
message TestProtobuf.ListOfList {
  optional binary top_field (UTF8);
  required group first_array (LIST) {
    repeated group array {
      optional int32 inner_field;
      required group second_array (LIST) {
        repeated int32 array;
      }
    }
  }
}
{code}
Notice the *optional* int32 inner_field (for avro that was *required*).
When testing with some real proto-parquet data, we get an error every time
inner_field is not populated but second_array is.
{noformat}
parquet-tools cat /tmp/test23.parquet
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/test23.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:223)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:122)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:126)
    at org.apache.parquet.tools.command.CatCommand.execute(CatCommand.java:79)
    at org.apache.parquet.proto.tools.Main.main(Main.java:214)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.parquet.io.ParquetDecodingException: totalValueCount '0' <= 0
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:349)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:77)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:272)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:145)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:107)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:155)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:107)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:194)
    ... 9 more
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/test23.parquet
Process finished with exit code 1
{noformat}
Basically, this error occurs whenever {{first_array.inner_field}} is not
populated, but {{first_array.second_array}} is.
I'm attaching the code used to generate the parquet files (though keep in mind
that we're working on a fork atm).
Going through the code, I've noticed that the errors stop and everything seems
to work fine, once I change this condition in ColumnReaderImpl:
From:
{code}
if (totalValueCount <= 0) {
  throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' <= 0");
}
{code}
{code}
To:
{code}
if (totalValueCount < 0) {
  throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' < 0");
}
{code}
{code}
https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java#L355
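As a standalone illustration of the proposed relaxation (hypothetical class and method names; in parquet-mr the real check sits in the ColumnReaderImpl constructor linked above), a zero-value column chunk, which is apparently what gets written here for the never-populated inner_field, would become legal, while a negative count still fails:

```java
public class TotalValueCountCheck {
    // Proposed guard: only a negative count indicates corruption; zero is
    // treated as legal for a column with no values written at all.
    static void check(long totalValueCount) {
        if (totalValueCount < 0) {
            throw new IllegalStateException(
                "totalValueCount '" + totalValueCount + "' < 0");
        }
    }

    public static void main(String[] args) {
        check(0);    // previously rejected with "totalValueCount '0' <= 0"
        check(10);   // unchanged for normal chunks
        boolean rejected = false;
        try {
            check(-1);
        } catch (IllegalStateException e) {
            rejected = true;
        }
        System.out.println("negative rejected: " + rejected);
    }
}
```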
With that change, parquet-tools reads the file correctly:
{noformat}
[main] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - RecordReader initialized will read a total of 10 records.
[main] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - at row 0. reading next block
[main] INFO org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 27 ms. row count = 10
top_field = top_field
first_array:
.array:
..second_array:
...array = 20
top_field = top_field
first_array:
.array:
..second_array:
...array = 20
{noformat}
What are your thoughts on this? Should we change this condition to
{{if (totalValueCount < 0)}}?
Any feedback is greatly appreciated! Let me know if I missed some information.
Thanks,
Costi
\[1\] https://aws.amazon.com/athena/
> Using ProtoParquet with Hive / AWS Athena: ParquetDecodingException: totalValueCount '0' <= 0
> ---------------------------------------------------------------------------------------------
>
> Key: PARQUET-964
> URL: https://issues.apache.org/jira/browse/PARQUET-964
> Project: Parquet
> Issue Type: Bug
> Reporter: Constantin Muraru
> Attachments: ListOfList.proto, ListOfListProtoParquetConverter.java
>
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)