Hi team,

Does anyone have a clue?

On Mon, Jun 28, 2021 at 3:27 PM tao xiao <xiaotao...@gmail.com> wrote:

> My job is very simple as you can see from the code I pasted. I simply
> print out the number to stdout. If you look at the log the number continued
> to print out after checkpoint 1 which indicated no back pressure was
> happening.  It is very easy to reproduce this if you run the code I
> provided in IDE
>
>
> ------------LOG----------------
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
> Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
> Checkpoint was declined.
> (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Sink: Print to
> Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> [flink-runtime_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> [flink-runtime_2.11-1.12.1.jar:1.12.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
> Caused by: org.apache.flink.util.SerializedThrowable: npe
> at
> com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
> ~[classes/:?]
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> ... 20 more
> [2021-06-26 16:08:55,357] INFO Checkpoint 1 of job
> afde4a82f41e8284cb0bfff20497a5cc expired before completing.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> output
> [2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> 33
> 34
> 35
> [2021-06-26 16:09:15,349] INFO Checkpoint 2 of job
> afde4a82f41e8284cb0bfff20497a5cc expired before completing.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> 36
> 37
> 38
>
>
> Main function
> StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
> env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
> env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(3_000);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
>
> env.addSource(new FromElementsFunctionT())
> .setParallelism(1)
> .print()
> .setParallelism(1);
> env.execute("Demo");
>
> source funciton
> package sample.flink;
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.annotation.PublicEvolving;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeutils.base.IntSerializer;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Preconditions;
>
> /**
> * A stream source function that returns a sequence of elements.
> *
> * <p>Upon construction, this source function serializes the elements
> using Flink's type
> * information. That way, any object transport using Java serialization
> will not be affected by the
> * serializability of the elements.
> *
> * <p><b>NOTE:</b> This source has a parallelism of 1.
> *
> */
> @PublicEvolving
> public class FromElementsFunctionT implements SourceFunction<Integer>, 
> CheckpointedFunction
> {
>
> private static final long serialVersionUID = 1L;
>
> /** The number of elements emitted already. */
> private volatile int numElementsEmitted;
>
> /** Flag to make the source cancelable. */
> private volatile boolean isRunning = true;
>
> private transient ListState<Integer> checkpointedState;
>
> @Override
> public void initializeState(FunctionInitializationContext context) throws 
> Exception
> {
> Preconditions.checkState(
> this.checkpointedState == null,
> "The " + getClass().getSimpleName() + " has already been initialized.");
>
> this.checkpointedState =
> context.getOperatorStateStore()
> .getListState(
> new ListStateDescriptor<>(
> "from-elements-state", IntSerializer.INSTANCE));
>
> if (context.isRestored()) {
> List<Integer> retrievedStates = new ArrayList<>();
> for (Integer entry : this.checkpointedState.get()) {
> retrievedStates.add(entry);
> }
>
> // given that the parallelism of the function is 1, we can only have 1
> state
> Preconditions.checkArgument(
> retrievedStates.size() == 1,
> getClass().getSimpleName() + " retrieved invalid state.");
>
> this.numElementsEmitted = retrievedStates.get(0);
> }
> }
>
> @Override
> public void run(SourceContext<Integer> ctx) throws Exception {
> final Object lock = ctx.getCheckpointLock();
>
> while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
> Thread.sleep(1000);
> synchronized (lock) {
> ctx.collect(numElementsEmitted++);
> }
> }
> }
>
> @Override
> public void cancel() {
> isRunning = false;
> }
>
> // ------------------------------------------------------------------------
> // Checkpointing
> // ------------------------------------------------------------------------
>
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception
> {
> Preconditions.checkState(
> this.checkpointedState != null,
> "The " + getClass().getSimpleName() + " has not been properly
> initialized.");
>
> this.checkpointedState.clear();
> this.checkpointedState.add(this.numElementsEmitted);
> throw new NullPointerException("npe");
> }
> }
>
>
>
>
> On Mon, Jun 28, 2021 at 2:36 PM Yun Tang <myas...@live.com> wrote:
>
>> Hi Tao,
>>
>> I'm afraid that your Flink job continues to be in high backpressued and
>> all subsequent checkpoints did not ever run
>> 'FromElementsFunctionT#snapshotState' which means your code to throw
>> exception never be executed. You could check those expired checkpoints to
>> see whether your tasks containing 'FromElementsFunctionT' has ever been
>> completed.
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* tao xiao <xiaotao...@gmail.com>
>> *Sent:* Saturday, June 26, 2021 16:40
>> *To:* user <user@flink.apache.org>
>> *Subject:* Re: Exception in snapshotState suppresses subsequent
>> checkpoints
>>
>> Btw here is the checkpoint related log
>>
>> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT)
>> @ 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
>> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
>> [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
>> Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
>> Checkpoint was declined.
>> (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 1 for operator Source: Custom Source -> Sink: Print to
>> Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> [flink-runtime_2.11-1.12.1.jar:1.12.1]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> [flink-runtime_2.11-1.12.1.jar:1.12.1]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
>> Caused by: org.apache.flink.util.SerializedThrowable: npe
>> at
>> com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
>> ~[classes/:?]
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
>> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>> ... 20 more
>> [2021-06-26 16:08:55,357] INFO Checkpoint 1 of job
>> afde4a82f41e8284cb0bfff20497a5cc expired before completing.
>> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
>> output
>> [2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT)
>> @ 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc.
>> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
>> 33
>> 34
>> 35
>> [2021-06-26 16:09:15,349] INFO Checkpoint 2 of job
>> afde4a82f41e8284cb0bfff20497a5cc expired before completing.
>> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
>>
>> On Sat, Jun 26, 2021 at 4:36 PM tao xiao <xiaotao...@gmail.com> wrote:
>>
>> Hi team,
>>
>> I run a simple 1.12.1 Flink job in IDE with
>> TolerableCheckpointFailureNumber set where I throw an exception in
>> source function snapshotState intentionally to verify how Flink behaves.
>> What I find is the first checkpoint throws the exception and eventually
>> time out while the main flow continues to work. This is expected however
>> all subsequent checkpoints don't reach the exception anymore and report
>> timeout when timeout reaches. I want to know if this is expected behavior
>> which all later checkpoints cannot finish if there is one checkpoint that
>> throws exception.
>>
>> Below is the code the reproduce the behavior
>> main
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
>> env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
>> env.getCheckpointConfig().setCheckpointTimeout(3_000);
>> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
>>
>> env.addSource(new FromElementsFunctionT())
>>     .setParallelism(1)
>>     .print()
>>     .setParallelism(1);
>> env.execute("Demo");
>>
>>
>> Source function
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one or more
>>  * contributor license agreements.  See the NOTICE file distributed with
>>  * this work for additional information regarding copyright ownership.
>>  * The ASF licenses this file to You under the Apache License, Version 2.0
>>  * (the "License"); you may not use this file except in compliance with
>>  * the License.  You may obtain a copy of the License at
>>  *
>>  *    http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package sample.flink;
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.flink.annotation.PublicEvolving;
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.typeutils.base.IntSerializer;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.util.Preconditions;
>>
>> /**
>>  * A stream source function that returns a sequence of elements.
>>  *
>>  * <p>Upon construction, this source function serializes the elements using 
>> Flink's type
>>  * information. That way, any object transport using Java serialization will 
>> not be affected by the
>>  * serializability of the elements.
>>  *
>>  * <p><b>NOTE:</b> This source has a parallelism of 1.
>>  *
>>  */
>> @PublicEvolving
>> public class FromElementsFunctionT implements SourceFunction<Integer>, 
>> CheckpointedFunction {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     /** The number of elements emitted already. */
>>     private volatile int numElementsEmitted;
>>
>>     /** Flag to make the source cancelable. */
>>     private volatile boolean isRunning = true;
>>
>>     private transient ListState<Integer> checkpointedState;
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) 
>> throws Exception {
>>         Preconditions.checkState(
>>                 this.checkpointedState == null,
>>                 "The " + getClass().getSimpleName() + " has already been 
>> initialized.");
>>
>>         this.checkpointedState =
>>                 context.getOperatorStateStore()
>>                         .getListState(
>>                                 new ListStateDescriptor<>(
>>                                         "from-elements-state", 
>> IntSerializer.INSTANCE));
>>
>>         if (context.isRestored()) {
>>             List<Integer> retrievedStates = new ArrayList<>();
>>             for (Integer entry : this.checkpointedState.get()) {
>>                 retrievedStates.add(entry);
>>             }
>>
>>             // given that the parallelism of the function is 1, we can only 
>> have 1 state
>>             Preconditions.checkArgument(
>>                     retrievedStates.size() == 1,
>>                     getClass().getSimpleName() + " retrieved invalid 
>> state.");
>>
>>             this.numElementsEmitted = retrievedStates.get(0);
>>         }
>>     }
>>
>>     @Override
>>     public void run(SourceContext<Integer> ctx) throws Exception {
>>         final Object lock = ctx.getCheckpointLock();
>>
>>         while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
>>             Thread.sleep(1000);
>>             synchronized (lock) {
>>                 ctx.collect(numElementsEmitted++);
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>>
>>     // 
>> ------------------------------------------------------------------------
>>     //  Checkpointing
>>     // 
>> ------------------------------------------------------------------------
>>
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) throws 
>> Exception {
>>         Preconditions.checkState(
>>                 this.checkpointedState != null,
>>                 "The " + getClass().getSimpleName() + " has not been 
>> properly initialized.");
>>
>>         this.checkpointedState.clear();
>>         this.checkpointedState.add(this.numElementsEmitted);
>>         throw new NullPointerException("npe");
>>     }
>> }
>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao

Reply via email to