Hi Aniket,

Currently the filesystem connector does not support the option 'pipeline.generic-types'='false'. The connector emits `PartitionCommitInfo` messages to the downstream partition committer operator even when the sink table has no partitions. `PartitionCommitInfo` has a `List<String> partitions` field, and Flink treats `java.util.List` as a generic type, which causes the exception you mentioned in the thread. I have created an issue [1] for this.
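Until the issue is fixed, one possible workaround is to submit the filesystem insert with generic types enabled and keep 'pipeline.generic-types'='false' only for the aggregation queries. This is only a sketch: it assumes the insert can run as its own job rather than in the same job as the other queries, and it has not been verified against your setup.

```sql
-- Assumption: the INSERT into file_t is submitted as a separate job.
-- Restore the default (generic types enabled) for this statement only.
RESET 'pipeline.generic-types';

insert into file_t
select order_number, order_name, risk,
  date_format(order_time,'yyyy') as `year`,
  date_format(order_time,'MM') as `month`,
  date_format(order_time,'dd') as `day`,
  date_format(order_time,'HH') as `hour`
from source_t;

-- Disable generic types again before submitting the aggregation queries.
set 'pipeline.generic-types'='false';
```

With this split, the aggregation queries keep the optimization, while the file sink job is allowed to fall back to Kryo for the `List<String>` field.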
[1] https://issues.apache.org/jira/browse/FLINK-32129

Best,
Shammon FY

On Thu, May 18, 2023 at 9:20 PM Aniket Sule <aniket.s...@netwitness.com> wrote:

> Hi,
>
> I am trying to write data to parquet files using SQL insert statements.
> Generic types are disabled in the execution environment.
>
> There are other queries running in the same job that are
> counting/aggregating data. Generic types are disabled as a performance
> optimization for those queries.
>
> In this scenario, whenever I try to insert data into parquet files, I get
> an exception -
>
> Caused by: java.lang.UnsupportedOperationException: Generic types have
> been disabled in the ExecutionConfig and type java.util.List is treated as
> a generic type.
>
> I get the above exception even when I test with a simple table that has no
> array or list data types.
>
> Is there any way to write parquet files with generic types disabled?
>
> Thanks and regards,
> Aniket Sule.
>
> Here is a way to reproduce what I am seeing.
> My actual source is Kafka with data that is in json format.
> Datagen is simply to quickly reproduce the scenario.
> The environment is Flink 1.17.0.
> I am using the SQL cli.
>
> set 'sql-client.verbose'='true';
> set 'table.exec.source.idle-timeout'='1000';
> set 'table.optimizer.join-reorder-enabled'='true';
> set 'table.exec.mini-batch.enabled'='true';
> set 'table.exec.mini-batch.allow-latency'='5 s';
> set 'table.exec.mini-batch.size'='5000';
> set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
> set 'table.optimizer.distinct-agg.split.enabled'='true';
> set 'table.exec.state.ttl'='360 s';
> set 'pipeline.object-reuse'='true';
> set 'pipeline.generic-types'='false';
> set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';
>
> CREATE TABLE source_t (
>   order_number BIGINT,
>   order_name string,
>   risk float,
>   order_time TIMESTAMP(3)
> ) WITH (
>   'connector' = 'datagen'
> );
>
> CREATE TABLE file_t (
>   order_number BIGINT,
>   order_name string,
>   risk float,
>   `year` string,`month` string,`day` string,`hour` string
> ) WITH (
>   'connector'='filesystem',
>   'path' = '/tmp/data',
>   'format'='parquet'
> );
>
> insert into file_t
> select order_number,order_name,risk ,
> date_format(order_time,'yyyy') as `year`, date_format(order_time,'MM') as `month`,date_format(order_time,'dd')as `day`,date_format(order_time,'HH') as `hour`
> from source_t;
>
> Resulting exception:
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
>     at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
>     at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.base/java.util.Optional.ifPresent(Optional.java:183)
>     at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
>     at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>     at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
>     at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
>     at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
>     at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
>     ... 48 more
> Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation 91796fc6-f257-4093-ab0c-1b4addf11e5b.
>     at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
>     at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
>     at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
>     at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:64)
>     at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:65)
>     at org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator.translateForStreamingInternal(OneInputTransformationTranslator.java:37)
>     at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2276)
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:515)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:200)
>     at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>     at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>     at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>     ... 7 more
>
> End of exception on server side>]
>     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
>     at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)