[
https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648738#comment-17648738
]
Dmitry Yaraev commented on FLINK-30314:
---------------------------------------
[~martijnvisser] Yes, that's exactly what happened. As I wrote in the ticket
description, the problem is that Flink stops reading the file once the reading
position exceeds the compressed file size, which is wrong. It should either
rely on the uncompressed file size or read to the end of the stream. I think
the latter would be better because we wouldn't have to calculate the
uncompressed file size.
As far as I understand this is not an easy fix as there are multiple
dependencies, but I think in case of an unsupported format at least an
exception should be thrown.
> Unable to read all records from compressed line-delimited JSON files using
> Table API
> ------------------------------------------------------------------------------------
>
> Key: FLINK-30314
> URL: https://issues.apache.org/jira/browse/FLINK-30314
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem, Table SQL / API
> Affects Versions: 1.16.0, 1.15.2
> Reporter: Dmitry Yaraev
> Priority: Major
> Attachments: input.json, input.json.gz, input.json.zip
>
>
> I am reading gzipped JSON line-delimited files in the batch mode using
> [FileSystem
> Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
> For reading the files a new table is created with the following
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
> `my_field1` BIGINT,
> `my_field2` INT,
> `my_field3` VARCHAR(2147483647)
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'path-to-input-dir',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'false',
> 'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-00000.json.gz and
> input-00001.json.gz. As it comes from the filenames, the files are compressed
> with GZIP. Each of the files contains 10 records. The issue is that only 2
> records from each file are read (4 in total). If decompressed versions of the
> same data files are used, all 20 records are read.
> As far as I understand, that problem may be related to the fact that split
> length, which is used when the files are read, is in fact the length of a
> compressed file. So files are closed before all records are read from them
> because read position of the decompressed file stream exceeds split length.
> Probably, it makes sense to add a flag to {{{}FSDataInputStream{}}}, so we
> could identify if the file compressed or not. The flag can be set to true in
> {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file
> streams. With such a flag it could be possible to differentiate
> non-splittable compressed files and only rely on the end of the stream.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)