Exception in snapshotState suppresses subsequent checkpoints

2021-06-26 Thread tao xiao
Hi team,

I am running a simple Flink 1.12.1 job in my IDE with
TolerableCheckpointFailureNumber set. In the source function's snapshotState
I intentionally throw an exception to verify how Flink behaves. The first
checkpoint throws the exception and eventually times out while the main flow
continues to work, which is expected. However, all subsequent checkpoints
never reach the exception; they simply report a timeout once the checkpoint
timeout elapses. Is it expected that no later checkpoint can finish once a
single checkpoint has thrown an exception?

Below is the code to reproduce the behavior
main

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
.setParallelism(1)
.print()
.setParallelism(1);
env.execute("Demo");
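
As a side note on the configuration above, the following is a minimal plain-Java model of how a tolerable-failure threshold is commonly understood: consecutive failures are counted, a success resets the counter, and the job is failed once the count exceeds the threshold. The class and method names are hypothetical and this is not Flink's implementation; which failure reasons are actually counted may differ between Flink versions.

```java
/** Simplified, hypothetical model of a tolerable checkpoint failure threshold. */
final class TolerableFailureModel {
    private final int tolerableFailures;
    private int consecutiveFailures;

    TolerableFailureModel(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records one checkpoint outcome; returns true if the job should now be failed. */
    boolean record(boolean checkpointSucceeded) {
        if (checkpointSucceeded) {
            consecutiveFailures = 0; // a success resets the streak
            return false;
        }
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** Number of back-to-back failed checkpoints needed before the job is failed. */
    static int failuresUntilJobFails(int tolerableFailures) {
        TolerableFailureModel model = new TolerableFailureModel(tolerableFailures);
        int failures = 0;
        while (true) {
            failures++;
            if (model.record(false)) {
                return failures;
            }
        }
    }

    public static void main(String[] args) {
        int failures = failuresUntilJobFails(1000); // threshold used in the job above
        long seconds = failures * 20L;              // one checkpoint every 20 s
        System.out.println(failures + " consecutive failures, about " + seconds + " s");
    }
}
```

Under this model, and assuming every failed or expired checkpoint is counted, a threshold of 1000 with a 20 second interval would let the job run for roughly five and a half hours of failing checkpoints before it is failed.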


Source function

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.smartnews.dp.kafka.sample.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be
 * affected by the serializability of the elements.
 *
 * NOTE: This source has a parallelism of 1.
 *
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>,
CheckpointedFunction {

private static final long serialVersionUID = 1L;

/** The number of elements emitted already. */
private volatile int numElementsEmitted;

/** Flag to make the source cancelable. */
private volatile boolean isRunning = true;

private transient ListState<Integer> checkpointedState;

@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
Preconditions.checkState(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");

this.checkpointedState =
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"from-elements-state",
IntSerializer.INSTANCE));

if (context.isRestored()) {
List<Integer> retrievedStates = new ArrayList<>();
for (Integer entry : this.checkpointedState.get()) {
retrievedStates.add(entry);
}

// given that the parallelism of the function is 1, we can only have 1 state
Preconditions.checkArgument(
retrievedStates.size() == 1,
getClass().getSimpleName() + " retrieved invalid state.");

this.numElementsEmitted = retrievedStates.get(0);
}
}

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
final Object lock = ctx.getCheckpointLock();

while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
Thread.sleep(1000);
synchronized (lock) {
ctx.collect(numElementsEmitted++);
}
}
}

@Override
public void cancel() {
isRunning = false;
}

// 
//  Checkpointing
// 

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-26 Thread tao xiao
Btw here is the checkpoint related log

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
Checkpoint was declined.
(org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out
(1/1)#0. Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
Caused by: org.apache.flink.util.SerializedThrowable: npe
at
com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
~[classes/:?]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
... 20 more
[2021-06-26 16:08:55,357] INFO Checkpoint 1 of job
afde4a82f41e8284cb0bfff20497a5cc expired before completing.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
output
[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-27 Thread Yun Tang
Hi Tao,

I suspect that your Flink job remains under high backpressure and that none of
the subsequent checkpoints ever ran 'FromElementsFunctionT#snapshotState',
which would mean the code that throws the exception was never executed. You
could check those expired checkpoints to see whether the tasks containing
'FromElementsFunctionT' ever completed.

Best
Yun Tang


Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-28 Thread tao xiao
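My job is very simple, as you can see from the code I pasted. I simply print
the number to stdout. If you look at the log, the numbers continue to print
after checkpoint 1, which indicates that no backpressure is happening. It is
very easy to reproduce this if you run the code I provided in an IDE.

LOG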
[2021-06-26 16:08:55,357] INFO Checkpoint 1 of job
afde4a82f41e8284cb0bfff20497a5cc expired before completing.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
output
[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
33
34
35
[2021-06-26 16:09:15,349] INFO Checkpoint 2 of job
afde4a82f41e8284cb0bfff20497a5cc expired before completing.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
36
37
38
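
The timestamps in this log line up with the checkpoint settings in the job. A small sketch of the arithmetic, assuming checkpoints are triggered at a fixed interval with no drift; the class and method names are illustrative only, and only the first trigger timestamp and the two durations are taken from the thread.

```java
import java.time.Duration;

/** Illustrative arithmetic only; these helpers are not part of Flink. */
final class CheckpointTimeline {

    /** Epoch millis at which checkpoint n (1-based) would be triggered. */
    static long triggerAt(long firstTriggerMillis, Duration interval, int n) {
        return firstTriggerMillis + (n - 1) * interval.toMillis();
    }

    /** Earliest epoch millis at which a checkpoint triggered at triggerMillis can expire. */
    static long expiresAt(long triggerMillis, Duration timeout) {
        return triggerMillis + timeout.toMillis();
    }

    public static void main(String[] args) {
        long first = 1624694932345L;                   // trigger time of checkpoint 1, from the log
        Duration interval = Duration.ofMillis(20_000); // enableCheckpointing(20_000, ...)
        Duration timeout = Duration.ofMillis(3_000);   // setCheckpointTimeout(3_000)
        for (int n = 1; n <= 2; n++) {
            long trigger = triggerAt(first, interval, n);
            System.out.println("checkpoint " + n + ": trigger=" + trigger
                    + " earliest expiry=" + expiresAt(trigger, timeout));
        }
    }
}
```

The computed trigger time for checkpoint 2 is within a millisecond of the value logged above, and both checkpoints are reported as expired about three seconds after they were triggered, which is what the 3 second timeout predicts.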


Main function
StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
.setParallelism(1)
.print()
.setParallelism(1);
env.execute("Demo");

Source function
package sample.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be
 * affected by the serializability of the elements.
 *
 * NOTE: This source has a parallelism of 1.
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    /** The number of elements emitted already. */
    private volatile int numElementsEmitted;

    /** Flag to make the source cancelable. */
    private volatile boolean isRunning = true;

    private transient ListState<Integer> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");

        this.checkpointedState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "from-elements-state", IntSerializer.INSTANCE));

        if (context.isRestored()) {
            List<Integer> retrievedStates = new ArrayList<>();
            for (Integer entry : this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }

            // given that the parallelism of the function is 1, we can only have 1 state
            Preconditions.checkArgument(
                    retrievedStates.size() == 1,
                    getClass().getSimpleName() + " retrieved invalid state.");

            this.numElementsEmitted = retrievedStates.get(0);
        }
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final Object lock = ctx.getCheckpointLock();

        // Emit one element per second while holding the checkpoint lock.
        while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
            Thread.sleep(1000);
            synchronized (lock) {
                ctx.collect(numElementsEmitted++);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    //
    // Checkpointing
    //

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState != null,
                "The " + getClass().getSimpleName() + " has not been properly initialized.");

        this.checkpointedState.clear();
        this.checkpointedState.add(this.numElementsEmitted);
        // Intentionally fail every snapshot to observe how Flink reacts.
        throw new NullPointerException("npe");
    }
}





Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-30 Thread tao xiao
Hi team,

Does anyone have a clue?

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-07-01 Thread Matthias Pohl
Hi Tao,
it looks like it should work, considering that you have a sleep of 1 second
before each emission. I'm going to add Roman to this thread. Maybe he sees
something obvious that I'm missing.
Could you run the job with the log level set to debug and provide the logs
once more? Additionally, having both the TaskManager's and the JobManager's
logs available would help in understanding what's going on.

Best,
Matthias

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-07-01 Thread Yun Tang
Hi Tao,

I ran your program with Flink 1.12.1 and could reproduce the problem you
described. Things work normally after switching to Flink 1.12.2.

After digging into the root cause, I found that this is caused by a bug that
has since been fixed [1]: if a legacy source task fails outside of the legacy
thread, the legacy thread blocks proper cancellation (the completion future is
never completed). As you throw the NPE within the source operator, it never
exits and cannot handle subsequent checkpoint requests. That's why none of the
subsequent checkpoints can finish.


[1] 
https://github.com/apache/flink/commit/b332ce40d88be84d9cf896f446c7c6e26dbc8b6a#diff-54e9ce3b15d6badcc9376ab144df066eb46c4e516d6ee31ef8eb38e2d4359042
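
To make the mechanism described above more concrete, here is a plain-Java analogy, not Flink's actual code: a worker loop owns a completion future, and a failure observed on a different thread only lets that future complete if it is propagated to the loop. All names (LegacyThreadSketch, startWorker, failOutsideWorker) are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java analogy only; none of these names exist in Flink. */
final class LegacyThreadSketch {

    /** Starts a worker loop and returns a future that completes only when the loop exits. */
    static CompletableFuture<Void> startWorker(AtomicBoolean running) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(10); // stands in for emitting one record
                }
                completion.complete(null);
            } catch (InterruptedException e) {
                completion.completeExceptionally(e);
            }
        });
        worker.setDaemon(true); // let the JVM exit even if the loop never ends
        worker.start();
        return completion;
    }

    /**
     * Simulates a failure observed on the caller's thread rather than inside the worker loop.
     * Returns true if the worker's completion future finished within the timeout.
     */
    static boolean failOutsideWorker(boolean stopWorkerOnFailure, long timeoutMillis)
            throws Exception {
        AtomicBoolean running = new AtomicBoolean(true);
        CompletableFuture<Void> completion = startWorker(running);
        if (stopWorkerOnFailure) {
            running.set(false); // propagate the failure to the worker loop
        }
        try {
            completion.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            running.set(false); // clean up the demo thread
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without propagation: completed=" + failOutsideWorker(false, 300));
        System.out.println("with propagation: completed=" + failOutsideWorker(true, 300));
    }
}
```

In the variant without propagation the worker keeps emitting and anything waiting on the completion future waits forever, which is the shape of the behaviour reported in this thread: output keeps printing while the task can no longer make progress on checkpoints.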

Best
Yun Tang

From: Matthias Pohl 
Sent: Thursday, July 1, 2021 16:41
To: tao xiao 
Cc: Yun Tang ; user ; Roman 
Khachatryan 
Subject: Re: Exception in snapshotState suppresses subsequent checkpoints

Hi Tao,
it looks like it should work, considering that you have a sleep of 1 second
before each emission. I'm going to add Roman to this thread. Maybe he sees
something obvious that I'm missing.
Could you run the job with the log level set to debug and provide the logs once
more? Additionally, having both the TaskManager's and the JobManager's logs
available would help in understanding what's going on.

Best,
Matthias

On Wed, Jun 30, 2021 at 6:14 PM tao xiao wrote:
Hi team,

Does anyone have a clue?

On Mon, Jun 28, 2021 at 3:27 PM tao xiao wrote:
My job is very simple, as you can see from the code I pasted. I simply print
the number to stdout. If you look at the log, the numbers continue to print
after checkpoint 1, which indicates that no backpressure is happening. It is
very easy to reproduce this if you run the code I provided in an IDE.



Re: Exception in snapshotState suppresses subsequent checkpoints

2021-07-01 Thread tao xiao
Thanks for the pointer. Let me try upgrading Flink.
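
For readers following the thread, here is a minimal sketch of the failure mode described in the quoted explanation below: a completion future that nobody completes makes every waiter time out instead of seeing the error. This is plain Java and not Flink's code; the class name `StuckCompletionSketch` and the helper `completesWithin` are hypothetical, and the second case only illustrates propagating a failure in general, not what the referenced commit actually changes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StuckCompletionSketch {

    // Hypothetical helper: returns true if the future finished (normally or
    // exceptionally) within the timeout, false if the wait timed out.
    static boolean completesWithin(CompletableFuture<Void> completion, long timeoutMillis)
            throws InterruptedException {
        try {
            completion.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            return true; // finished, although with a failure
        } catch (TimeoutException e) {
            return false; // nobody ever completed the future
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: this exception stands in for the one thrown in snapshotState.
        RuntimeException snapshotFailure = new RuntimeException("failure in snapshotState");

        // Case 1: the failure happens, but the completion future is never completed,
        // so anything waiting on it can only time out.
        CompletableFuture<Void> stuck = new CompletableFuture<>();
        System.out.println("never completed: " + completesWithin(stuck, 100));

        // Case 2: the failure is propagated into the future, so waiters return promptly.
        CompletableFuture<Void> propagated = new CompletableFuture<>();
        propagated.completeExceptionally(snapshotFailure);
        System.out.println("failure propagated: " + completesWithin(propagated, 100));
    }
}
```

The first call reports `false` after the 100 ms wait and the second reports `true` immediately, which mirrors the symptom in the logs: the first checkpoint shows the exception, while later ones only report a timeout.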

On Thu, Jul 1, 2021 at 5:29 PM Yun Tang wrote:

> Hi Tao,
>
> I ran your program with Flink 1.12.1 and could reproduce the problem you
> described. Everything works normally after switching to Flink 1.12.2.
>
> After digging into the root cause, I found that this is caused by a bug
> that has since been fixed [1]: if a legacy source task fails outside of the
> legacy thread, the legacy thread blocks proper cancellation (the completion
> future is never completed). As you throw the NPE within the source operator,
> the source task never exits and cannot handle subsequent checkpoint
> requests. That's why none of the subsequent checkpoints can finish.
>
>
> [1]
> https://github.com/apache/flink/commit/b332ce40d88be84d9cf896f446c7c6e26dbc8b6a#diff-54e9ce3b15d6badcc9376ab144df066eb46c4e516d6ee31ef8eb38e2d4359042
>
> Best
> Yun Tang
> --
> *From:* Matthias Pohl
> *Sent:* Thursday, July 1, 2021 16:41
> *To:* tao xiao
> *Cc:* Yun Tang; user; Roman Khachatryan
> *Subject:* Re: Exception in snapshotState suppresses subsequent
> checkpoints
>
> Hi Tao,
> it looks like it should work, considering that you have a sleep of 1 second
> before each emission. I'm going to add Roman to this thread. Maybe he
> sees something obvious that I'm missing.
> Could you run the job with the log level set to debug and provide the logs
> once more? Additionally, having both the TaskManager's and the
> JobManager's logs available would help in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Jun 30, 2021 at 6:14 PM tao xiao wrote:
>
> Hi team,
>
> Does anyone have a clue?
>
> On Mon, Jun 28, 2021 at 3:27 PM tao xiao wrote:
>
> My job is very simple, as you can see from the code I pasted. I simply
> print the numbers to stdout. If you look at the log, the numbers continued
> to be printed after checkpoint 1, which indicates that no back pressure was
> happening. It is very easy to reproduce this if you run the code I
> provided in an IDE.
>
>
> LOG
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
> Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
> Checkpoint was declined.
> (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Sink: Print to
> Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
> ~[flink-streaming-java_2.11-1