Re: Flink KafkaSource still referencing deleted topic
Hi Robert,

allowNonRestoredState is not a configuration option; it is passed as a command-line flag, like this:

./bin/flink run --detached --allowNonRestoredState

Best,
Hang

On Wed, Oct 12, 2022 at 11:13 PM Robert Cullen wrote:

> I don't see AllowNonRestoredState in the configuration documentation. How
> would it be passed to a job? On the command line like this:
>
> ./bin/flink run --detached -Dallownonrestoredstate=true ...
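As far as I know, the flag in Hang's reply only has an effect when the job is started from a savepoint or a retained checkpoint, so in practice it is combined with `--fromSavepoint` (short forms: `-s` and `-n`). A minimal sketch, assuming a hypothetical savepoint path and jar name; the helper only prints the command so it can be checked before it is run:

```shell
#!/bin/sh
# Dry-run helper: prints the restore command instead of executing it.
# The savepoint path and jar name used below are hypothetical placeholders.
flink_restore_cmd() {
  # $1 = savepoint (or retained checkpoint) path, $2 = job jar
  echo "./bin/flink run --detached --fromSavepoint $1 --allowNonRestoredState $2"
}

flink_restore_cmd /tmp/flink-savepoints/savepoint-example my-job.jar
```

The name `allowNonRestoredState` does not appear in the configuration documentation because it is a CLI flag; to my knowledge the corresponding configuration key in Flink releases of that period is `execution.savepoint.ignore-unclaimed-state`.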
Re: Flink KafkaSource still referencing deleted topic
I don't see AllowNonRestoredState in the configuration documentation. How would it be passed to a job? On the command line like this:

./bin/flink run --detached -Dallownonrestoredstate=true ...

--
Robert Cullen
240-475-4490

On Tue, Oct 4, 2022 at 4:52 PM Martijn Visser wrote:

> Hi Mason,
>
> Definitely! Feel free to open a PR and ping me for a review.
>
> Cheers, Martijn
Re: Flink KafkaSource still referencing deleted topic
Hi Mason,

Definitely! Feel free to open a PR and ping me for a review.

Cheers,
Martijn

On Tue, Oct 4, 2022 at 3:51 PM Mason Chen wrote:

> Hi Martijn,
>
> I notice that this question comes up quite often. Would this be a good
> addition to the KafkaSource documentation? I'd be happy to contribute to
> the documentation.
>
> Best,
> Mason
Re: Flink KafkaSource still referencing deleted topic
Hi Martijn,

I notice that this question comes up quite often. Would this be a good addition to the KafkaSource documentation? I'd be happy to contribute to the documentation.

Best,
Mason

On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser wrote:

> Hi Robert,
>
> Based on
> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
> I think you'll need to change the UID for your KafkaSource and restart your
> job with allowNonRestoredState enabled.
>
> Best regards,
>
> Martijn
Re: Flink KafkaSource still referencing deleted topic
Hi Robert,

Based on https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic I think you'll need to change the UID for your KafkaSource and restart your job with allowNonRestoredState enabled.

Best regards,

Martijn

On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen wrote:

> We've changed the KafkaSource to ingest from a new topic but the old name
> is still being referenced:
>
> ...
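Martijn's suggestion has two parts: the UID change happens in the job code (via `uid(...)` on the source operator), while the restart is done from the CLI and needs a snapshot to restore from. A rough sketch of taking that snapshot, assuming the job can still be stopped cleanly; the savepoint directory and job ID are hypothetical placeholders, and the helper only prints the command rather than running it:

```shell
#!/bin/sh
# Dry-run helper: prints the stop-with-savepoint command instead of executing it.
# The savepoint directory and job ID used below are hypothetical placeholders.
flink_stop_cmd() {
  # $1 = directory to write the savepoint to, $2 = ID of the running job
  echo "./bin/flink stop --savepointPath $1 $2"
}

flink_stop_cmd /tmp/flink-savepoints 00000000000000000000000000000000
```

The savepoint path reported by `flink stop` can then be passed to `flink run` with `--fromSavepoint`, together with `--allowNonRestoredState`, so that the state stored under the old source UID is skipped. If the job is failing and cannot be stopped, a retained checkpoint path can be used in place of the savepoint, provided retained checkpoints are enabled. Note that a source with a new UID starts from its configured starting offsets rather than from restored ones.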
Flink KafkaSource still referencing deleted topic
We've changed the KafkaSource to ingest from a new topic but the old name is still being referenced:

2022-10-04 07:03:41
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Grokfailures' (operator feca28aff5a3958840bee985ee7de4d3).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 3 more
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
    ... 10 more
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

--
Robert Cullen
240-475-4490