[ https://issues.apache.org/jira/browse/FLINK-30158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640182#comment-17640182 ]
James Mcguire commented on FLINK-30158:
---------------------------------------

Sorry about that, I meant to provide the Flink schema! It would look something like this for the above protobuf schema:

{code:java}
CREATE TABLE TestMessages (
  first array<BIGINT>,
  second array<row<nested_first BIGINT, one_of_first BIGINT, one_of_second STRING>>
) COMMENT '' WITH (
  'connector' = 'kafka',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.message.Test',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = 'host.docker.internal:9092',
  'properties.group.id' = 'drawings-1',
  'topic' = 'development.integration_events.connect'
);
{code}

> [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30158
>                 URL: https://issues.apache.org/jira/browse/FLINK-30158
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.16.0
>            Reporter: James Mcguire
>            Priority: Major
>
> I am encountering a {{java.lang.NullPointerException}} when trying to use Flink SQL to query a Kafka topic that uses either {{repeated}} and/or {{map}} attributes.
>
> *Replication steps*
>
> 1. Use a protobuf definition that uses repeated and/or map. This protobuf schema should cover a few of the problematic scenarios I ran into:
>
> {code:java}
> syntax = "proto3";
>
> package example.message;
>
> option java_package = "com.example.message";
> option java_multiple_files = true;
>
> message NestedType {
>   int64 nested_first = 1;
>   oneof nested_second {
>     int64 one_of_first = 2;
>     string one_of_second = 3;
>   }
> }
>
> message Test {
>   repeated int64 first = 1;
>   map<string, NestedType> second = 2;
> }
> {code}
>
> 2. Attempt to query the topic; the query fails even when the problematic columns are excluded:
>
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.formats.protobuf.PbCodegenException: java.lang.NullPointerException
> {code}
>
> Log file:
>
> {code:java}
> 2022-11-22 15:33:59,510 WARN org.apache.flink.table.client.cli.CliClient > [] - Could not execute SQL > statement.org.apache.flink.table.client.gateway.SqlExecutionException: Error > while retrieving result. at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.lang.RuntimeException: > Failed to fetch next result at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) > ~[?:?] 
at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.io.IOException: Failed > to fetch job execution result at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) > ~[?:?] at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: > java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: bc869097009a92d0601add881a6b920c) at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > ~[?:?] at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) > ~[?:?] at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) > ~[?:?] at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: bc869097009a92d0601add881a6b920c) at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > ~[?:?] at > org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > ~[?:?] 
at > org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > ~[?:?] at > org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) > ~[?:?] at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > ~[?:?] at java.lang.Thread.run(Thread.java:829) ~[?:?]Caused by: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > ~[?:?] at > org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > ~[?:?] at > org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > ~[?:?] at > org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] 
at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) > ~[?:?] at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > ~[?:?] at java.lang.Thread.run(Thread.java:829) ~[?:?]Caused by: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) > ~[flink-dist-1.16.0.jar:1.16.0] at > jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?] > at > jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:?] at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) > ~[?:?] at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > ~[?:?] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) > ~[?:?] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) > ~[?:?] at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) > ~[?:?] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) > ~[?:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > ~[?:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > ~[?:?] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > ~[flink-scala_2.12-1.16.0.jar:1.16.0] at > scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > ~[flink-scala_2.12-1.16.0.jar:1.16.0] at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?] 
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > ~[flink-scala_2.12-1.16.0.jar:1.16.0] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > ~[flink-scala_2.12-1.16.0.jar:1.16.0] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > ~[flink-scala_2.12-1.16.0.jar:1.16.0] at > akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?] at > akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?] at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?] at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?] at > akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?] at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?] at > akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?] at > akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?] at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?] at > java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) > ~[?:?] at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) > ~[?:?] at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?] > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) > ~[?:?]Caused by: org.apache.flink.formats.protobuf.PbCodegenException: > java.lang.NullPointerException at > org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:126) > ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) > ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) > ~[?:?] at > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) > ~[?:?] at > org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) > ~[?:?] at > org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) > ~[?:?] 
at > org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:286) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:692) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.0.jar:1.16.0] at java.lang.Thread.run(Thread.java:829) > ~[?:?]Caused by: java.lang.NullPointerException at > org.apache.flink.formats.protobuf.deserialize.PbCodegenRowDeserializer.pbGetMessageElementCode(PbCodegenRowDeserializer.java:106) > ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at > org.apache.flink.formats.protobuf.deserialize.PbCodegenRowDeserializer.codegen(PbCodegenRowDeserializer.java:84) > ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at > org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:109) > ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) > ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) > ~[?:?] at > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) > ~[?:?] at > org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) > ~[?:?] at > org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) > ~[?:?] at > org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:286) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:692) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.0.jar:1.16.0] at java.lang.Thread.run(Thread.java:829) > ~[?:?]{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
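
For anyone who wants to reproduce step 2 above without the SQL client, the same query can be driven through the Table API. This is only a rough, untested sketch that reuses the hypothetical message class, broker address, and topic from the DDL in this comment; it assumes Flink 1.16 with flink-sql-protobuf, the Kafka connector, and the generated protobuf classes on the classpath (the class name {{Flink30158Repro}} is made up for illustration):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Flink30158Repro {
    public static void main(String[] args) {
        // Streaming-mode TableEnvironment, roughly equivalent to what the SQL client uses.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Same table definition as in the comment above; the column names are
        // backtick-quoted defensively because they overlap with SQL keywords.
        tEnv.executeSql(
                "CREATE TABLE TestMessages (" +
                "  `first` array<BIGINT>," +
                "  `second` array<row<nested_first BIGINT, one_of_first BIGINT, one_of_second STRING>>" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'format' = 'protobuf'," +
                "  'protobuf.message-class-name' = 'com.example.message.Test'," +
                "  'properties.auto.offset.reset' = 'earliest'," +
                "  'properties.bootstrap.servers' = 'host.docker.internal:9092'," +
                "  'properties.group.id' = 'drawings-1'," +
                "  'topic' = 'development.integration_events.connect'" +
                ")");

        // Any SELECT against the table should reproduce the failure: per the stack trace
        // above, the NullPointerException is thrown while the protobuf deserializer code
        // is generated when the source is opened (PbRowDataDeserializationSchema.open ->
        // ProtoToRowConverter.<init>), before any record is read.
        tEnv.executeSql("SELECT * FROM TestMessages").print();
    }
}
{code}

Because the failure happens during deserializer code generation at open time, it does not appear to depend on which columns the query actually selects, which matches the observation in the issue that excluding the problematic columns does not help.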