[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-10-25 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r735428414



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 return is;
 }
 
-private void completeRestore(Collection 
stateHandles) {
-if (!stateHandles.isEmpty()) {
-synchronized (materialized) { // ensure visibility
-for (ChangelogStateBackendHandle h : stateHandles) {
-if (h != null) {
-materialized.addAll(h.getMaterializedStateHandles());
-
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
-}
-}
+public void registerCloseable(@Nullable Closeable closeable) {
+closer.register(closeable);
+}
+
+private ChangelogSnapshotState completeRestore(
+Collection stateHandles) {
+
+List materialized = new ArrayList<>();
+List restoredNonMaterialized = new ArrayList<>();
+
+for (ChangelogStateBackendHandle h : stateHandles) {
+if (h != null) {
+materialized.addAll(h.getMaterializedStateHandles());
+
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
 }
 }
+
 changelogStates.clear();
+return new ChangelogSnapshotState(
+materialized,
+restoredNonMaterialized,
+stateChangelogWriter.initialSequenceNumber());
+}
+
+/**
+ * Initialize state materialization so that materialized data can be 
persisted durably and
+ * included into the checkpoint.
+ *
+ * This method is not thread safe. It should be called either under a 
lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state 
backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+public Optional initMaterialization() throws 
Exception {
+SequenceNumber upTo = getLastAppendedTo();
+SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
+
+LOG.info(
+"Initialize Materialization. Current changelog writers last 
append to sequence number {}",
+upTo);
+
+if (upTo.compareTo(lastMaterializedTo) > 0) {
+
+LOG.info("Starting materialization from {} : {}", 
lastMaterializedTo, upTo);
+
+return Optional.of(
+new MaterializationRunnable(
+keyedStateBackend.snapshot(
+// This ID is not needed for 
materialization;
+// But since we are re-using the 
streamFactory
+// that is designed for state backend 
snapshot,
+// which requires unique checkpoint ID.
+// A faked materialized Id is provided 
here.
+// TODO: implement its own streamFactory.
+materializedId++,
+System.currentTimeMillis(),

Review comment:
   Yes, you're right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-10-21 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r733855539



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -531,18 +541,113 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 return is;
 }
 
-private void completeRestore(Collection 
stateHandles) {
-if (!stateHandles.isEmpty()) {
-synchronized (materialized) { // ensure visibility
-for (ChangelogStateBackendHandle h : stateHandles) {
-if (h != null) {
-materialized.addAll(h.getMaterializedStateHandles());
-
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
-}
-}
+public void registerCloseable(@Nullable Closeable closeable) {
+closer.register(closeable);
+}
+
+private ChangelogSnapshotState completeRestore(
+Collection stateHandles) {
+
+List materialized = new ArrayList<>();
+List restoredNonMaterialized = new ArrayList<>();
+
+for (ChangelogStateBackendHandle h : stateHandles) {
+if (h != null) {
+materialized.addAll(h.getMaterializedStateHandles());
+
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
 }
 }
+
 changelogStates.clear();
+return new ChangelogSnapshotState(
+materialized,
+restoredNonMaterialized,
+stateChangelogWriter.initialSequenceNumber());
+}
+
+/**
+ * Initialize state materialization so that materialized data can be 
persisted durably and
+ * included into the checkpoint.
+ *
+ * This method is not thread safe. It should be called either under a 
lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state 
backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+public Optional initMaterialization() throws 
Exception {
+SequenceNumber upTo = getLastAppendedTo();
+SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
+
+LOG.info(
+"Initialize Materialization. Current changelog writers last 
append to sequence number {}",
+upTo);
+
+if (upTo.compareTo(lastMaterializedTo) > 0) {
+
+LOG.info("Starting materialization from {} : {}", 
lastMaterializedTo, upTo);
+
+return Optional.of(
+new MaterializationRunnable(
+keyedStateBackend.snapshot(
+// This ID is not needed for 
materialization;
+// But since we are re-using the 
streamFactory
+// that is designed for state backend 
snapshot,
+// which requires unique checkpoint ID.
+// A faked materialized Id is provided 
here.
+// TODO: implement its own streamFactory.
+materializedId++,
+System.currentTimeMillis(),

Review comment:
   WDYT about using `materializedId++` here to avoid an unnecessary system 
call?

##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##
@@ -121,7 +124,8 @@
 ttlTimeProvider,
 metricGroup,
 baseHandles,
-cancelStreamRegistry));
+cancelStreamRegistry),
+cancelStreamRegistry);

Review comment:
   This argument is unused (ditto 2nd constructor).

##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See t

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-10-19 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r731608691



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+private boolean started = false;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+ChangelogKeyedStateBackend keyedStateBackend,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures) {
+this.mailboxExecutor = checkNotNull(mailboxExecutor);
+this.asyncOperationsThreadPool = 
checkNotNull(asyncOperationsThreadPool);
+this.subtaskName = checkNotNull(subtaskName);
+this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {
+if (!started) {
+
+started = true;
+
+LOG.info(
+"Task {} starts periodic materialization, scheduling the 
next one in {} seconds",
+subtaskName,
+periodicMaterializeDelay / 1000);
+
+scheduleNextMaterialization();
+}
+}
+
+private void triggerMaterialization() {
+mailboxExecutor.execute(
+() -> {
+Optional 
materializationRunnableOptional =
+ 

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-10-18 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r730782566



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+private boolean started = false;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+ChangelogKeyedStateBackend keyedStateBackend,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures) {
+this.mailboxExecutor = checkNotNull(mailboxExecutor);
+this.asyncOperationsThreadPool = 
checkNotNull(asyncOperationsThreadPool);
+this.subtaskName = checkNotNull(subtaskName);
+this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {
+if (!started) {
+
+started = true;
+
+LOG.info(
+"Task {} starts periodic materialization, scheduling the 
next one in {} seconds",
+subtaskName,
+periodicMaterializeDelay / 1000);
+
+scheduleNextMaterialization();
+}
+}
+
+private void triggerMaterialization() {
+mailboxExecutor.execute(
+() -> {
+Optional 
materializationRunnableOptional =
+ 

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-28 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717260198



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717269001



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/resources/log4j2-test.properties
##
@@ -18,11 +18,11 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
   > I will change the logging level back.
   
   Thanks!
   > BTW, what's the problem for layout.pattern? Do you mean we should remove 
it?
   
   Actually, the change is correct; a non-standard pattern was used before, 
sorry. Let's keep this change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717267357



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+private boolean started = false;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+ChangelogKeyedStateBackend keyedStateBackend,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures) {
+this.mailboxExecutor = checkNotNull(mailboxExecutor);
+this.asyncOperationsThreadPool = 
checkNotNull(asyncOperationsThreadPool);
+this.subtaskName = checkNotNull(subtaskName);
+this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {
+if (!started) {
+
+started = true;
+
+LOG.info(
+"Task {} starts periodic materialization, scheduling the 
next one in {} seconds",
+subtaskName,
+periodicMaterializeDelay / 1000);
+
+scheduleNextMaterialization();
+}
+}
+
+private void triggerMaterialization() {
+mailboxExecutor.execute(
+() -> {
+Optional 
materializationRunnableOptional =
+ 

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717262224



##
File path: pom.xml
##
@@ -1518,7 +1518,7 @@ under the License.
random: enable it 
randomly, unless explicitly set
unset: don't alter the 
configuration
-->
-   
random
+   
on

Review comment:
   Could you then move it to a separate commit please?
   (so that it's not accidentally merged)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717262224



##
File path: pom.xml
##
@@ -1518,7 +1518,7 @@ under the License.
random: enable it 
randomly, unless explicitly set
unset: don't alter the 
configuration
-->
-   
random
+   
on

Review comment:
   Could you then move it to a separate commit please?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717261649



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##
@@ -127,13 +128,29 @@
 private boolean forceAvro = false;
 private long autoWatermarkInterval = 200;
 
+// -- statebackend related configurations 
--
 /**
  * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
  */
 private long latencyTrackingInterval = 
MetricOptions.LATENCY_INTERVAL.defaultValue();
 
 private boolean isLatencyTrackingConfigured = false;
 
+/** Interval in milliseconds to perform periodic materialization. */
+private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+/** Interval in milliseconds for initial delay of periodic 
materialization. */
+private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();
+
+/** Max allowed number of failures */
+private int materializationMaxAllowedFailures =
+
StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue();
+
+/** Flag to enable periodic materialization */
+private boolean isPeriodicMaterializationEnabled = false;

Review comment:
   Resolved by removing the flag.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r717260198



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716882042



##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ChangelogOptions.java
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+/** A collection of all configuration options that relate to changelog. */
+public class ChangelogOptions {
+
+@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+public static final ConfigOption 
PERIODIC_MATERIALIZATION_INTERVAL =
+
ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
+.durationType()
+.defaultValue(Duration.ofMinutes(10))
+.withDescription(
+"Defines the interval in milliseconds to perform "
++ "periodic materialization for state 
backend.");
+
+@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+public static final ConfigOption 
MATERIALIZATION_MAX_FAILURES_ALLOWED =
+ConfigOptions.key("state.backend.changelog.max.failures.allowed")

Review comment:
   How about `state.backend.changelog.max-failures`?
   I think dots are already used in these keys to separate sections.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716882744



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##
@@ -258,4 +259,10 @@ default void setAsyncOperationsThreadPool(ExecutorService 
executorService) {}
 default ExecutorService getAsyncOperationsThreadPool() {
 throw new UnsupportedOperationException();
 }
+
+default void setCheckpointStorageAccess(CheckpointStorageAccess 
checkpointStorageAccess) {}
+
+default CheckpointStorageAccess getCheckpointStorageAccess() {
+throw new UnsupportedOperationException();
+}

Review comment:
   I think this change (and the related ones) should go into a separate 
commit (not `move AsyncExceptionHandler ...`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716882744



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##
@@ -258,4 +259,10 @@ default void setAsyncOperationsThreadPool(ExecutorService 
executorService) {}
 default ExecutorService getAsyncOperationsThreadPool() {
 throw new UnsupportedOperationException();
 }
+
+default void setCheckpointStorageAccess(CheckpointStorageAccess 
checkpointStorageAccess) {}
+
+default CheckpointStorageAccess getCheckpointStorageAccess() {
+throw new UnsupportedOperationException();
+}

Review comment:
   I think this change (and the related ones) belongs to the wrong commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716883256



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/resources/log4j2-test.properties
##
@@ -18,11 +18,11 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
   The common approach is to disable logging by default and change the level on 
demand. Can't we use that approach here?
   
   Ditto for `layout.pattern`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716867123



##
File path: pom.xml
##
@@ -1518,7 +1518,7 @@ under the License.
random: enable it 
randomly, unless explicitly set
unset: don't alter the 
configuration
-->
-   
random
+   
on

Review comment:
   Is this change temporary, i.e. should it be dropped before merging?

##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+private boolean started = false;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+ChangelogKeyedStateBackend keyedStateBackend,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures) {
+this.mailboxExecutor = checkNotNull(mailboxExecutor);
+this.asyncOperationsThreadPool = 
checkNotNull(asyncOperationsThreadPool);
+this.subtaskName = checkNotNull(subtaskName);
+this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
+this.keyedStateBackend = checkNotNull(keyedStateBackend);
+
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716642699



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures,
+ChangelogKeyedStateBackend keyedStateBackend) {
+this.mailboxExecutor = mailboxExecutor;
+this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+
+this.subtaskName = subtaskName;
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.asyncExceptionHandler = asyncExceptionHandler;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+this.keyedStateBackend = keyedStateBackend;
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {

Review comment:
   It isn't called twice currently, and I'm proposing to enforce that, so 
that the manager can't be reused, for example.
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716642699



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures,
+ChangelogKeyedStateBackend keyedStateBackend) {
+this.mailboxExecutor = mailboxExecutor;
+this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+
+this.subtaskName = subtaskName;
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.asyncExceptionHandler = asyncExceptionHandler;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+this.keyedStateBackend = keyedStateBackend;
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {

Review comment:
   It isn't called twice currently, and I'm proposing to enforce that, so 
that the manager can't be reused, for example.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-27 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716640216



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -531,18 +539,76 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 return is;
 }
 
-private void completeRestore(Collection 
stateHandles) {
-if (!stateHandles.isEmpty()) {
-synchronized (materialized) { // ensure visibility
-for (ChangelogStateBackendHandle h : stateHandles) {
-if (h != null) {
-materialized.addAll(h.getMaterializedStateHandles());
-
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
-}
-}
+private ChangelogSnapshotState completeRestore(
+Collection stateHandles) {
+
+List materialized = new ArrayList<>();
+List restoredNonMaterialized = new ArrayList<>();
+
+for (ChangelogStateBackendHandle h : stateHandles) {
+if (h != null) {
+materialized.addAll(h.getMaterializedStateHandles());
+
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
 }
 }
+
 changelogStates.clear();
+return new ChangelogSnapshotState(
+materialized,
+restoredNonMaterialized,
+stateChangelogWriter.initialSequenceNumber());
+}
+
+/**
+ * Initialize state materialization so that materialized data can be 
persisted durably and
+ * included into the checkpoint.
+ *
+ * This method is not thread safe. It should be called either under a 
lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state 
backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+public Optional initMaterialization() throws 
Exception {
+SequenceNumber upTo = 
stateChangelogWriter.lastAppendedSequenceNumber();
+
+if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0) {
+return Optional.of(
+new MaterializationRunnable(
+keyedStateBackend.snapshot(
+// This ID is not needed for 
materialization;
+// But since we are re-using the 
streamFactory
+// that is designed for state backend 
snapshot,
+// which requires unique checkpoint ID.
+// A faked materialized Id is provided 
here.
+// TODO: implement its own streamFactory.
+materializedId++,
+System.currentTimeMillis(),
+streamFactory,
+CHECKPOINT_OPTIONS),
+// TODO: add metadata to log FLINK-23170
+upTo));
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * This method is not thread safe. It should be called either under a lock 
or through task
+ * mailbox executor.
+ */
+public void updateChangelogSnapshotState(
+SnapshotResult materializedSnapshot, 
SequenceNumber upTo) {
+changelogSnapshotState =
+new ChangelogSnapshotState(
+getMaterializedResult(materializedSnapshot), new 
ArrayList<>(), upTo);

Review comment:
   I mean a method like this in the `ChangelogSnapshotState` class:
   ```
   public static ChangelogSnapshotState materialized(
   SnapshotResult snapshot, SequenceNumber 
upTo) {
   return new ChangelogSnapshotState(
   getMaterializedResult(snapshot), 
Collections.emptyList(), upTo);
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-26 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716222670



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures,
+ChangelogKeyedStateBackend keyedStateBackend) {
+this.mailboxExecutor = mailboxExecutor;
+this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+
+this.subtaskName = subtaskName;
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.asyncExceptionHandler = asyncExceptionHandler;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+this.keyedStateBackend = keyedStateBackend;
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {
+LOG.info(
+"Task {} starts periodic materialization, scheduling the next 
one in {} seconds",
+subtaskName,
+periodicMaterializeDelay / 1000);
+
+scheduleNextMaterialization();
+}
+
+private void triggerMaterialization() {
+mailboxExecutor.execute(
+() -> {
+Optional 
materializationRunnableOptional =
+keyedStateBackend.initMaterialization();
+
+if (materializationRunnableOptional.isPresent()) {
+MaterializationRunnable runnable = 
materializationRunnableOptional.get();
+asyn

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-26 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716222836



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Stateless Materialization Manager. */
+public class PeriodicMaterializationManager implements Closeable {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializationManager.class);
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final String subtaskName;
+
+private final long periodicMaterializeDelay;
+
+/** Allowed number of consecutive materialization failures. */
+private final int allowedNumberOfFailures;
+
+/** Number of consecutive materialization failures. */
+private final AtomicInteger numberOfConsecutiveFailures;
+
+private final ChangelogKeyedStateBackend keyedStateBackend;
+
+PeriodicMaterializationManager(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+String subtaskName,
+AsyncExceptionHandler asyncExceptionHandler,
+long periodicMaterializeDelay,
+int allowedNumberOfFailures,
+ChangelogKeyedStateBackend keyedStateBackend) {
+this.mailboxExecutor = mailboxExecutor;
+this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+
+this.subtaskName = subtaskName;
+this.periodicMaterializeDelay = periodicMaterializeDelay;
+this.asyncExceptionHandler = asyncExceptionHandler;
+this.allowedNumberOfFailures = allowedNumberOfFailures;
+this.numberOfConsecutiveFailures = new AtomicInteger(0);
+this.keyedStateBackend = keyedStateBackend;
+
+this.periodicExecutor =
+Executors.newSingleThreadScheduledExecutor(
+new ExecutorThreadFactory(
+"periodic-materialization-scheduler-" + 
subtaskName));
+}
+
+public void start() {
+LOG.info(
+"Task {} starts periodic materialization, scheduling the next 
one in {} seconds",
+subtaskName,
+periodicMaterializeDelay / 1000);
+
+scheduleNextMaterialization();
+}
+
+private void triggerMaterialization() {
+mailboxExecutor.execute(
+() -> {
+Optional 
materializationRunnableOptional =
+keyedStateBackend.initMaterialization();
+
+if (materializationRunnableOptional.isPresent()) {
+MaterializationRunnable runnable = 
materializationRunnableOptional.get();
+asyn

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-26 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r71628



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -531,18 +539,76 @@ public void notifyCheckpointAborted(long checkpointId) 
throws Exception {
 return is;
 }
 
-private void completeRestore(Collection 
stateHandles) {
-if (!stateHandles.isEmpty()) {
-synchronized (materialized) { // ensure visibility
-for (ChangelogStateBackendHandle h : stateHandles) {
-if (h != null) {
-materialized.addAll(h.getMaterializedStateHandles());
-
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
-}
-}
+private ChangelogSnapshotState completeRestore(
+Collection stateHandles) {
+
+List materialized = new ArrayList<>();
+List restoredNonMaterialized = new ArrayList<>();
+
+for (ChangelogStateBackendHandle h : stateHandles) {
+if (h != null) {
+materialized.addAll(h.getMaterializedStateHandles());
+
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
 }
 }
+
 changelogStates.clear();
+return new ChangelogSnapshotState(
+materialized,
+restoredNonMaterialized,
+stateChangelogWriter.initialSequenceNumber());
+}
+
+/**
+ * Initialize state materialization so that materialized data can be 
persisted durably and
+ * included into the checkpoint.
+ *
+ * This method is not thread safe. It should be called either under a 
lock or through task
+ * mailbox executor.
+ *
+ * @return a tuple of - future snapshot result from the underlying state 
backend - a {@link
+ * SequenceNumber} identifying the latest change in the changelog
+ */
+public Optional initMaterialization() throws 
Exception {
+SequenceNumber upTo = 
stateChangelogWriter.lastAppendedSequenceNumber();
+
+if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0) {
+return Optional.of(
+new MaterializationRunnable(
+keyedStateBackend.snapshot(
+// This ID is not needed for 
materialization;
+// But since we are re-using the 
streamFactory
+// that is designed for state backend 
snapshot,
+// which requires unique checkpoint ID.
+// A faked materialized Id is provided 
here.
+// TODO: implement its own streamFactory.
+materializedId++,
+System.currentTimeMillis(),
+streamFactory,
+CHECKPOINT_OPTIONS),
+// TODO: add metadata to log FLINK-23170
+upTo));
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * This method is not thread safe. It should be called either under a lock 
or through task
+ * mailbox executor.
+ */
+public void updateChangelogSnapshotState(
+SnapshotResult materializedSnapshot, 
SequenceNumber upTo) {
+changelogSnapshotState =
+new ChangelogSnapshotState(
+getMaterializedResult(materializedSnapshot), new 
ArrayList<>(), upTo);

Review comment:
   nit: `Collections.emptyList`?
   nit: factory method with two args?
   

##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -625,4 +691,52 @@ public T get(long timeout, TimeUnit unit)
 }
 };
 }
+
+/**
+ * Snapshot State for ChangelogKeyedStatebackend.
+ *
+ * It includes three parts: - materialized snapshot from the underlying 
delegated state
+ * backend - non-materialized part in the current changelog - 
non-materialized changelog, from
+ * previous logs (before failover or rescaling)
+ */
+private static class ChangelogSnapshotState {
+/**
+ * Materialized snapshot from the underlying delegated state backend. 
Set initially on
+ * restore and later upon materialization.
+ */
+private final List materializedSnapshot;
+
+/**
+ * The {@link SequenceNumber} up to which the state is materialized, 
exclusive. This
+ * indicates the non-materialized part of the current changelog.
+ */
+private final SequenceNumber mater

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-07 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r703330721



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -329,37 +332,47 @@ public boolean 
deregisterKeySelectionListener(KeySelectionListener listener)
 // materialization may truncate only a part of the previous result and 
the backend would
 // have to split it somehow for the former option, so the latter is 
used.
 lastCheckpointId = checkpointId;
-lastUploadedFrom = materializedTo;
+lastUploadedFrom = 
periodicMaterializer.getMaterializedState().lastMaterializedTo();
 lastUploadedTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
 
 LOG.debug(
 "snapshot for checkpoint {}, change range: {}..{}",
 checkpointId,
 lastUploadedFrom,
 lastUploadedTo);
+
+MaterializedState materializedStateCopy = 
periodicMaterializer.getMaterializedState();
+
 return toRunnableFuture(
 stateChangelogWriter
 .persist(lastUploadedFrom)
-.thenApply(this::buildSnapshotResult));
+.thenApply(delta -> buildSnapshotResult(delta, 
materializedStateCopy)));
 }
 
-private SnapshotResult 
buildSnapshotResult(ChangelogStateHandle delta) {
-// Can be called by either task thread during the sync checkpoint 
phase (if persist future
-// was already completed); or by the writer thread otherwise. So need 
to synchronize.
-// todo: revisit after FLINK-21357 - use mailbox action?
-synchronized (materialized) {
-// collections don't change once started and handles are immutable
-List prevDeltaCopy = new 
ArrayList<>(restoredNonMaterialized);
-if (delta != null && delta.getStateSize() > 0) {
-prevDeltaCopy.add(delta);
-}
-if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
-return SnapshotResult.empty();
-} else {
-return SnapshotResult.of(
-new ChangelogStateBackendHandleImpl(
-materialized, prevDeltaCopy, 
getKeyGroupRange()));
-}
+@Override
+@VisibleForTesting
+public void triggerMaterialization() {
+periodicMaterializer.triggerMaterialization();
+}

Review comment:
   As we discussed offline, this only works because a direct executor is 
used when creating the materializer (by "works" I mean the call waits for the 
materialization to complete).
   Please add a comment at the very least.
   
   However, I still think the test or production code structure is wrong. With 
a proper separation between materializer and backend, we should be able to 
listen for materialization completion and take a snapshot only after that 
(instead of relying on the internal materializer behavior).
   
   edit:
   Can we just poll materializer in a loop and wait for materialization to 
happen AND complete?
   (So that we don't explicitly call `triggerMaterialization` and don't have 
this method)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-09-07 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r703330721



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -329,37 +332,47 @@ public boolean 
deregisterKeySelectionListener(KeySelectionListener listener)
 // materialization may truncate only a part of the previous result and 
the backend would
 // have to split it somehow for the former option, so the latter is 
used.
 lastCheckpointId = checkpointId;
-lastUploadedFrom = materializedTo;
+lastUploadedFrom = 
periodicMaterializer.getMaterializedState().lastMaterializedTo();
 lastUploadedTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
 
 LOG.debug(
 "snapshot for checkpoint {}, change range: {}..{}",
 checkpointId,
 lastUploadedFrom,
 lastUploadedTo);
+
+MaterializedState materializedStateCopy = 
periodicMaterializer.getMaterializedState();
+
 return toRunnableFuture(
 stateChangelogWriter
 .persist(lastUploadedFrom)
-.thenApply(this::buildSnapshotResult));
+.thenApply(delta -> buildSnapshotResult(delta, 
materializedStateCopy)));
 }
 
-private SnapshotResult 
buildSnapshotResult(ChangelogStateHandle delta) {
-// Can be called by either task thread during the sync checkpoint 
phase (if persist future
-// was already completed); or by the writer thread otherwise. So need 
to synchronize.
-// todo: revisit after FLINK-21357 - use mailbox action?
-synchronized (materialized) {
-// collections don't change once started and handles are immutable
-List prevDeltaCopy = new 
ArrayList<>(restoredNonMaterialized);
-if (delta != null && delta.getStateSize() > 0) {
-prevDeltaCopy.add(delta);
-}
-if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
-return SnapshotResult.empty();
-} else {
-return SnapshotResult.of(
-new ChangelogStateBackendHandleImpl(
-materialized, prevDeltaCopy, 
getKeyGroupRange()));
-}
+@Override
+@VisibleForTesting
+public void triggerMaterialization() {
+periodicMaterializer.triggerMaterialization();
+}

Review comment:
   As we discussed offline, this only works because a direct executor is 
used when creating the materializer (by "works" I mean the call waits for the 
materialization to complete).
   Please add a comment at the very least.
   
   However, I still think the test or production code structure is wrong. With 
a proper separation between materializer and backend, we should be able to 
listen for materialization completion and take a snapshot only after that 
(instead of relying on the internal materializer behavior).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-18 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r691590608



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-18 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r691587190



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-18 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r691577741



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-18 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r691573533



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-16 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r689777951



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##
@@ -329,37 +332,47 @@ public boolean 
deregisterKeySelectionListener(KeySelectionListener listener)
 // materialization may truncate only a part of the previous result and 
the backend would
 // have to split it somehow for the former option, so the latter is 
used.
 lastCheckpointId = checkpointId;
-lastUploadedFrom = materializedTo;
+lastUploadedFrom = 
periodicMaterializer.getMaterializedState().lastMaterializedTo();
 lastUploadedTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
 
 LOG.debug(
 "snapshot for checkpoint {}, change range: {}..{}",
 checkpointId,
 lastUploadedFrom,
 lastUploadedTo);
+
+MaterializedState materializedStateCopy = 
periodicMaterializer.getMaterializedState();
+
 return toRunnableFuture(
 stateChangelogWriter
 .persist(lastUploadedFrom)
-.thenApply(this::buildSnapshotResult));
+.thenApply(delta -> buildSnapshotResult(delta, 
materializedStateCopy)));
 }
 
-private SnapshotResult 
buildSnapshotResult(ChangelogStateHandle delta) {
-// Can be called by either task thread during the sync checkpoint 
phase (if persist future
-// was already completed); or by the writer thread otherwise. So need 
to synchronize.
-// todo: revisit after FLINK-21357 - use mailbox action?
-synchronized (materialized) {
-// collections don't change once started and handles are immutable
-List prevDeltaCopy = new 
ArrayList<>(restoredNonMaterialized);
-if (delta != null && delta.getStateSize() > 0) {
-prevDeltaCopy.add(delta);
-}
-if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
-return SnapshotResult.empty();
-} else {
-return SnapshotResult.of(
-new ChangelogStateBackendHandleImpl(
-materialized, prevDeltaCopy, 
getKeyGroupRange()));
-}
+@Override
+@VisibleForTesting
+public void triggerMaterialization() {
+periodicMaterializer.triggerMaterialization();
+}

Review comment:
   I tried to disable materialization by commenting it out in 
`ChangelogKeyedStateBackend.triggerMaterialization`, and the test still passed, 
so I think just triggering is not enough.
   
   This suggests that if we want to test it at this level, more fine-grained 
control over materialization is needed, which can be gained through direct access 
to the materializer in the test.
   
   And if we have such access, exposing something through the backend isn't 
necessary.
   But probably it makes sense to first resolve the questions about 
backend-materializer interaction (like 
[this](https://github.com/apache/flink/pull/16606#discussion_r682621905) one).
   
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-16 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r689755975



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
##
@@ -316,6 +319,11 @@ public TaskOperatorEventGateway 
getOperatorCoordinatorEventGateway() {
 return operatorEventGateway;
 }
 
+@Override
+public ThroughputCalculator getThroughputMeter() {
+return throughputCalculator;
+}
+

Review comment:
   I see, I think it's fine to use the same commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-06 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r684092564



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##
@@ -127,13 +128,29 @@
 private boolean forceAvro = false;
 private long autoWatermarkInterval = 200;
 
+// -- statebackend related configurations 
--
 /**
  * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
  */
 private long latencyTrackingInterval = 
MetricOptions.LATENCY_INTERVAL.defaultValue();
 
 private boolean isLatencyTrackingConfigured = false;
 
+/** Interval in milliseconds to perform periodic materialization. */
+private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+/** Interval in milliseconds for initial delay of periodic 
materialization. */
+private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();
+
+/** Max allowed number of failures */
+private int materializationMaxAllowedFailures =
+
StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue();
+
+/** Flag to enable periodic materialization */
+private boolean isPeriodicMaterializationEnabled = false;

Review comment:
   Yes, I got what you meant regarding "changelog OR changelog + 
materialization", and that's why I proposed an alternative (big intervals).
   It doesn't introduce a new config option (or new code), which is why I think 
it's preferable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-05 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r683485382



##
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
##
@@ -610,6 +612,10 @@ public void testSharedIncrementalStateDeRegistration() 
throws Exception {
 }
 }
 
+@Override
+@Test
+public void testMaterializedRestore() {}
+

Review comment:
   Yes, but with this override even the changelog backend isn't tested: its 
tests inherit this empty method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-05 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r683484016



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##
@@ -127,13 +128,29 @@
 private boolean forceAvro = false;
 private long autoWatermarkInterval = 200;
 
+// -- statebackend related configurations 
--
 /**
  * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
  */
 private long latencyTrackingInterval = 
MetricOptions.LATENCY_INTERVAL.defaultValue();
 
 private boolean isLatencyTrackingConfigured = false;
 
+/** Interval in milliseconds to perform periodic materialization. */
+private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+/** Interval in milliseconds for initial delay of periodic 
materialization. */
+private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();
+
+/** Max allowed number of failures */
+private int materializationMaxAllowedFailures =
+
StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue();
+
+/** Flag to enable periodic materialization */
+private boolean isPeriodicMaterializationEnabled = false;

Review comment:
   I'm not sure whether we should randomize it; but it seems we can achieve 
it with a very big materialization interval, can't we?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-05 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r683481506



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##
@@ -127,13 +128,29 @@
 private boolean forceAvro = false;
 private long autoWatermarkInterval = 200;
 
+// -- statebackend related configurations 
--
 /**
  * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
  */
 private long latencyTrackingInterval = 
MetricOptions.LATENCY_INTERVAL.defaultValue();
 
 private boolean isLatencyTrackingConfigured = false;
 
+/** Interval in milliseconds to perform periodic materialization. */
+private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+/** Interval in milliseconds for initial delay of periodic 
materialization. */
+private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();

Review comment:
   I think the interval should be enough (one less option to learn for 
users).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-05 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r683480068



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-04 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r682658355



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {

Review comment:
   1. If the existing classes are used (as commented 
[here](https://github.com/apache/flink/pull/16606#discussion_r682678359)), then 
this class doesn't need its own package, and then it doesn't have to be public.
   Otherwise, should it be annotated with `@Internal`?
   
   2. How about renaming it to something like `ChangelogMaterializer`? It doesn't 
have to be periodic; I think the essential part is the changelog.
   (To be honest, I didn't pay much attention to names in the prototype 
:slightly_smiling_face: )




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-04 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r682673421



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##
@@ -127,13 +128,29 @@
 private boolean forceAvro = false;
 private long autoWatermarkInterval = 200;
 
+// -- statebackend related configurations 
--
 /**
  * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
  */
 private long latencyTrackingInterval = 
MetricOptions.LATENCY_INTERVAL.defaultValue();
 
 private boolean isLatencyTrackingConfigured = false;
 
+/** Interval in milliseconds to perform periodic materialization. */
+private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+/** Interval in milliseconds for initial delay of periodic 
materialization. */
+private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();
+
+/** Max allowed number of failures */
+private int materializationMaxAllowedFailures =
+
StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue();
+
+/** Flag to enable periodic materialization */
+private boolean isPeriodicMaterializationEnabled = false;

Review comment:
   What are the use cases when this flag is false?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-04 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r682621905



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest 
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details 
in FLINK-23441.
+ */
+private static final CheckpointOptions CHECKPOINT_OPTIONS =
+new CheckpointOptions(
+CheckpointType.CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+
+/** task mailbox executor, execute from Task Thread. */
+private final MailboxExecutor mailboxExecutor;
+
+/** Async thread pool, to complete async phase of materialization. */
+private final ExecutorService asyncOperationsThreadPool;
+
+/** scheduled executor, periodically trigger materialization. */
+private final ScheduledExecutorService periodicExecutor;
+
+private final CheckpointStreamFactory streamFactory;
+
+private final Snapshotable keyedStateBackend;
+
+private final StateChangelogWriter 
stateChangelogWriter;
+
+private final AsyncExceptionHandler asyncExceptionHandler;
+
+private final int allowedNumberOfFailures;
+
+/** Materialization failure retries. */
+private final AtomicInteger retries;
+
+/** Making sure only one materialization on going at a time. */
+private final AtomicBoolean materializationOnGoing;
+
+private long materializedId;
+
+private MaterializedState materializedState;
+
+public PeriodicMaterializer(
+MailboxExecutor mailboxExecutor,
+ExecutorService asyncOperationsThreadPool,
+Snapshotable keyedStateBackend,
+CheckpointStorageWorkerView checkpointStorageWorkerView,
+StateChangelogWriter stateChangelogWriter,
+AsyncExceptionHandler asyncExceptionHandler,
+MaterializedState materializedState,
+long periodicMaterializeInitDelay,
+long periodicMaterialize

[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-04 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r682624049



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/MaterializedState.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+
+import java.util.List;
+
+/** Materialization State, only accessed by task thread. */
+public class MaterializedState {
+/** Set initially on restore and later upon materialization. */
+private final List materializedSnapshot;
+
+/** Updated initially on restore and later cleared upon materialization. */
+private final List restoredNonMaterialized;

Review comment:
   I think NON-materialized state should not be placed inside the 
MaterializedState.
   I guess the reason for this is the need to update the encapsulated values 
atomically.
   I proposed one way to solve this 
[above](https://github.com/apache/flink/pull/16606#discussion_r682621905).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #16606: [FLINK-21357][runtime/statebackend]Periodic materialization for generalized incremental checkpoints

2021-08-04 Thread GitBox


rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r682624049



##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/MaterializedState.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+
+import java.util.List;
+
+/** Materialization State, only accessed by task thread. */
+public class MaterializedState {
+/** Set initially on restore and later upon materialization. */
+private final List materializedSnapshot;
+
+/** Updated initially on restore and later cleared upon materialization. */
+private final List restoredNonMaterialized;

Review comment:
   I think NON-materialized state should not be placed inside the 
MaterializedState.
   I guess the reason for this is the need to update the encapsulated values 
atomically.
   I proposed one way to solve this [here](https://github.com/apache/flink/pull/16606#discussion_r682621905).

##
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+/**
+ * ChangelogStateB