Hi Aniket,

Currently the filesystem connector does not support option
'pipeline.generic-types'='false', because the connector will output
`PartitionCommitInfo` messages for the downstream partition committer
operator even when there are no partitions in the sink table. There is a
`List<String> partitions` field in `PartitionCommitInfo` which will cause
the exception you mentioned in the thread. I have created an issue [1] for
this.

[1] https://issues.apache.org/jira/browse/FLINK-32129

Best,
Shammon FY


On Thu, May 18, 2023 at 9:20 PM Aniket Sule <aniket.s...@netwitness.com>
wrote:

> Hi,
>
> I am trying to write data to parquet files using SQL insert statements.
> Generic types are disabled in the execution environment.
>
> There are other queries running in the same job that are
> counting/aggregating data. Generic types are disabled as a performance
> optimization for those queries.
>
>
>
> In this scenario, whenever I try to insert data into parquet files, I get
> an exception -
>
> Caused by: java.lang.UnsupportedOperationException: Generic types have
> been disabled in the ExecutionConfig and type java.util.List is treated as
> a generic type.
>
>
>
> I get the above exception even when I test with a simple table that has no
> array or list data types.
>
>
>
> Is there any way to write parquet files with generic types disabled?
>
>
>
> Thanks and regards,
>
> Aniket Sule.
>
>
>
>
>
> Here is a way to reproduce what I am seeing.
>
> My actual source is Kafka with data that is in json format.
>
> Datagen is simply to quickly reproduce the scenario.
>
> The environment is Flink 1.17.0.
>
> I am using the SQL cli.
>
>
>
> set 'sql-client.verbose'='true';
>
> set 'table.exec.source.idle-timeout'='1000';
>
> set 'table.optimizer.join-reorder-enabled'='true';
>
> set 'table.exec.mini-batch.enabled'='true';
>
> set 'table.exec.mini-batch.allow-latency'='5 s';
>
> set 'table.exec.mini-batch.size'='5000';
>
> set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
>
> set 'table.optimizer.distinct-agg.split.enabled'='true';
>
> set 'table.exec.state.ttl'='360 s';
>
> set 'pipeline.object-reuse'='true';
>
> set 'pipeline.generic-types'='false';
>
> set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';
>
>
>
> CREATE TABLE source_t (
>
>           order_number BIGINT,
>
>           order_name string,
>
>           risk float,
>
>           order_time   TIMESTAMP(3)
>
>       ) WITH (
>
>         'connector' = 'datagen'
>
>       );
>
>
>
> CREATE TABLE file_t (
>
>           order_number BIGINT,
>
>           order_name string,
>
>           risk float,
>
>           `year` string,`month` string,`day` string,`hour` string
>
>       ) WITH (
>
>         'connector'='filesystem',
>
> 'path' = '/tmp/data',
>
> 'format'='parquet'
>
>       );
>
>
>
> insert into file_t
>
> select order_number,order_name,risk ,
>
> date_format(order_time,'yyyy') as `year`, date_format(order_time,'MM') as
> `month`,date_format(order_time,'dd')as `day`,date_format(order_time,'HH')
> as `hour`
>
> from source_t;
>
>
>
> Resulting exception:
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error., <Exception on server side:
>
> org.apache.flink.table.gateway.api.utils.SqlGatewayException:
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> fetchResults.
>
>                 at
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
>
>                 at
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>
>                 at
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>
>                 at
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>
>                 at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>
>                 at
> java.base/java.util.Optional.ifPresent(Optional.java:183)
>
>                 at
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>
>                 at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>
>                 at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
>                 at
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>
>                 at
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>
>                 at
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
>                 at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
>
>                 at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>
>                 at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>
>                 at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException:
> Failed to fetchResults.
>
>                 at
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
>
>                 at
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
>
>                 ... 48 more
>
> Caused by:
> org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed
> to execute the operation 91796fc6-f257-4093-ab0c-1b4addf11e5b.
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
>
>                 at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>
>                 at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
>                 at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>
>                 at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
>                 at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
>                 at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>
>                 ... 1 more
>
> Caused by: java.lang.UnsupportedOperationException: Generic types have
> been disabled in the ExecutionConfig and type java.util.List is treated as
> a generic type.
>
>                 at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>
>                 at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
>
>                 at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
>
>                 at
> org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:64)
>
>                 at
> org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
>
>                 at
> org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
>
>                 at
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>
>                 at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
>
>                 at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2276)
>
>                 at
> org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83)
>
>                 at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
>
>                 at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:515)
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:200)
>
>                 at
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>
>                 at
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>
>                 ... 7 more
>
>
>
> End of exception on server side>]
>
>                 at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
>
>                 at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
>
>                 at
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
>
>                 at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>
>                 at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
>                 at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>
>                 at java.base/java.lang.Thread.run(Thread.java:829)
> Caution: External email. Do not click or open attachments unless you know
> and trust the sender.
>

Reply via email to