I don't see AllowNonRestoredState in the configuration documentation. How
would it be passed to a job? On the command line, like this?

./bin/flink run --detached -Dallownonrestoredstate=true ...
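
For reference, a hedged sketch of the usual form: in the Flink CLI,
allowNonRestoredState is an option of "flink run" itself
(-n / --allowNonRestoredState) rather than a -D configuration key, and it
only has an effect together with -s / --fromSavepoint. The helper name, the
FLINK_BIN override, and both paths below are hypothetical placeholders, not
anything from this thread.

```shell
# Hypothetical helper: submit a job from a snapshot while skipping state
# that no longer maps to any operator in the new job graph.
# Assumption: FLINK_BIN defaults to ./bin/flink in the Flink distribution.
submit_allowing_non_restored_state() {
  snapshot_path="$1"   # savepoint or retained checkpoint to restore from
  job_jar="$2"         # job jar to submit
  # -s / --fromSavepoint: snapshot to restore from
  # -n / --allowNonRestoredState: skip state with no matching operator
  "${FLINK_BIN:-./bin/flink}" run --detached \
    -s "$snapshot_path" \
    --allowNonRestoredState \
    "$job_jar"
}

# Usage (placeholder paths):
#   submit_allowing_non_restored_state /path/to/snapshot /path/to/job.jar
```

Without -s the flag has nothing to act on, so the snapshot path has to be
given explicitly when restarting the job this way.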

On Tue, Oct 4, 2022 at 4:52 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Mason,
>
> Definitely! Feel free to open a PR and ping me for a review.
>
> Cheers, Martijn
>
> On Tue, Oct 4, 2022 at 3:51 PM Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi Martijn,
>>
>> I notice that this question comes up quite often. Would this be a good
>> addition to the KafkaSource documentation? I'd be happy to contribute to
>> the documentation.
>>
>> Best,
>> Mason
>>
>> On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> Based on
>>> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
>>> I think you'll need to change the UID for your KafkaSource and restart your
>>> job with allowNonRestoredState enabled.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen <cinquate...@gmail.com>
>>> wrote:
>>>
>>>> We've changed the KafkaSource to ingest from a new topic but the old
>>>> name is still being referenced:
>>>>
>>>> 2022-10-04 07:03:41
>>>> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Grokfailures' (operator feca28aff5a3958840bee985ee7de4d3).
>>>>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>>>>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>>>>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
>>>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>>>>     at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>>>>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>>>>     ... 3 more
>>>> Caused by: java.lang.RuntimeException: Failed to get topic metadata.
>>>>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>>>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>>>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>>>>     at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>     ... 3 more
>>>> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>>>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>>>>     ... 10 more
>>>> Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
>>>>
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>

-- 
Robert Cullen
240-475-4490
