Getting exception when writing to parquet file with generic types disabled

2023-05-18 Thread Aniket Sule
Hi,
I am trying to write data to parquet files using SQL insert statements. Generic 
types are disabled in the execution environment.
There are other queries running in the same job that are counting/aggregating 
data. Generic types are disabled as a performance optimization for those 
queries.

In this scenario, whenever I try to insert data into parquet files, I get an 
exception -
Caused by: java.lang.UnsupportedOperationException: Generic types have been 
disabled in the ExecutionConfig and type java.util.List is treated as a generic 
type.

I get the above exception even when I test with a simple table that has no 
array or list data types.

Is there any way to write parquet files with generic types disabled?

Thanks and regards,
Aniket Sule.


Here is a way to reproduce what I am seeing.
My actual source is Kafka, with data in JSON format.
The datagen connector is used simply to reproduce the scenario quickly.
The environment is Flink 1.17.0.
I am using the SQL CLI.

set 'sql-client.verbose'='true';
set 'table.exec.source.idle-timeout'='1000';
set 'table.optimizer.join-reorder-enabled'='true';
set 'table.exec.mini-batch.enabled'='true';
set 'table.exec.mini-batch.allow-latency'='5 s';
set 'table.exec.mini-batch.size'='5000';
set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
set 'table.optimizer.distinct-agg.split.enabled'='true';
set 'table.exec.state.ttl'='360 s';
set 'pipeline.object-reuse'='true';
set 'pipeline.generic-types'='false';
set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';

CREATE TABLE source_t (
  order_number BIGINT,
  order_name string,
  risk float,
  order_time   TIMESTAMP(3)
  ) WITH (
'connector' = 'datagen'
  );

CREATE TABLE file_t (
  order_number BIGINT,
  order_name string,
  risk float,
  `year` string,`month` string,`day` string,`hour` string
  ) WITH (
'connector'='filesystem',
'path' = '/tmp/data',
'format'='parquet'
  );

insert into file_t
select order_number, order_name, risk,
  date_format(order_time, 'yyyy') as `year`,
  date_format(order_time, 'MM') as `month`,
  date_format(order_time, 'dd') as `day`,
  date_format(order_time, 'HH') as `hour`
from source_t;

Resulting exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., ]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)


Re: Getting exception when writing to parquet file with generic types disabled

2023-05-18 Thread Shammon FY
Hi Aniket,

Currently the filesystem connector does not support the option
'pipeline.generic-types'='false', because the connector outputs
`PartitionCommitInfo` messages for the downstream partition committer
operator even when there are no partitions in the sink table.
`PartitionCommitInfo` has a `List<String> partitions` field, which causes
the exception you mentioned in the thread. I have created an issue [1] for
this.

[1] https://issues.apache.org/jira/browse/FLINK-32129
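[Editorial note: the mechanism above can be illustrated outside the SQL runtime. The sketch below is not from the thread; it assumes flink-core 1.17 on the classpath and shows how any `java.util.List`-typed value falls back to Kryo's `GenericTypeInfo`, whose serializer creation throws once generic types are disabled — the same check that fires for the `partitions` field.]

```java
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class GenericTypesSketch {
    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        // Equivalent to setting 'pipeline.generic-types'='false'.
        config.disableGenericTypes();

        // java.util.List has no native Flink type information, so type
        // extraction falls back to GenericTypeInfo (Kryo-based).
        TypeInformation<List<String>> info =
                TypeInformation.of(new TypeHint<List<String>>() {});

        // With generic types disabled, creating the serializer throws
        // UnsupportedOperationException: "Generic types have been disabled
        // in the ExecutionConfig and type java.util.List is treated as a
        // generic type." -- the exception seen in this thread.
        info.createSerializer(config);
    }
}
```

This is only a sketch of the failing code path; the real trigger sits inside the filesystem sink, which serializes `PartitionCommitInfo` between the writer and the partition committer operator.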

Best,
Shammon FY



RE: Getting exception when writing to parquet file with generic types disabled

2023-05-19 Thread Aniket Sule
Thank you for the explanation and for creating the JIRA issue. I appreciate your help.

Regards
Aniket Sule
