[ https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Natan HP updated FLINK-27395: ----------------------------- Environment: # Minikube {noformat} ➜ minikube version minikube version: v1.25.2 commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7 {noformat} # Apache Flink Docker Images {noformat} apache/flink:1.14.4-scala_2.11{noformat} {noformat} apache/flink:1.15.0-scala_2.12-java11{noformat} was: # Minikube {noformat} ➜ minikube version minikube version: v1.25.2 commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7 {noformat} # Apache Flink Docker Image {noformat} apache/flink:1.14.4-scala_2.11{noformat} > IllegalStateException: Could not find policy 'pick_first'. on Flink > Application > ------------------------------------------------------------------------------- > > Key: FLINK-27395 > URL: https://issues.apache.org/jira/browse/FLINK-27395 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub > Affects Versions: 1.14.2, 1.14.4 > Environment: # Minikube > {noformat} > ➜ minikube version > minikube version: v1.25.2 > commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7 > {noformat} > # Apache Flink Docker Images > {noformat} > apache/flink:1.14.4-scala_2.11{noformat} > {noformat} > apache/flink:1.15.0-scala_2.12-java11{noformat} > Reporter: Natan HP > Priority: Major > > I got this exception on flink taskmanager, but I can see that the data is > successfully published in the pub sub. Here is the log: > > {noformat} > 2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task > > [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched > from INITIALIZING to RUNNING. > Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 > uncaughtException > SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the > SynchronizationContext. Panic! > java.lang.IllegalStateException: Could not find policy 'pick_first'. 
Make > sure its implementation is either registered to LoadBalancerRegistry or > included in META-INF/services/io.grpc.LoadBalancerProvider from your jar > files. > at > io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94) > at > io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65) > at > io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469) > at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) > at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473) > at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253) > at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210) > at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) > at > com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94) > at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314) > at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288) > at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200) > at > com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) > at > com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65) > at > com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) > at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) > at > 
com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82) > at > com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) > at > com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) > at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) > at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425) > at > com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471) > at > com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399) > at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88) > at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > {noformat} > > > The code sample: > {code:java} > SinkFunction<String> pubsubSink = PubSubSink.newBuilder() > .withSerializationSchema((SerializationSchema<String>) s -> > s.getBytes(StandardCharsets.UTF_8)) > .withProjectName("<project-name>") > .withTopicName("<topic-name>") > .build(); > dataStream.addSink(pubsubSink) > .name("Pub-sub-sink"); {code} > > I use Maven Assembly Plugin to create the uber JAR: > {noformat} > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-assembly-plugin</artifactId> > <version>2.6</version> > <configuration> > <archive> > <manifest> > <mainClass>org.example.flink.Main</mainClass> > </manifest> > 
</archive> > <descriptorRefs> > <descriptorRef>jar-with-dependencies</descriptorRef> > </descriptorRefs> > </configuration> > <executions> > <execution> > <id>make-assembly</id> > <phase>package</phase> > <goals> > <goal>single</goal> > </goals> > </execution> > </executions> > </plugin>{noformat} > > The content of the JAR: > {noformat} > ➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider > io/grpc/LoadBalancerProvider$UnknownConfig.class > META-INF/services/io.grpc.LoadBalancerProvider > io/grpc/LoadBalancerProvider.class > ➜ jar tf MyApp.jar | grep io.grpc.NameResolverProvider > io/grpc/NameResolverProvider.class > META-INF/services/io.grpc.NameResolverProvider > {noformat} > > What I've tried to solve this: > # Downgrading version to 1.14.2 > {noformat} > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-gcp-pubsub_2.12</artifactId> > <version>1.14.2</version> > </dependency>{noformat} > # Using maven shade plugin (alongside maven assembly plugin) with the > following config as suggested in [here|#issuecomment-474739796]: > {noformat} > <transformer > implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> > <resource>META-INF/services</resource> > <file>io.grpc.LoadBalancerProvider</file> > </transformer> > <transformer > implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> > <resource>META-INF/services</resource> > <file>io.grpc.NameResolverProvider</file> > </transformer>{noformat} > 3. Creating files inside META-INF/services as suggested in > [here|https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-477658832]: > {noformat} > Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on > master [⇡!+?] > ➜ ls > io.grpc.LoadBalancerProvider > io.grpc.NameResolverProvider > Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services > on master [⇡!+?] 
> ➜ cat io.grpc.LoadBalancerProvider > io.grpc.internal.PickFirstLoadBalancerProvider > Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on > master [⇡!+?] > ➜ cat io.grpc.NameResolverProvider > io.grpc.internal.DnsNameResolverProvider > {noformat} > > [UPDATE 09 June 2022]: > I confirm that this exception is causing the checkpoint to fail: > {noformat} > java.io.IOException: Could not perform checkpoint 1 for operator Sink: > Pub-sub-sink (2/2)#2. > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210) > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493) > at > org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78) > at > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:55) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) > at > 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not > complete snapshot 1 for operator Sink: Pub-sub-sink (2/2)#2. Failure reason: > Checkpoint was declined. 
> at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198) > ... 
22 more > Caused by: java.lang.RuntimeException: Failed trying to publish message > at > org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink$FailureHandler.onFailure(PubSubSink.java:342) > at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) > at > com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050) > at > org.apache.flink.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217) > at > com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176) > at > com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969) > at > com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760) > at > com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95) > at > com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77) > at > com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52) > at > com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onFailure(Publisher.java:519) > at > com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1500(Publisher.java:486) > at com.google.cloud.pubsub.v1.Publisher$3.onFailure(Publisher.java:462) > at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) > at > com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050) > at > com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) > at > com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176) > at > com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969) > at > com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760) > at > com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:179) > at > 
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135) > at > com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117) > at > com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) > at > com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176) > at > com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969) > at > com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760) > at > com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95) > at > com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77) > at > com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) > at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) > at > com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050) > at > com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) > at > com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176) > at > com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969) > at > com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760) > at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545) > at > io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515) > at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426) > at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689) > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577) > at > 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751) > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740) > at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown > Source) > at java.base/java.util.concurrent.FutureTask.run(Unknown Source) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > ... 1 more > Caused by: com.google.api.gax.rpc.InternalException: > io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug! > at > com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:67) > at > com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) > at > com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) > ... 23 more > Caused by: io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug! > at io.grpc.Status.asRuntimeException(Status.java:533) > ... 15 more > Caused by: java.lang.IllegalStateException: Could not find policy > 'pick_first'. Make sure its implementation is either registered to > LoadBalancerRegistry or included in > META-INF/services/io.grpc.LoadBalancerProvider from your jar files. 
> at > io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94) > at > io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65) > at > io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469) > at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) > at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473) > at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253) > at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210) > at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) > at > com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94) > at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314) > at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288) > at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200) > at > com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) > at > com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65) > at > com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) > at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) > at > com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82) > at > com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) > at > 
com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) > at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) > at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425) > at > com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471) > at com.google.cloud.pubsub.v1.Publisher.access$800(Publisher.java:88) > at com.google.cloud.pubsub.v1.Publisher$1.run(Publisher.java:292) > ... 6 more{noformat} -- This message was sent by Atlassian Jira (v8.20.7#820007)