One more question: are you changing the parallelism when resuming from
the savepoint?

On Sun, May 8, 2022 at 4:05 PM Thomas Weise <t...@apache.org> wrote:
>
> Hi Kevin,
>
> Unfortunately I did not find a way to test the savepoint scenario with
> the MiniCluster. Savepoints are not supported in the embedded mode.
> There is a way to hack around that, but then the state of the
> enumerator won't be handled.
>
> As for your original issue, is it reproducible consistently? Can you
> capture the debug log of the enumerator?
>
> Thanks,
> Thomas
>
> On Wed, May 4, 2022 at 10:05 AM Martijn Visser <martijnvis...@apache.org> wrote:
> >
> > Hi Kevin,
> >
> > I'm hoping that @Thomas Weise could help with the issue regarding the 
> > recovery from the savepoint.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, 4 May 2022 at 17:05, Kevin Lam <kevin....@shopify.com> wrote:
> >>
> >> Following up on this, is there a good way to debug restoring from 
> >> savepoints locally? We currently have a set-up where we use IntelliJ to 
> >> run and test our pipelines locally, but would like an API to be able to 
> >> specify the savepoint to restore from, without needing to spin up a full 
> >> cluster.
> >>
> >> In IntelliJ we just use the build and run functionality, and don't have 
> >> access to the Flink CLI.
> >>
> >> On Tue, May 3, 2022 at 2:48 PM Kevin Lam <kevin....@shopify.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> We're encountering an error using a HybridSource that is composed of a 
> >>> FileSource + KafkaSource, only when recovering from a savepoint [0]. This 
> >>> HybridSource is used to read from a Kafka topic's archives hosted on GCS 
> >>> via a bounded FileSource, and then automatically switch over to the data 
> >>> stream from the associated Kafka topic.
> >>>
> >>> Has anyone seen this error before?
> >>>
> >>> [0]
> >>> ```
> >>> 2022-05-03 09:47:57
> >>> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator afb3208349a953c47059c1994f800aa2).
> >>>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> >>>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
> >>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> >>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >>>     at java.base/java.lang.Thread.run(Unknown Source)
> >>> Caused by: java.lang.NullPointerException: Source for index=0 not available
> >>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> >>>     at org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
> >>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
> >>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
> >>>     ... 3 more
> >>> ```
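
For anyone reading the trace above: the root cause line is a null check
failing inside SwitchedSources.sourceOf, which suggests that a lookup for
source index 0 found no entry. The sketch below only models that failure
pattern with plain Java. The class SourceRegistrySketch, its String-valued
map, and the "file-source" label are hypothetical and are not Flink's actual
implementation; whether the lookup is empty because of how the enumerator
state is restored from the savepoint is an assumption that the enumerator
debug log would need to confirm.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for an index -> source registry.
// It is NOT Flink's SwitchedSources class; names and types are illustrative.
public class SourceRegistrySketch {
    private final Map<Integer, String> sources = new HashMap<>();

    // Record the source that was activated for a given position in the chain.
    public void put(int index, String source) {
        sources.put(index, Objects.requireNonNull(source));
    }

    // Look up a source; a missing entry fails with the message seen in the trace.
    public String sourceOf(int index) {
        return Objects.requireNonNull(
                sources.get(index), "Source for index=" + index + " not available");
    }

    public static void main(String[] args) {
        // Registry that was populated when the first source started.
        SourceRegistrySketch fresh = new SourceRegistrySketch();
        fresh.put(0, "file-source");
        System.out.println(fresh.sourceOf(0));

        // Registry that was never populated for index 0 (assumed restore case).
        SourceRegistrySketch empty = new SourceRegistrySketch();
        try {
            empty.sourceOf(0);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints "file-source" followed by "Source for index=0 not
available". The second line matches the exception message in the trace,
which is why the contents of the enumerator's restored state are the first
thing worth checking.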
