[
https://issues.apache.org/jira/browse/DRILL-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312383#comment-16312383
]
ASF GitHub Bot commented on DRILL-5970:
---------------------------------------
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/1047#discussion_r159799621
--- Diff: exec/vector/src/main/codegen/templates/BaseWriter.java ---
@@ -106,37 +114,37 @@
MapOrListWriter list(String name);
boolean isMapWriter();
boolean isListWriter();
- UInt1Writer uInt1(String name);
- UInt2Writer uInt2(String name);
- UInt4Writer uInt4(String name);
- UInt8Writer uInt8(String name);
- VarCharWriter varChar(String name);
- Var16CharWriter var16Char(String name);
- TinyIntWriter tinyInt(String name);
- SmallIntWriter smallInt(String name);
- IntWriter integer(String name);
- BigIntWriter bigInt(String name);
- Float4Writer float4(String name);
- Float8Writer float8(String name);
- BitWriter bit(String name);
- VarBinaryWriter varBinary(String name);
+ UInt1Writer uInt1(String name, TypeProtos.DataMode dataMode);
--- End diff --
I'm really not sure we want to do this. These writers are also used in JSON,
and are used for every field in every object. Now, every request to get a
writer would have to pass the mode. This seems like we are making the problem
far, far more complex than necessary.
The current code has rules for choosing the type. In general, the type is
OPTIONAL for single (scalar) values and REPEATED for repeated (array) values.
The mode passed here *cannot* be REPEATED: it just won't work. So, can we pass
REQUIRED?
Let's think about how these writers are used in JSON. In JSON, we discover
fields as we read each object. A field need not appear in the first object; it
might first appear 20 objects in. Then, after the 25th object, it may never
appear again. So, in JSON, we can *never* use REQUIRED; we *must* use OPTIONAL.
The key reason: JSON provides no schema, and Drill cannot predict what the
schema will turn out to be.
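To make the point concrete, here is a minimal sketch of schema discovery over a stream of JSON-like objects. The class and method names (`JsonDiscovery`, `discover`) are illustrative stand-ins, not Drill's actual reader API; the point is only that a column first seen mid-stream has already "missed" earlier rows, so OPTIONAL is the only safe cardinality:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (not Drill's actual classes): when scanning JSON,
// a column may first appear at any row and vanish afterwards, so the only
// safe cardinality is OPTIONAL.
public class JsonDiscovery {

    public static Map<String, String> discover(List<Map<String, Object>> objects) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (Map<String, Object> obj : objects) {
            for (String field : obj.keySet()) {
                // First sighting: rows before this one implicitly hold null
                // for this column, so REQUIRED is impossible.
                schema.putIfAbsent(field, "OPTIONAL");
            }
        }
        return schema;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> objects = List.of(
            Map.of("a", 1),
            Map.of("a", 2, "b", "late"),  // "b" first appears in object 2
            Map.of("a", 3));              // ...and never again
        System.out.println(discover(objects));  // every column is OPTIONAL
    }
}
```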
Now, let's move to Parquet. Parquet does have a schema. In fact, with
Parquet, we know the schema before we read the first row. And, in Parquet, a
scalar column can be REQUIRED or OPTIONAL.
All of this suggests a better solution. (Indeed, it is the solution
implemented in the new column writer layer.) Allow Parquet to declare an
"early" schema: prior to the first row, call methods that declare each column
with its cardinality.
Then, when reading the field, always pass only the name, as today. If the
field is new, it *must* be OPTIONAL. Otherwise, it will use whatever mode was
declared before.
Let's say this another way. Below these methods is a call to an
`addOrGet()` method. In Parquet, call those methods before the first row to do
the "add" part. Then, later, the method will do only the "get" part.
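A rough sketch of that split, under stated assumptions: `ColumnRegistry`, `declare()`, and this `addOrGet()` are hypothetical names standing in for Drill's writer machinery, used only to show how a schema-aware reader (Parquet) and a schema-free reader (JSON) share one name-only lookup path:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch of the suggested design, not Drill's actual API.
public class ColumnRegistry {

    public enum Mode { REQUIRED, OPTIONAL, REPEATED }

    private final Map<String, Mode> columns = new LinkedHashMap<>();

    // "Add" phase: Parquet, which knows its schema up front, calls this
    // before the first row and may declare REQUIRED columns.
    public void declare(String name, Mode mode) {
        columns.put(name, mode);
    }

    // "Get" phase: the per-row lookup keeps today's name-only signature.
    // A column never declared up front (the JSON case) falls back to the
    // only safe mode, OPTIONAL.
    public Mode addOrGet(String name) {
        return columns.computeIfAbsent(name, n -> Mode.OPTIONAL);
    }
}
```

Usage follows the comment's split: Parquet calls `declare()` for every column before row one; JSON never does, so its columns all come back OPTIONAL, and the request path stays mode-free in both cases.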
The result is that you won't have to modify so many files, won't have to
complicate the APIs, and won't have to worry about a client passing in
REPEATED mode for a scalar column.
> DrillParquetReader always builds the schema with "OPTIONAL" dataMode columns
> instead of "REQUIRED" ones
> -------------------------------------------------------------------------------------------------------
>
> Key: DRILL-5970
> URL: https://issues.apache.org/jira/browse/DRILL-5970
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Codegen, Execution - Data Types, Storage -
> Parquet
> Affects Versions: 1.11.0
> Reporter: Vitalii Diravka
> Assignee: Vitalii Diravka
>
> The root cause of the issue is that the MapWriters do not implement adding
> REQUIRED (non-nullable) data types to the container.
> This can lead to an invalid schema.
> {code}
> 0: jdbc:drill:zk=local> CREATE TABLE dfs.tmp.bof_repro_1 as select * from
> (select CONVERT_FROM('["hello","hai"]','JSON') AS MYCOL, 'Bucket1' AS Bucket
> FROM (VALUES(1)));
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> +-----------+----------------------------+
> | Fragment | Number of records written |
> +-----------+----------------------------+
> | 0_0 | 1 |
> +-----------+----------------------------+
> 1 row selected (2.376 seconds)
> {code}
> Run from Drill unit test framework (to see "data mode"):
> {code}
> @Test
> public void test() throws Exception {
> setColumnWidths(new int[] {25, 25});
> List<QueryDataBatch> queryDataBatches = testSqlWithResults("select * from
> dfs.tmp.bof_repro_1");
> printResult(queryDataBatches);
> }
> 1 row(s):
> -------------------------------------------------------
> | MYCOL<VARCHAR(REPEATED)> | Bucket<VARCHAR(OPTIONAL)>|
> -------------------------------------------------------
> | ["hello","hai"] | Bucket1 |
> -------------------------------------------------------
> Total record count: 1
> {code}
> {code}
> vitalii@vitalii-pc:~/parquet-tools/parquet-mr/parquet-tools/target$ java -jar
> parquet-tools-1.6.0rc3-SNAPSHOT.jar schema /tmp/bof_repro_1/0_0_0.parquet
> message root {
> repeated binary MYCOL (UTF8);
> required binary Bucket (UTF8);
> }
> {code}
> To reproduce the wrong result, run the query with aggregation using the new
> parquet reader (used by default for complex data types) and the old parquet
> reader. A false "Hash aggregate does not support schema
> changes" error will occur.
> 1) Create two parquet files.
> {code}
> 0: jdbc:drill:schema=dfs> CREATE TABLE dfs.tmp.bof_repro_1 as select * from
> (select CONVERT_FROM('["hello","hai"]','JSON') AS MYCOL, 'Bucket1' AS Bucket
> FROM (VALUES(1)));
> +-----------+----------------------------+
> | Fragment | Number of records written |
> +-----------+----------------------------+
> | 0_0 | 1 |
> +-----------+----------------------------+
> 1 row selected (1.122 seconds)
> 0: jdbc:drill:schema=dfs> CREATE TABLE dfs.tmp.bof_repro_2 as select * from
> (select CONVERT_FROM('[]','JSON') AS MYCOL, 'Bucket1' AS Bucket FROM
> (VALUES(1)));
> +-----------+----------------------------+
> | Fragment | Number of records written |
> +-----------+----------------------------+
> | 0_0 | 1 |
> +-----------+----------------------------+
> 1 row selected (0.552 seconds)
> 0: jdbc:drill:schema=dfs> select * from dfs.tmp.bof_repro_2;
> {code}
> 2) Copy the parquet files from bof_repro_1 to bof_repro_2.
> {code}
> [root@naravm1 ~]# hadoop fs -ls /tmp/bof_repro_1
> Found 1 items
> -rw-r--r-- 3 mapr mapr 415 2017-07-25 11:46
> /tmp/bof_repro_1/0_0_0.parquet
> [root@naravm1 ~]# hadoop fs -ls /tmp/bof_repro_2
> Found 1 items
> -rw-r--r-- 3 mapr mapr 368 2017-07-25 11:46
> /tmp/bof_repro_2/0_0_0.parquet
> [root@naravm1 ~]# hadoop fs -cp /tmp/bof_repro_1/0_0_0.parquet
> /tmp/bof_repro_2/0_0_1.parquet
> [root@naravm1 ~]#
> {code}
> 3) Query the table.
> {code}
> 0: jdbc:drill:schema=dfs> ALTER SESSION SET `planner.enable_streamagg`=false;
> +-------+------------------------------------+
> | ok | summary |
> +-------+------------------------------------+
> | true | planner.enable_streamagg updated. |
> +-------+------------------------------------+
> 1 row selected (0.124 seconds)
> 0: jdbc:drill:schema=dfs> select * from dfs.tmp.bof_repro_2;
> +------------------+----------+
> | MYCOL | Bucket |
> +------------------+----------+
> | ["hello","hai"] | Bucket1 |
> | null | Bucket1 |
> +------------------+----------+
> 2 rows selected (0.247 seconds)
> 0: jdbc:drill:schema=dfs> select bucket, count(*) from dfs.tmp.bof_repro_2
> group by bucket;
> Error: UNSUPPORTED_OPERATION ERROR: Hash aggregate does not support schema
> changes
> Fragment 0:0
> [Error Id: 60f8ada3-5f00-4413-a676-4881fc8cb409 on naravm3:31010]
> (state=,code=0)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)