One more question: are you changing the parallelism when resuming from the savepoint?

On Sun, May 8, 2022 at 4:05 PM Thomas Weise <t...@apache.org> wrote:
>
> Hi Kevin,
>
> Unfortunately I did not find a way to test the savepoint scenario with
> the MiniCluster. Savepoints are not supported in the embedded mode.
> There is a way to hack around that, but then the state of the
> enumerator won't be handled.
>
> As for your original issue, is it reproducible consistently? Can you
> capture the debug log of the enumerator?
>
> Thanks,
> Thomas
>
> On Wed, May 4, 2022 at 10:05 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
> >
> > Hi Kevin,
> >
> > I'm hoping that @Thomas Weise could help with the issue regarding the
> > recovery from the savepoint.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, 4 May 2022 at 17:05, Kevin Lam <kevin....@shopify.com> wrote:
> >>
> >> Following up on this, is there a good way to debug restoring from
> >> savepoints locally? We currently have a set-up where we use IntelliJ to
> >> run and test our pipelines locally, but would like an API to be able to
> >> specify the savepoint to restore from, without needing to spin up a full
> >> cluster.
> >>
> >> In intelliJ we just use the build and run functionality, and don't have
> >> access to the Flink CLI.
> >>
> >> On Tue, May 3, 2022 at 2:48 PM Kevin Lam <kevin....@shopify.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> We're encountering an error using a HybridSource that is composed of a
> >>> FileSource + KafkaSource, only when recovering from a savepoint [0]. This
> >>> HybridSource is used to read from a Kafka topic's archives hosted on GCS
> >>> via a bounded FileSource, and then automatically switch over to the data
> >>> stream from the Kafka associated topic.
> >>>
> >>> Has anyone seen this error before?
> >>>
> >>> [0]
> >>> ```
> >>> 2022-05-03 09:47:57
> >>> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator afb3208349a953c47059c1994f800aa2).
> >>>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> >>>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
> >>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> >>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >>>     at java.base/java.lang.Thread.run(Unknown Source)
> >>> Caused by: java.lang.NullPointerException: Source for index=0 not available
> >>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> >>>     at org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
> >>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
> >>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
> >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
> >>>     ... 3 more
> >>> ```