[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r735428414 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { return is; } -private void completeRestore(Collection stateHandles) { -if (!stateHandles.isEmpty()) { -synchronized (materialized) { // ensure visibility -for (ChangelogStateBackendHandle h : stateHandles) { -if (h != null) { -materialized.addAll(h.getMaterializedStateHandles()); - restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); -} -} +public void registerCloseable(@Nullable Closeable closeable) { +closer.register(closeable); +} + +private ChangelogSnapshotState completeRestore( +Collection stateHandles) { + +List materialized = new ArrayList<>(); +List restoredNonMaterialized = new ArrayList<>(); + +for (ChangelogStateBackendHandle h : stateHandles) { +if (h != null) { +materialized.addAll(h.getMaterializedStateHandles()); + restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); } } + changelogStates.clear(); +return new ChangelogSnapshotState( +materialized, +restoredNonMaterialized, +stateChangelogWriter.initialSequenceNumber()); +} + +/** + * Initialize state materialization so that materialized data can be persisted durably and + * included into the checkpoint. + * + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. 
+ * + * @return a tuple of - future snapshot result from the underlying state backend - a {@link + * SequenceNumber} identifying the latest change in the changelog + */ +public Optional initMaterialization() throws Exception { +SequenceNumber upTo = getLastAppendedTo(); +SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); + +LOG.info( +"Initialize Materialization. Current changelog writers last append to sequence number {}", +upTo); + +if (upTo.compareTo(lastMaterializedTo) > 0) { + +LOG.info("Starting materialization from {} : {}", lastMaterializedTo, upTo); + +return Optional.of( +new MaterializationRunnable( +keyedStateBackend.snapshot( +// This ID is not needed for materialization; +// But since we are re-using the streamFactory +// that is designed for state backend snapshot, +// which requires unique checkpoint ID. +// A faked materialized Id is provided here. +// TODO: implement its own streamFactory. +materializedId++, +System.currentTimeMillis(), Review comment: Yes, you're right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r733855539 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { return is; } -private void completeRestore(Collection stateHandles) { -if (!stateHandles.isEmpty()) { -synchronized (materialized) { // ensure visibility -for (ChangelogStateBackendHandle h : stateHandles) { -if (h != null) { -materialized.addAll(h.getMaterializedStateHandles()); - restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); -} -} +public void registerCloseable(@Nullable Closeable closeable) { +closer.register(closeable); +} + +private ChangelogSnapshotState completeRestore( +Collection stateHandles) { + +List materialized = new ArrayList<>(); +List restoredNonMaterialized = new ArrayList<>(); + +for (ChangelogStateBackendHandle h : stateHandles) { +if (h != null) { +materialized.addAll(h.getMaterializedStateHandles()); + restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); } } + changelogStates.clear(); +return new ChangelogSnapshotState( +materialized, +restoredNonMaterialized, +stateChangelogWriter.initialSequenceNumber()); +} + +/** + * Initialize state materialization so that materialized data can be persisted durably and + * included into the checkpoint. + * + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. 
+ * + * @return a tuple of - future snapshot result from the underlying state backend - a {@link + * SequenceNumber} identifying the latest change in the changelog + */ +public Optional initMaterialization() throws Exception { +SequenceNumber upTo = getLastAppendedTo(); +SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); + +LOG.info( +"Initialize Materialization. Current changelog writers last append to sequence number {}", +upTo); + +if (upTo.compareTo(lastMaterializedTo) > 0) { + +LOG.info("Starting materialization from {} : {}", lastMaterializedTo, upTo); + +return Optional.of( +new MaterializationRunnable( +keyedStateBackend.snapshot( +// This ID is not needed for materialization; +// But since we are re-using the streamFactory +// that is designed for state backend snapshot, +// which requires unique checkpoint ID. +// A faked materialized Id is provided here. +// TODO: implement its own streamFactory. +materializedId++, +System.currentTimeMillis(), Review comment: WDYT about using `materializedId++` here to avoid unnecessary system call? ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java ## @@ -121,7 +124,8 @@ ttlTimeProvider, metricGroup, baseHandles, -cancelStreamRegistry)); +cancelStreamRegistry), +cancelStreamRegistry); Review comment: This argument is unused (ditto 2nd constructor). ## File path: flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See t
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r731608691 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +private boolean started = false; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +ChangelogKeyedStateBackend keyedStateBackend, +long periodicMaterializeDelay, +int allowedNumberOfFailures) { +this.mailboxExecutor = checkNotNull(mailboxExecutor); +this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool); +this.subtaskName = checkNotNull(subtaskName); +this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); +this.keyedStateBackend = checkNotNull(keyedStateBackend); + +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { +if (!started) { + +started = true; + +LOG.info( +"Task {} starts periodic materialization, scheduling the next one in {} seconds", +subtaskName, +periodicMaterializeDelay / 1000); + +scheduleNextMaterialization(); +} +} + +private void triggerMaterialization() { +mailboxExecutor.execute( +() -> { +Optional materializationRunnableOptional = +
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r730782566 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +private boolean started = false; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +ChangelogKeyedStateBackend keyedStateBackend, +long periodicMaterializeDelay, +int allowedNumberOfFailures) { +this.mailboxExecutor = checkNotNull(mailboxExecutor); +this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool); +this.subtaskName = checkNotNull(subtaskName); +this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); +this.keyedStateBackend = checkNotNull(keyedStateBackend); + +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { +if (!started) { + +started = true; + +LOG.info( +"Task {} starts periodic materialization, scheduling the next one in {} seconds", +subtaskName, +periodicMaterializeDelay / 1000); + +scheduleNextMaterialization(); +} +} + +private void triggerMaterialization() { +mailboxExecutor.execute( +() -> { +Optional materializationRunnableOptional = +
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717260198 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717269001 ## File path: flink-state-backends/flink-statebackend-changelog/src/test/resources/log4j2-test.properties ## @@ -18,11 +18,11 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO Review comment: > I will change the logging level back. Thanks! > BTW, what's the problem for layout.pattern? Do you mean we should remove it? Actually, the change is correct, non-standard pattern was used before, sorry. Let's keep this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717267357 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +private boolean started = false; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +ChangelogKeyedStateBackend keyedStateBackend, +long periodicMaterializeDelay, +int allowedNumberOfFailures) { +this.mailboxExecutor = checkNotNull(mailboxExecutor); +this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool); +this.subtaskName = checkNotNull(subtaskName); +this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); +this.keyedStateBackend = checkNotNull(keyedStateBackend); + +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { +if (!started) { + +started = true; + +LOG.info( +"Task {} starts periodic materialization, scheduling the next one in {} seconds", +subtaskName, +periodicMaterializeDelay / 1000); + +scheduleNextMaterialization(); +} +} + +private void triggerMaterialization() { +mailboxExecutor.execute( +() -> { +Optional materializationRunnableOptional = +
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717262224 ## File path: pom.xml ## @@ -1518,7 +1518,7 @@ under the License. random: enable it randomly, unless explicitly set unset: don't alter the configuration --> - random + on Review comment: Could you then move it to a separate commit please? (so that it's not accidentally merged) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717262224 ## File path: pom.xml ## @@ -1518,7 +1518,7 @@ under the License. random: enable it randomly, unless explicitly set unset: don't alter the configuration --> - random + on Review comment: Could you then move it to a separate commit please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717261649 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; +// -- statebackend related configurations -- /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; +/** Interval in milliseconds to perform periodic materialization. */ +private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + +/** Interval in milliseconds for initial delay of periodic materialization. */ +private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); + +/** Max allowed number of failures */ +private int materializationMaxAllowedFailures = + StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue(); + +/** Flag to enable periodic materialization */ +private boolean isPeriodicMaterializationEnabled = false; Review comment: Resolved by removing the flag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r717260198 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716882042 ## File path: flink-core/src/main/java/org/apache/flink/configuration/ChangelogOptions.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.docs.Documentation; +import org.apache.flink.configuration.description.Description; + +import java.time.Duration; + +/** A collection of all configuration options that relate to changelog. 
*/ +public class ChangelogOptions { + +@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG) +public static final ConfigOption PERIODIC_MATERIALIZATION_INTERVAL = + ConfigOptions.key("state.backend.changelog.periodic-materialize.interval") +.durationType() +.defaultValue(Duration.ofMinutes(10)) +.withDescription( +"Defines the interval in milliseconds to perform " ++ "periodic materialization for state backend."); + +@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG) +public static final ConfigOption MATERIALIZATION_MAX_FAILURES_ALLOWED = +ConfigOptions.key("state.backend.changelog.max.failures.allowed") Review comment: How about `state.backend.changelog.max-failures`? I think dots are already used here to separate sections. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716882744 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ## @@ -258,4 +259,10 @@ default void setAsyncOperationsThreadPool(ExecutorService executorService) {} default ExecutorService getAsyncOperationsThreadPool() { throw new UnsupportedOperationException(); } + +default void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess) {} + +default CheckpointStorageAccess getCheckpointStorageAccess() { +throw new UnsupportedOperationException(); +} Review comment: I think this change (and the related ones) should belong to another commit (not `move AsyncExceptionHandler ...`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716882744 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ## @@ -258,4 +259,10 @@ default void setAsyncOperationsThreadPool(ExecutorService executorService) {} default ExecutorService getAsyncOperationsThreadPool() { throw new UnsupportedOperationException(); } + +default void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess) {} + +default CheckpointStorageAccess getCheckpointStorageAccess() { +throw new UnsupportedOperationException(); +} Review comment: I think this change (and the related ones) belong to the wrong commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716883256 ## File path: flink-state-backends/flink-statebackend-changelog/src/test/resources/log4j2-test.properties ## @@ -18,11 +18,11 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO Review comment: The common approach is to disable logging by default and change the level on demand. Can't we use it here? Ditto for `layout.pattern`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716867123 ## File path: pom.xml ## @@ -1518,7 +1518,7 @@ under the License. random: enable it randomly, unless explicitly set unset: don't alter the configuration --> - random + on Review comment: Is this change temporary and should be dropped before merging? ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +private boolean started = false; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +ChangelogKeyedStateBackend keyedStateBackend, +long periodicMaterializeDelay, +int allowedNumberOfFailures) { +this.mailboxExecutor = checkNotNull(mailboxExecutor); +this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool); +this.subtaskName = checkNotNull(subtaskName); +this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); +this.keyedStateBackend = checkNotNull(keyedStateBackend); + +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716642699 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +long periodicMaterializeDelay, +int allowedNumberOfFailures, +ChangelogKeyedStateBackend keyedStateBackend) { +this.mailboxExecutor = mailboxExecutor; +this.asyncOperationsThreadPool = asyncOperationsThreadPool; + +this.subtaskName = subtaskName; +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.asyncExceptionHandler = asyncExceptionHandler; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); +this.keyedStateBackend = keyedStateBackend; + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { Review comment: It's not called twice currently - and I'm proposing to enforce this; so that it's not re-used for example. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716642699 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +long periodicMaterializeDelay, +int allowedNumberOfFailures, +ChangelogKeyedStateBackend keyedStateBackend) { +this.mailboxExecutor = mailboxExecutor; +this.asyncOperationsThreadPool = asyncOperationsThreadPool; + +this.subtaskName = subtaskName; +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.asyncExceptionHandler = asyncExceptionHandler; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); +this.keyedStateBackend = keyedStateBackend; + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { Review comment: It's not called twice currently - and I'm proposing to enforce this; so that it's not re-used for example. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716640216 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -531,18 +539,76 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { return is; } -private void completeRestore(Collection stateHandles) { -if (!stateHandles.isEmpty()) { -synchronized (materialized) { // ensure visibility -for (ChangelogStateBackendHandle h : stateHandles) { -if (h != null) { -materialized.addAll(h.getMaterializedStateHandles()); - restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); -} -} +private ChangelogSnapshotState completeRestore( +Collection stateHandles) { + +List materialized = new ArrayList<>(); +List restoredNonMaterialized = new ArrayList<>(); + +for (ChangelogStateBackendHandle h : stateHandles) { +if (h != null) { +materialized.addAll(h.getMaterializedStateHandles()); + restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); } } + changelogStates.clear(); +return new ChangelogSnapshotState( +materialized, +restoredNonMaterialized, +stateChangelogWriter.initialSequenceNumber()); +} + +/** + * Initialize state materialization so that materialized data can be persisted durably and + * included into the checkpoint. + * + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. 
+ * + * @return a tuple of - future snapshot result from the underlying state backend - a {@link + * SequenceNumber} identifying the latest change in the changelog + */ +public Optional initMaterialization() throws Exception { +SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber(); + +if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0) { +return Optional.of( +new MaterializationRunnable( +keyedStateBackend.snapshot( +// This ID is not needed for materialization; +// But since we are re-using the streamFactory +// that is designed for state backend snapshot, +// which requires unique checkpoint ID. +// A faked materialized Id is provided here. +// TODO: implement its own streamFactory. +materializedId++, +System.currentTimeMillis(), +streamFactory, +CHECKPOINT_OPTIONS), +// TODO: add metadata to log FLINK-23170 +upTo)); +} else { +return Optional.empty(); +} +} + +/** + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. + */ +public void updateChangelogSnapshotState( +SnapshotResult materializedSnapshot, SequenceNumber upTo) { +changelogSnapshotState = +new ChangelogSnapshotState( +getMaterializedResult(materializedSnapshot), new ArrayList<>(), upTo); Review comment: I mean a method like this in `ChangelogSnapshotState` class: ``` public static ChangelogSnapshotState materialized( SnapshotResult snapshot, SequenceNumber upTo) { return new ChangelogSnapshotState( getMaterializedResult(snapshot), Collections.emptyList(), upTo); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716222670 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +long periodicMaterializeDelay, +int allowedNumberOfFailures, +ChangelogKeyedStateBackend keyedStateBackend) { +this.mailboxExecutor = mailboxExecutor; +this.asyncOperationsThreadPool = asyncOperationsThreadPool; + +this.subtaskName = subtaskName; +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.asyncExceptionHandler = asyncExceptionHandler; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); +this.keyedStateBackend = keyedStateBackend; + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { +LOG.info( +"Task {} starts periodic materialization, scheduling the next one in {} seconds", +subtaskName, +periodicMaterializeDelay / 1000); + +scheduleNextMaterialization(); +} + +private void triggerMaterialization() { +mailboxExecutor.execute( +() -> { +Optional materializationRunnableOptional = +keyedStateBackend.initMaterialization(); + +if (materializationRunnableOptional.isPresent()) { +MaterializationRunnable runnable = materializationRunnableOptional.get(); +asyn
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r716222836 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** Stateless Materialization Manager. */ +public class PeriodicMaterializationManager implements Closeable { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final String subtaskName; + +private final long periodicMaterializeDelay; + +/** Allowed number of consecutive materialization failures. */ +private final int allowedNumberOfFailures; + +/** Number of consecutive materialization failures. 
*/ +private final AtomicInteger numberOfConsecutiveFailures; + +private final ChangelogKeyedStateBackend keyedStateBackend; + +PeriodicMaterializationManager( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +String subtaskName, +AsyncExceptionHandler asyncExceptionHandler, +long periodicMaterializeDelay, +int allowedNumberOfFailures, +ChangelogKeyedStateBackend keyedStateBackend) { +this.mailboxExecutor = mailboxExecutor; +this.asyncOperationsThreadPool = asyncOperationsThreadPool; + +this.subtaskName = subtaskName; +this.periodicMaterializeDelay = periodicMaterializeDelay; +this.asyncExceptionHandler = asyncExceptionHandler; +this.allowedNumberOfFailures = allowedNumberOfFailures; +this.numberOfConsecutiveFailures = new AtomicInteger(0); +this.keyedStateBackend = keyedStateBackend; + +this.periodicExecutor = +Executors.newSingleThreadScheduledExecutor( +new ExecutorThreadFactory( +"periodic-materialization-scheduler-" + subtaskName)); +} + +public void start() { +LOG.info( +"Task {} starts periodic materialization, scheduling the next one in {} seconds", +subtaskName, +periodicMaterializeDelay / 1000); + +scheduleNextMaterialization(); +} + +private void triggerMaterialization() { +mailboxExecutor.execute( +() -> { +Optional materializationRunnableOptional = +keyedStateBackend.initMaterialization(); + +if (materializationRunnableOptional.isPresent()) { +MaterializationRunnable runnable = materializationRunnableOptional.get(); +asyn
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r71628 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -531,18 +539,76 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { return is; } -private void completeRestore(Collection stateHandles) { -if (!stateHandles.isEmpty()) { -synchronized (materialized) { // ensure visibility -for (ChangelogStateBackendHandle h : stateHandles) { -if (h != null) { -materialized.addAll(h.getMaterializedStateHandles()); - restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); -} -} +private ChangelogSnapshotState completeRestore( +Collection stateHandles) { + +List materialized = new ArrayList<>(); +List restoredNonMaterialized = new ArrayList<>(); + +for (ChangelogStateBackendHandle h : stateHandles) { +if (h != null) { +materialized.addAll(h.getMaterializedStateHandles()); + restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); } } + changelogStates.clear(); +return new ChangelogSnapshotState( +materialized, +restoredNonMaterialized, +stateChangelogWriter.initialSequenceNumber()); +} + +/** + * Initialize state materialization so that materialized data can be persisted durably and + * included into the checkpoint. + * + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. 
+ * + * @return a tuple of - future snapshot result from the underlying state backend - a {@link + * SequenceNumber} identifying the latest change in the changelog + */ +public Optional initMaterialization() throws Exception { +SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber(); + +if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0) { +return Optional.of( +new MaterializationRunnable( +keyedStateBackend.snapshot( +// This ID is not needed for materialization; +// But since we are re-using the streamFactory +// that is designed for state backend snapshot, +// which requires unique checkpoint ID. +// A faked materialized Id is provided here. +// TODO: implement its own streamFactory. +materializedId++, +System.currentTimeMillis(), +streamFactory, +CHECKPOINT_OPTIONS), +// TODO: add metadata to log FLINK-23170 +upTo)); +} else { +return Optional.empty(); +} +} + +/** + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. + */ +public void updateChangelogSnapshotState( +SnapshotResult materializedSnapshot, SequenceNumber upTo) { +changelogSnapshotState = +new ChangelogSnapshotState( +getMaterializedResult(materializedSnapshot), new ArrayList<>(), upTo); Review comment: nit: `Collections.emptyList`? nit: factory method with two args? ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -625,4 +691,52 @@ public T get(long timeout, TimeUnit unit) } }; } + +/** + * Snapshot State for ChangelogKeyedStatebackend. + * + * It includes three parts: - materialized snapshot from the underlying delegated state + * backend - non-materialized part in the current changelog - non-materialized changelog, from + * previous logs (before failover or rescaling) + */ +private static class ChangelogSnapshotState { +/** + * Materialized snapshot from the underlying delegated state backend. 
Set initially on + * restore and later upon materialization. + */ +private final List materializedSnapshot; + +/** + * The {@link SequenceNumber} up to which the state is materialized, exclusive. This + * indicates the non-materialized part of the current changelog. + */ +private final SequenceNumber mater
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r703330721 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -329,37 +332,47 @@ public boolean deregisterKeySelectionListener(KeySelectionListener listener) // materialization may truncate only a part of the previous result and the backend would // have to split it somehow for the former option, so the latter is used. lastCheckpointId = checkpointId; -lastUploadedFrom = materializedTo; +lastUploadedFrom = periodicMaterializer.getMaterializedState().lastMaterializedTo(); lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); LOG.debug( "snapshot for checkpoint {}, change range: {}..{}", checkpointId, lastUploadedFrom, lastUploadedTo); + +MaterializedState materializedStateCopy = periodicMaterializer.getMaterializedState(); + return toRunnableFuture( stateChangelogWriter .persist(lastUploadedFrom) -.thenApply(this::buildSnapshotResult)); +.thenApply(delta -> buildSnapshotResult(delta, materializedStateCopy))); } -private SnapshotResult buildSnapshotResult(ChangelogStateHandle delta) { -// Can be called by either task thread during the sync checkpoint phase (if persist future -// was already completed); or by the writer thread otherwise. So need to synchronize. -// todo: revisit after FLINK-21357 - use mailbox action? 
-synchronized (materialized) { -// collections don't change once started and handles are immutable -List prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); -if (delta != null && delta.getStateSize() > 0) { -prevDeltaCopy.add(delta); -} -if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) { -return SnapshotResult.empty(); -} else { -return SnapshotResult.of( -new ChangelogStateBackendHandleImpl( -materialized, prevDeltaCopy, getKeyGroupRange())); -} +@Override +@VisibleForTesting +public void triggerMaterialization() { +periodicMaterializer.triggerMaterialization(); +} Review comment: As we discussed offline, this only works because a direct executor is used when creating the materializer (by "works" I mean the call waits for the materialization). Please add a comment at the very least. However, I still think the test or production code structure is wrong. With a proper separation between materializer and backend, we should be able to listen for materialization completion and take a snapshot only after that (instead of relying on the internal materializer behavior). edit: Can we just poll the materializer in a loop and wait for materialization to happen AND complete? (So that we don't explicitly call `triggerMaterialization` and don't have this method) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r703330721 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -329,37 +332,47 @@ public boolean deregisterKeySelectionListener(KeySelectionListener listener) // materialization may truncate only a part of the previous result and the backend would // have to split it somehow for the former option, so the latter is used. lastCheckpointId = checkpointId; -lastUploadedFrom = materializedTo; +lastUploadedFrom = periodicMaterializer.getMaterializedState().lastMaterializedTo(); lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); LOG.debug( "snapshot for checkpoint {}, change range: {}..{}", checkpointId, lastUploadedFrom, lastUploadedTo); + +MaterializedState materializedStateCopy = periodicMaterializer.getMaterializedState(); + return toRunnableFuture( stateChangelogWriter .persist(lastUploadedFrom) -.thenApply(this::buildSnapshotResult)); +.thenApply(delta -> buildSnapshotResult(delta, materializedStateCopy))); } -private SnapshotResult buildSnapshotResult(ChangelogStateHandle delta) { -// Can be called by either task thread during the sync checkpoint phase (if persist future -// was already completed); or by the writer thread otherwise. So need to synchronize. -// todo: revisit after FLINK-21357 - use mailbox action? 
-synchronized (materialized) { -// collections don't change once started and handles are immutable -List prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); -if (delta != null && delta.getStateSize() > 0) { -prevDeltaCopy.add(delta); -} -if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) { -return SnapshotResult.empty(); -} else { -return SnapshotResult.of( -new ChangelogStateBackendHandleImpl( -materialized, prevDeltaCopy, getKeyGroupRange())); -} +@Override +@VisibleForTesting +public void triggerMaterialization() { +periodicMaterializer.triggerMaterialization(); +} Review comment: As we discussed offline, this only works because a direct executor is used when creating the materializer (by "works" I mean the call waits for the materialization). Please add a comment at the very least. However, I still think the test or production code structure is wrong. With a proper separation between materializer and backend, we should be able to listen for materialization completion and take a snapshot only after that (instead of relying on the internal materializer behavior). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r691590608 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r691587190 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r691577741 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r691573533 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r689777951 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ## @@ -329,37 +332,47 @@ public boolean deregisterKeySelectionListener(KeySelectionListener listener) // materialization may truncate only a part of the previous result and the backend would // have to split it somehow for the former option, so the latter is used. lastCheckpointId = checkpointId; -lastUploadedFrom = materializedTo; +lastUploadedFrom = periodicMaterializer.getMaterializedState().lastMaterializedTo(); lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next(); LOG.debug( "snapshot for checkpoint {}, change range: {}..{}", checkpointId, lastUploadedFrom, lastUploadedTo); + +MaterializedState materializedStateCopy = periodicMaterializer.getMaterializedState(); + return toRunnableFuture( stateChangelogWriter .persist(lastUploadedFrom) -.thenApply(this::buildSnapshotResult)); +.thenApply(delta -> buildSnapshotResult(delta, materializedStateCopy))); } -private SnapshotResult buildSnapshotResult(ChangelogStateHandle delta) { -// Can be called by either task thread during the sync checkpoint phase (if persist future -// was already completed); or by the writer thread otherwise. So need to synchronize. -// todo: revisit after FLINK-21357 - use mailbox action? 
-synchronized (materialized) { -// collections don't change once started and handles are immutable -List prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); -if (delta != null && delta.getStateSize() > 0) { -prevDeltaCopy.add(delta); -} -if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) { -return SnapshotResult.empty(); -} else { -return SnapshotResult.of( -new ChangelogStateBackendHandleImpl( -materialized, prevDeltaCopy, getKeyGroupRange())); -} +@Override +@VisibleForTesting +public void triggerMaterialization() { +periodicMaterializer.triggerMaterialization(); +} Review comment: I tried to disable materialization by commenting it out in `ChangelogKeyedStateBackend.triggerMaterialization` and the test still passed, so I think just triggering is not enough. This suggests that if we want to test it on this level, a more fine-grained control over materialization is needed; which can be gained by direct access to the materializer in the test. And if we have such access, exposing something through the backend isn't necessary. But probably it makes sense to first resolve the questions about backend-materializer interaction (like [this](https://github.com/apache/flink/pull/16606#discussion_r682621905) one). WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r689755975 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ## @@ -316,6 +319,11 @@ public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() { return operatorEventGateway; } +@Override +public ThroughputCalculator getThroughputMeter() { +return throughputCalculator; +} + Review comment: I see, I think it's fine to use the same commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r684092564 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; +// -- statebackend related configurations -- /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; +/** Interval in milliseconds to perform periodic materialization. */ +private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + +/** Interval in milliseconds for initial delay of periodic materialization. */ +private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); + +/** Max allowed number of failures */ +private int materializationMaxAllowedFailures = + StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue(); + +/** Flag to enable periodic materialization */ +private boolean isPeriodicMaterializationEnabled = false; Review comment: Yes, I got what you meant regarding "changelog OR changelog + materialization" and that's why I proposed an alternative (big intervals). It doesn't introduce a new config option (and new code) and that's why I think it's preferable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r683485382 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java ## @@ -610,6 +612,10 @@ public void testSharedIncrementalStateDeRegistration() throws Exception { } } +@Override +@Test +public void testMaterializedRestore() {} + Review comment: Yes, but with this override even the changelog backend isn't tested: its tests inherit this empty method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r683484016 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; +// -- statebackend related configurations -- /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; +/** Interval in milliseconds to perform periodic materialization. */ +private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + +/** Interval in milliseconds for initial delay of periodic materialization. */ +private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); + +/** Max allowed number of failures */ +private int materializationMaxAllowedFailures = + StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue(); + +/** Flag to enable periodic materialization */ +private boolean isPeriodicMaterializationEnabled = false; Review comment: I'm not sure whether we should randomize it; but it seems we can achieve it with a very big materialization interval, can't we? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r683481506 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; +// -- statebackend related configurations -- /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; +/** Interval in milliseconds to perform periodic materialization. */ +private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + +/** Interval in milliseconds for initial delay of periodic materialization. */ +private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); Review comment: I think the interval should be enough (one less option to learn for users). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r683480068 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r682658355 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { Review comment: 1. 
If the existing classes are used (as commented [here](https://github.com/apache/flink/pull/16606#discussion_r682678359)); then this class doesn't need its own package; and then it doesn't have to be public. Otherwise, annotate with `@Internal`? 2. How about renaming to something like `ChangelogMaterializer`? It doesn't have to be periodic; the essential part, I think, is the changelog (I didn't pay much attention to names in the prototype to be honest :slightly_smiling_face: ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r682673421 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -127,13 +128,29 @@ private boolean forceAvro = false; private long autoWatermarkInterval = 200; +// -- statebackend related configurations -- /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); private boolean isLatencyTrackingConfigured = false; +/** Interval in milliseconds to perform periodic materialization. */ +private long periodicMaterializeInterval = + StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue(); + +/** Interval in milliseconds for initial delay of periodic materialization. */ +private long periodicMaterializeInitDelay = + StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue(); + +/** Max allowed number of failures */ +private int materializationMaxAllowedFailures = + StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue(); + +/** Flag to enable periodic materialization */ +private boolean isPeriodicMaterializationEnabled = false; Review comment: What are the use cases when this flag is false? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r682621905 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. 
*/ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest of information in + * CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441. + */ +private static final CheckpointOptions CHECKPOINT_OPTIONS = +new CheckpointOptions( +CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + +/** task mailbox executor, execute from Task Thread. */ +private final MailboxExecutor mailboxExecutor; + +/** Async thread pool, to complete async phase of materialization. */ +private final ExecutorService asyncOperationsThreadPool; + +/** scheduled executor, periodically trigger materialization. */ +private final ScheduledExecutorService periodicExecutor; + +private final CheckpointStreamFactory streamFactory; + +private final Snapshotable keyedStateBackend; + +private final StateChangelogWriter stateChangelogWriter; + +private final AsyncExceptionHandler asyncExceptionHandler; + +private final int allowedNumberOfFailures; + +/** Materialization failure retries. */ +private final AtomicInteger retries; + +/** Making sure only one materialization on going at a time. */ +private final AtomicBoolean materializationOnGoing; + +private long materializedId; + +private MaterializedState materializedState; + +public PeriodicMaterializer( +MailboxExecutor mailboxExecutor, +ExecutorService asyncOperationsThreadPool, +Snapshotable keyedStateBackend, +CheckpointStorageWorkerView checkpointStorageWorkerView, +StateChangelogWriter stateChangelogWriter, +AsyncExceptionHandler asyncExceptionHandler, +MaterializedState materializedState, +long periodicMaterializeInitDelay, +long periodicMaterialize
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r682624049 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/MaterializedState.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; + +import java.util.List; + +/** Materialization State, only accessed by task thread. */ +public class MaterializedState { +/** Set initially on restore and later upon materialization. */ +private final List materializedSnapshot; + +/** Updated initially on restore and later cleared upon materialization. */ +private final List restoredNonMaterialized; Review comment: I think NON-materialized state should not be placed inside the MaterializedState. I guess the reason for this is the need to update the encapsulated values atomically. 
I proposed one way to solve this [above](https://github.com/apache/flink/pull/16606#discussion_r682621905). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints
rkhachatryan commented on a change in pull request #16606: URL: https://github.com/apache/flink/pull/16606#discussion_r682624049 ## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/MaterializedState.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; + +import java.util.List; + +/** Materialization State, only accessed by task thread. */ +public class MaterializedState { +/** Set initially on restore and later upon materialization. */ +private final List materializedSnapshot; + +/** Updated initially on restore and later cleared upon materialization. */ +private final List restoredNonMaterialized; Review comment: I think NON-materialized state should not be placed inside the MaterializedState. I guess the reason for this is the need to update the encapsulated values atomically. I proposed one way to solve this here. 
## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.materializer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystemSafetyNet; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.mailbox.MailboxExecutor; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.Snapshotable; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; +import 
org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.flink.util.Preconditions.checkState; + +/** Periodically make materialization for the delegated state backend. */ +public class PeriodicMaterializer { +private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializer.class); + +/** + * ChangelogStateB