[jira] [Created] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null
Bill Lee created FLINK-8544: --- Summary: JSONKeyValueDeserializationSchema throws NPE when message key is null Key: FLINK-8544 URL: https://issues.apache.org/jira/browse/FLINK-8544 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.0 Reporter: Bill Lee JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key without validation. If a message with key == null is read, Flink throws an NPE. {code:java} @Override public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { if (mapper == null) { mapper = new ObjectMapper(); } ObjectNode node = mapper.createObjectNode(); node.set("key", mapper.readValue(messageKey, JsonNode.class)); // messageKey is not validated against null. node.set("value", mapper.readValue(message, JsonNode.class)); {code} The fix is very straightforward. {code:java} if (messageKey == null) { node.set("key", null); } else { node.set("key", mapper.readValue(messageKey, JsonNode.class)); } {code} If it is appreciated, I can send a pull request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
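For illustration, a standalone Jackson-only sketch of the guarded behaviour proposed above (the class name, sample payload and main() wrapper are hypothetical and only mirror the reporter's fix; this is not the actual Flink patch):
{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class NullKeyGuardExample {

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        byte[] messageKey = null; // e.g. a Kafka record that was produced without a key
        byte[] message = "{\"temperature\": 21}".getBytes(StandardCharsets.UTF_8);

        ObjectNode node = mapper.createObjectNode();
        if (messageKey == null) {
            // ObjectNode#set stores a NullNode for a null value, so consumers see
            // {"key": null, ...} instead of an NPE from parsing a null byte array.
            node.set("key", null);
        } else {
            node.set("key", mapper.readValue(messageKey, JsonNode.class));
        }
        node.set("value", mapper.readValue(message, JsonNode.class));

        System.out.println(node); // {"key":null,"value":{"temperature":21}}
    }
}
{code}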
[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chris snow updated FLINK-8543: -- Description: I'm hitting an issue with my BucketingSink from a streaming job. {code:java} return new BucketingSink>(path) .setWriter(writer) .setBucketer(new DateTimeBucketer>(formatString)); {code} I can see that a few files have run into issues with uploading to S3: !Screen Shot 2018-01-30 at 18.34.51.png! The Flink console output is showing an exception being thrown by S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster and added some additional logging to the checkOpen() method to log the 'key' just before the exception is thrown: {code:java} /* * Decompiled with CFR. */ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressListener; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.services.s3.transfer.model.UploadResult; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.ProgressableProgressListener; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AOutputStream extends OutputStream { private final OutputStream backupStream; private final File backupFile; private final AtomicBoolean closed = new AtomicBoolean(false); private final String key; private final Progressable progress; private final S3AFileSystem fs; public static final Logger LOG = S3AFileSystem.LOG; public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, Progressable progress) throws IOException { this.key = key; this.progress = progress; this.fs = fs; this.backupFile = fs.createTmpFileForWrite("output-", -1, conf); LOG.debug("OutputStream for key '{}' writing to tempfile: {}", (Object)key, (Object)this.backupFile); this.backupStream = new BufferedOutputStream(new FileOutputStream(this.backupFile)); } void checkOpen() throws IOException { if (!this.closed.get()) return; // vv-- Additional logging --vvv LOG.error("OutputStream for key '{}' closed.", (Object)this.key); throw new IOException("Output Stream closed"); } @Override public void flush() throws IOException { this.checkOpen(); this.backupStream.flush(); } @Override public void close() throws IOException { if (this.closed.getAndSet(true)) { return; } this.backupStream.close(); LOG.debug("OutputStream for key '{}' closed. 
Now beginning upload", (Object)this.key); try { ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length()); Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile)); ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress); upload.addProgressListener((ProgressListener)listener); upload.waitForUploadResult(); listener.uploadCompleted(); this.fs.finishedWrite(this.key); } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e); } catch (AmazonClientException e) { throw S3AUtils.translateException("saving output", this.key, e); } finally { if (!this.backupFile.delete()) { LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile); } super.close(); } LOG.debug("OutputStream for key '{}' upload complete", (Object)this.key); } @Override public void write(int b) throws IOException { this.checkOpen(); this.backupStream.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { this.checkOpen(); this.backupStream.write(b, off, len); } static { } } {code} You can see from this additional log output that the S3AOutputStream#close() method **appears** to be called before the S3AOutputStream#flush() method: {code:java} 2018-02-01 1
[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chris snow updated FLINK-8543: -- Attachment: Screen Shot 2018-01-30 at 18.34.51.png Description: I'm hitting an issue with my BucketingSink from a streaming job. {code:java} return new BucketingSink>(path) .setWriter(writer) .setBucketer(new DateTimeBucketer>(formatString)); {code} I can see that a few files have run into issues with uploading to S3: !Screen Shot 2018-01-30 at 18.34.51.png! I've grabbed the S3AOutputStream class from my cluster and added some additional logging to the checkOpen() method to log the 'key' just before the exception is thrown: {code:java} /* * Decompiled with CFR. */ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressListener; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.services.s3.transfer.model.UploadResult; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.ProgressableProgressListener; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AOutputStream extends OutputStream { private final OutputStream backupStream; private final File backupFile; private final AtomicBoolean closed = new AtomicBoolean(false); private final String key; private final Progressable progress; private final S3AFileSystem fs; public static final Logger LOG = S3AFileSystem.LOG; public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, Progressable progress) throws IOException { this.key = key; this.progress = progress; this.fs = fs; this.backupFile = fs.createTmpFileForWrite("output-", -1, conf); LOG.debug("OutputStream for key '{}' writing to tempfile: {}", (Object)key, (Object)this.backupFile); this.backupStream = new BufferedOutputStream(new FileOutputStream(this.backupFile)); } void checkOpen() throws IOException { if (!this.closed.get()) return; // vv-- Additional logging --vvv LOG.error("OutputStream for key '{}' closed.", (Object)this.key); throw new IOException("Output Stream closed"); } @Override public void flush() throws IOException { this.checkOpen(); this.backupStream.flush(); } @Override public void close() throws IOException { if (this.closed.getAndSet(true)) { return; } this.backupStream.close(); LOG.debug("OutputStream for key '{}' closed. 
Now beginning upload", (Object)this.key); try { ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length()); Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile)); ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress); upload.addProgressListener((ProgressListener)listener); upload.waitForUploadResult(); listener.uploadCompleted(); this.fs.finishedWrite(this.key); } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e); } catch (AmazonClientException e) { throw S3AUtils.translateException("saving output", this.key, e); } finally { if (!this.backupFile.delete()) { LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile); } super.close(); } LOG.debug("OutputStream for key '{}' upload complete", (Object)this.key); } @Override public void write(int b) throws IOException { this.checkOpen(); this.backupStream.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { this.checkOpen(); this.backupStream.write(b, off, len); } static { } } {code} You can see from this additional log output that the S3AOutputStream#close() method **appears** to be called before the S3AOutputStream#flush() method: {code:java} 2018-02-01 12:42:20,698 DEBUG org.apache.h
[GitHub] flink pull request #:
Github user tzulitai commented on the pull request: https://github.com/apache/flink/commit/a2533f406d46b1c5acb5f70c263f9afad839dffe#commitcomment-27262148 In flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java: In flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java on line 74: Question regarding this change: Why do we already test for `v1_5`? I think we should / can only add that variant once we cut the `release-1.5` branch. Otherwise, strictly speaking, any savepoint taken with the current `master` snapshot is not a proper 1.5 savepoint. ---
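For context, the parameter list under discussion typically has the following shape in Flink's savepoint migration tests (a rough sketch; the class name is illustrative and the exact enum, package and annotation text in CEPMigrationTest may differ):
{code:java}
// Assumed import: the MigrationVersion enum from Flink's test utilities of that era.
import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

@RunWith(Parameterized.class)
public class SomeMigrationTest {

    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        // v1_5 would only be listed once the release-1.5 branch has been cut,
        // so that savepoints taken from master are not mislabelled as 1.5 savepoints.
        return Arrays.asList(
            MigrationVersion.v1_3,
            MigrationVersion.v1_4);
    }

    private final MigrationVersion migrationVersion;

    public SomeMigrationTest(MigrationVersion migrationVersion) {
        this.migrationVersion = migrationVersion;
    }
}
{code}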
[jira] [Created] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
chris snow created FLINK-8543: - Summary: Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen Key: FLINK-8543 URL: https://issues.apache.org/jira/browse/FLINK-8543 Project: Flink Issue Type: Bug Components: Streaming Connectors Affects Versions: 1.4.0 Environment: IBM Analytics Engine - [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction] The cluster is based on Hortonworks Data Platform 2.6.2. The following components are made available. Apache Spark 2.1.1 Hadoop 2.7.3 Apache Livy 0.3.0 Knox 0.12.0 Ambari 2.5.2 Anaconda with Python 2.7.13 and 3.5.2 Jupyter Enterprise Gateway 0.5.0 HBase 1.1.2 * Hive 1.2.1 * Oozie 4.2.0 * Flume 1.5.2 * Tez 0.7.0 * Pig 0.16.0 * Sqoop 1.4.6 * Slider 0.92.0 * Reporter: chris snow I'm hitting an issue with my BucketingSink from a streaming job. {code:java} return new BucketingSink>(path) .setWriter(writer) .setBucketer(new DateTimeBucketer>(formatString)); {code} I can see that a few files have run into issues with uploading to S3: !Screen Shot 2018-01-30 at 18.34.51.png! I've grabbed the S3AOutputStream class from my cluster and added some additional logging to the checkOpen() method to log the 'key' just before the exception is thrown: {code:java} /* * Decompiled with CFR. */ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressListener; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.services.s3.transfer.model.UploadResult; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.ProgressableProgressListener; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AOutputStream extends OutputStream { private final OutputStream backupStream; private final File backupFile; private final AtomicBoolean closed = new AtomicBoolean(false); private final String key; private final Progressable progress; private final S3AFileSystem fs; public static final Logger LOG = S3AFileSystem.LOG; public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, Progressable progress) throws IOException { this.key = key; this.progress = progress; this.fs = fs; this.backupFile = fs.createTmpFileForWrite("output-", -1, conf); LOG.debug("OutputStream for key '{}' writing to tempfile: {}", (Object)key, (Object)this.backupFile); this.backupStream = new BufferedOutputStream(new FileOutputStream(this.backupFile)); } void checkOpen() throws IOException { if (!this.closed.get()) return; // vv-- Additional logging --vvv LOG.error("OutputStream for key '{}' closed.", (Object)this.key); throw new IOException("Output Stream closed"); } @Override public void flush() throws IOException { this.checkOpen(); this.backupStream.flush(); } @Override public void close() throws IOException { if (this.closed.getAndSet(true)) { return; } this.backupStream.close(); LOG.debug("OutputStream for key '{}' closed. 
Now beginning upload", (Object)this.key); try { ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length()); Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile)); ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress); upload.addProgressListener((ProgressListener)listener); upload.waitForUploadResult(); listener.uploadCompleted(); this.fs.finishedWrite(this.key); } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e); } catch (AmazonClientException e) { throw S3AUtils.translateException("saving output", this.key, e); } finally { if (!this.backupFile.delete()) { LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile); }
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165349688 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(StateObjectCollection restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . Returning null."); - } - return DoneFuture.nullValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); - - snapshotOperation.takeSnapshot(); - - return new FutureTask>( - () -> snapshotOperation.materializeSnapshot() - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); + try { + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + - " . Returning null."); - } + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { - return
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165349012 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(StateObjectCollection restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . Returning null."); - } - return DoneFuture.nullValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); - - snapshotOperation.takeSnapshot(); - - return new FutureTask>( - () -> snapshotOperation.materializeSnapshot() - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); + try { + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + - " . Returning null."); - } + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { - return
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348543#comment-16348543 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165349012 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(StateObjectCollection restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . Returning null."); - } - return DoneFuture.nullValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); - - snapshotOperation.takeSnapshot(); - - return new FutureTask>( - () -> snapshotOperation.materializeSnapshot() - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); + try { + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348525#comment-16348525 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165344648 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) { return sum; } - public StateTable newStateTable(RegisteredKeyedBackendStateMetaInfo newMetaInfo) { - return asynchronousSnapshots ? - new CopyOnWriteStateTable<>(this, newMetaInfo) : - new NestedMapsStateTable<>(this, newMetaInfo); - } - @Override public boolean supportsAsynchronousSnapshots() { - return asynchronousSnapshots; + return snapshotStrategy.isAsynchronous(); } @VisibleForTesting public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() { return localRecoveryConfig; } + + /** +* Base class for the snapshots of the heap backend that outlines the algorithm and offers some hooks to realize +* the concrete strategies. +*/ + private abstract class HeapSnapshotStrategy implements SnapshotStrategy> { + + @Override + public RunnableFuture> performSnapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) throws Exception { + + if (!hasRegisteredState()) { + return DoneFuture.nullValue(); + } + + long syncStartTime = System.currentTimeMillis(); + + Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE, + "Too many KV-States: " + stateTables.size() + + ". Currently at most " + Short.MAX_VALUE + " states are supported"); + + List> metaInfoSnapshots = + new ArrayList<>(stateTables.size()); + + final Map kVStateToId = new HashMap<>(stateTables.size()); + + final Map, StateTableSnapshot> cowStateStableSnapshots = + new HashedMap(stateTables.size()); + + for (Map.Entry> kvState : stateTables.entrySet()) { + kVStateToId.put(kvState.getKey(), kVStateToId.size()); + StateTable stateTable = kvState.getValue(); + if (null != stateTable) { + metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot()); + cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot()); + } + } + + final KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy<>( + keySerializer, + metaInfoSnapshots, + !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); + + //--- this becomes the end of sync part + + // implementation of the async IO operation, based on FutureTask + final AbstractAsyncCallableWithResources> ioCallable = + new AbstractAsyncCallableWithResources>() { + + CheckpointStreamFactory.CheckpointStateOutputStream stream = null; + + @Override + protected void acquireResources() throws Exception { + stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + cancelStreamRegistry.registerCloseable(stream); + } + + @Override + protected void releaseResources() throws Exception { + + if (cancelStreamRegistry.unregisterCloseable(stream)) { + IOUtils.closeQuietly(str
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165344648 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) { return sum; } - public StateTable newStateTable(RegisteredKeyedBackendStateMetaInfo newMetaInfo) { - return asynchronousSnapshots ? - new CopyOnWriteStateTable<>(this, newMetaInfo) : - new NestedMapsStateTable<>(this, newMetaInfo); - } - @Override public boolean supportsAsynchronousSnapshots() { - return asynchronousSnapshots; + return snapshotStrategy.isAsynchronous(); } @VisibleForTesting public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() { return localRecoveryConfig; } + + /** +* Base class for the snapshots of the heap backend that outlines the algorithm and offers some hooks to realize +* the concrete strategies. +*/ + private abstract class HeapSnapshotStrategy implements SnapshotStrategy> { + + @Override + public RunnableFuture> performSnapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) throws Exception { + + if (!hasRegisteredState()) { + return DoneFuture.nullValue(); + } + + long syncStartTime = System.currentTimeMillis(); + + Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE, + "Too many KV-States: " + stateTables.size() + + ". Currently at most " + Short.MAX_VALUE + " states are supported"); + + List> metaInfoSnapshots = + new ArrayList<>(stateTables.size()); + + final Map kVStateToId = new HashMap<>(stateTables.size()); + + final Map, StateTableSnapshot> cowStateStableSnapshots = + new HashedMap(stateTables.size()); + + for (Map.Entry> kvState : stateTables.entrySet()) { + kVStateToId.put(kvState.getKey(), kVStateToId.size()); + StateTable stateTable = kvState.getValue(); + if (null != stateTable) { + metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot()); + cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot()); + } + } + + final KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy<>( + keySerializer, + metaInfoSnapshots, + !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); + + //--- this becomes the end of sync part + + // implementation of the async IO operation, based on FutureTask + final AbstractAsyncCallableWithResources> ioCallable = + new AbstractAsyncCallableWithResources>() { + + CheckpointStreamFactory.CheckpointStateOutputStream stream = null; + + @Override + protected void acquireResources() throws Exception { + stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + cancelStreamRegistry.registerCloseable(stream); + } + + @Override + protected void releaseResources() throws Exception { + + if (cancelStreamRegistry.unregisterCloseable(stream)) { + IOUtils.closeQuietly(stream); + stream = null; + } + + for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values())
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165344021 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java --- @@ -19,58 +19,53 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +/** + * Unit tests for the {@link CompletedCheckpoint}. + */ public class CompletedCheckpointTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); - /** -* Tests that persistent checkpoints discard their header file. -*/ @Test - public void testDiscard() throws Exception { - File file = tmpFolder.newFile(); - assertEquals(true, file.exists()); - + public void registerStatesAtRegistry() { --- End diff -- The test whether state handles are correctly registered at the SharedStateRegistry was originally just sneakily added to a pre-existing metadata file cleanup test. That did not seem right ;-) This factors the test out into a separate method. The test method should be called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. Will change that... ---
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348522#comment-16348522 ] ASF GitHub Bot commented on FLINK-5820: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165344021 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java --- @@ -19,58 +19,53 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +/** + * Unit tests for the {@link CompletedCheckpoint}. + */ public class CompletedCheckpointTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); - /** -* Tests that persistent checkpoints discard their header file. -*/ @Test - public void testDiscard() throws Exception { - File file = tmpFolder.newFile(); - assertEquals(true, file.exists()); - + public void registerStatesAtRegistry() { --- End diff -- The test whether state handles are correctly registered at the SharedStateRegistry was originally just sneakily added to a pre-existing metadata file cleanup test. That did not seem right ;-) This factors the test out into a separate method. The test method should be called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. Will change that... > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. 
For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storage and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". The latter only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. The StateBackend nee
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348516#comment-16348516 ] ASF GitHub Bot commented on FLINK-5820: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165343453 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java --- @@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception { assertTrue(isDirectoryEmpty(directory)); } + // + // Not deleting parent directories + // + + /** +* This test checks that the stream does not check and clean the parent directory +* when encountering a write error. +*/ + @Test + public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception { + final File directory = tempDir.newFolder(); + + // prevent creation of files in that directory + assertTrue(directory.setWritable(false, true)); + checkDirectoryNotWritable(directory); + + FileSystem fs = spy(FileSystem.getLocalFileSystem()); + + FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + stream1.write(new byte[61]); + stream2.write(new byte[61]); + + try { + stream1.closeAndGetHandle(); + fail("this should fail with an exception"); + } catch (IOException ignored) {} + + stream2.close(); + + verify(fs, times(0)).delete(any(Path.class), anyBoolean()); --- End diff -- Will add an additional check that the directory still exists > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storabe and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". The later only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. 
The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. The StateBackend needs to have a hook to drop all checkpointed state up > to a specific checkpoint (for all previously discarded checkpoints). > 6. In the future, this must support periodic cleanup hooks that track > orphaned shared state from incremental checkpoints. > For the {{FsStateBackend}}, which stores most of the checkpointed state > currently (transitively for RocksDB as well), this means a re-structuring of
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348518#comment-16348518 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165343607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.CheckpointOptions; + +import java.util.concurrent.RunnableFuture; + +/** + * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at + * least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints. + * + * @param type of the returned state object that represents the result of the snapshot operation. + */ +public interface SnapshotStrategy { --- End diff -- If this interface is functional, then we could also annotate it with `@FunctionalInterface` > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165343607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.CheckpointOptions; + +import java.util.concurrent.RunnableFuture; + +/** + * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at + * least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints. + * + * @param type of the returned state object that represents the result of the snapshot operation. + */ +public interface SnapshotStrategy { --- End diff -- If this interface is functional, then we could also annotate it with `@FunctionalInterface` ---
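To make the suggestion concrete, the annotated interface could look roughly as follows (a sketch only: the generic type parameter was stripped from the quoted diff, so the exact bounds and return type below are assumptions):
{code:java}
package org.apache.flink.runtime.state;

import org.apache.flink.runtime.checkpoint.CheckpointOptions;

import java.util.concurrent.RunnableFuture;

/**
 * Sketch: with a single abstract method the interface is functional, and the
 * annotation documents (and lets the compiler enforce) that intent.
 */
@FunctionalInterface
public interface SnapshotStrategy<S> {

    RunnableFuture<S> performSnapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions) throws Exception;
}
{code}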
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165343453 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java --- @@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception { assertTrue(isDirectoryEmpty(directory)); } + // + // Not deleting parent directories + // + + /** +* This test checks that the stream does not check and clean the parent directory +* when encountering a write error. +*/ + @Test + public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception { + final File directory = tempDir.newFolder(); + + // prevent creation of files in that directory + assertTrue(directory.setWritable(false, true)); + checkDirectoryNotWritable(directory); + + FileSystem fs = spy(FileSystem.getLocalFileSystem()); + + FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + stream1.write(new byte[61]); + stream2.write(new byte[61]); + + try { + stream1.closeAndGetHandle(); + fail("this should fail with an exception"); + } catch (IOException ignored) {} + + stream2.close(); + + verify(fs, times(0)).delete(any(Path.class), anyBoolean()); --- End diff -- Will add an additional check that the directory still exists ---
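The extra check mentioned here could be as small as the following line after `stream2.close()` in the quoted test (sketch only; `directory` and the JUnit `assertTrue` import are already in scope there):
{code:java}
// Besides verifying that fs.delete(...) was never invoked, make sure the
// write-protected parent directory itself survived the failed stream.
assertTrue("parent directory must not be cleaned up on error", directory.exists());
{code}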
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348513#comment-16348513 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342796 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java --- @@ -137,9 +138,12 @@ public static void main(String[] args) throws Exception { System.err.println("creating task"); + TemporaryFolder temporaryFolder = new TemporaryFolder(); --- End diff -- Making it a `ClassRule` or a `Rule` will take care of the clean up. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165343021 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java --- @@ -156,30 +157,38 @@ public void acknowledgeCheckpoint( } }; - TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStore(jobID, jobVertexID, subtaskIdx) { - @Override - public void storeLocalState( - @Nonnull CheckpointMetaData checkpointMetaData, - @Nullable TaskStateSnapshot localState) { - - Assert.assertEquals(tm, localState); - tmReported.set(true); - } - }; + TemporaryFolder temporaryFolder = new TemporaryFolder(); - TaskStateManagerImpl taskStateManager = - new TaskStateManagerImpl( - jobID, - executionAttemptID, - taskLocalStateStore, - null, - checkpointResponder); - - taskStateManager.reportTaskStateSnapshots( - checkpointMetaData, - checkpointMetrics, - jm, - tm); + try { + TaskLocalStateStore taskLocalStateStore = + new TaskLocalStateStore(jobID, jobVertexID, subtaskIdx, temporaryFolder.newFolder()) { + @Override + public void storeLocalState( + @Nonnull CheckpointMetaData checkpointMetaData, + @Nullable TaskStateSnapshot localState) { + + Assert.assertEquals(tm, localState); + tmReported.set(true); + } + }; + + TaskStateManagerImpl taskStateManager = + new TaskStateManagerImpl( + jobID, + executionAttemptID, + taskLocalStateStore, + null, + checkpointResponder); + + taskStateManager.reportTaskStateSnapshots( + checkpointMetaData, + checkpointMetrics, + jm, + tm); + } catch (Exception ex) { + temporaryFolder.delete(); + throw new RuntimeException(ex); --- End diff -- Why do we throw a `RuntimeException`? ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348514#comment-16348514 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165343021 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java --- @@ -156,30 +157,38 @@ public void acknowledgeCheckpoint( } }; - TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStore(jobID, jobVertexID, subtaskIdx) { - @Override - public void storeLocalState( - @Nonnull CheckpointMetaData checkpointMetaData, - @Nullable TaskStateSnapshot localState) { - - Assert.assertEquals(tm, localState); - tmReported.set(true); - } - }; + TemporaryFolder temporaryFolder = new TemporaryFolder(); - TaskStateManagerImpl taskStateManager = - new TaskStateManagerImpl( - jobID, - executionAttemptID, - taskLocalStateStore, - null, - checkpointResponder); - - taskStateManager.reportTaskStateSnapshots( - checkpointMetaData, - checkpointMetrics, - jm, - tm); + try { + TaskLocalStateStore taskLocalStateStore = + new TaskLocalStateStore(jobID, jobVertexID, subtaskIdx, temporaryFolder.newFolder()) { + @Override + public void storeLocalState( + @Nonnull CheckpointMetaData checkpointMetaData, + @Nullable TaskStateSnapshot localState) { + + Assert.assertEquals(tm, localState); + tmReported.set(true); + } + }; + + TaskStateManagerImpl taskStateManager = + new TaskStateManagerImpl( + jobID, + executionAttemptID, + taskLocalStateStore, + null, + checkpointResponder); + + taskStateManager.reportTaskStateSnapshots( + checkpointMetaData, + checkpointMetrics, + jm, + tm); + } catch (Exception ex) { + temporaryFolder.delete(); + throw new RuntimeException(ex); --- End diff -- Why do we throw a `RuntimeException`? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342796 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java --- @@ -137,9 +138,12 @@ public static void main(String[] args) throws Exception { System.err.println("creating task"); + TemporaryFolder temporaryFolder = new TemporaryFolder(); --- End diff -- Making it a `ClassRule` or a `Rule` will take care of the clean up. ---
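As a sketch of what that suggestion amounts to in JUnit 4 (class and method names below are illustrative, not the actual test):
{code:java}
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;

public class SomeLocalStateTest {

    // JUnit creates the folder before the tests run and deletes it afterwards,
    // so the test body needs no manual cleanup of temporary directories.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Test
    public void usesManagedTemporaryDirectory() throws Exception {
        File localStateDir = TEMPORARY_FOLDER.newFolder();
        // ... exercise the code that needs a local state directory ...
    }
}
{code}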
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342545 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java --- @@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() { Assert.assertNull(taskStateManager.operatorStates(operatorID_3)); } + /** +* This tests if the {@link TaskStateManager} properly returns the the subtask local state dir from the +* corresponding {@link TaskLocalStateStore}. +*/ + @Test + public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws IOException { + JobID jobID = new JobID(42L, 43L); + JobVertexID jobVertexID = new JobVertexID(12L, 34L); + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); + TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); + + TemporaryFolder tmpFolder = new TemporaryFolder(); + + try { + tmpFolder.create(); + TaskLocalStateStore taskLocalStateStore = + new TaskLocalStateStore(jobID, jobVertexID, 13, tmpFolder.newFolder()); + + TaskStateManager taskStateManager = taskStateManager( + jobID, + executionAttemptID, + checkpointResponderMock, + null, + taskLocalStateStore); + + Assert.assertEquals( + taskLocalStateStore.getSubtaskLocalStateBaseDirectory(), + taskStateManager.getSubtaskLocalStateBaseDirectory()); --- End diff -- Why does the `TaskStateManager` knows about the subtask local state base directory? ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348511#comment-16348511 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342545 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java --- @@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() { Assert.assertNull(taskStateManager.operatorStates(operatorID_3)); } + /** +* This tests if the {@link TaskStateManager} properly returns the the subtask local state dir from the +* corresponding {@link TaskLocalStateStore}. +*/ + @Test + public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws IOException { + JobID jobID = new JobID(42L, 43L); + JobVertexID jobVertexID = new JobVertexID(12L, 34L); + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); + TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); + + TemporaryFolder tmpFolder = new TemporaryFolder(); + + try { + tmpFolder.create(); + TaskLocalStateStore taskLocalStateStore = + new TaskLocalStateStore(jobID, jobVertexID, 13, tmpFolder.newFolder()); + + TaskStateManager taskStateManager = taskStateManager( + jobID, + executionAttemptID, + checkpointResponderMock, + null, + taskLocalStateStore); + + Assert.assertEquals( + taskLocalStateStore.getSubtaskLocalStateBaseDirectory(), + taskStateManager.getSubtaskLocalStateBaseDirectory()); --- End diff -- Why does the `TaskStateManager` knows about the subtask local state base directory? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5394 It may not be a problem in this test, but I wanted to raise that this pattern is a bit dangerous. If the thread ever gets interrupted while 'running' is still true, this goes into a hot loop constantly throwing exceptions: every time it falls through the loop and attempts to sleep again, it will immediately throw an InterruptedException. ---
[jira] [Commented] (FLINK-6571) InfiniteSource in SourceStreamOperatorTest should deal with InterruptedExceptions
[ https://issues.apache.org/jira/browse/FLINK-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348512#comment-16348512 ] ASF GitHub Bot commented on FLINK-6571: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5394 It may not be a problem in this test, but I wanted to raise that this pattern is a bit dangerous. If the thread ever gets interrupted while 'running' is still true, this goes into a hot loop constantly throwing exceptions: Every time it falls through the loop and attempts to sleep again, it will immediately throw an Interrupted Exception. > InfiniteSource in SourceStreamOperatorTest should deal with > InterruptedExceptions > - > > Key: FLINK-6571 > URL: https://issues.apache.org/jira/browse/FLINK-6571 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0, 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Labels: test-stability > Fix For: 1.5.0 > > > So this is a new one: i got a failing test > ({{testNoMaxWatermarkOnAsyncStop}}) due to an uncatched InterruptedException. > {code} > [00:28:15] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: > 0.828 sec <<< FAILURE! - in > org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest > [00:28:15] > testNoMaxWatermarkOnAsyncStop(org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest) > Time elapsed: 0 sec <<< ERROR! > [00:28:15] java.lang.InterruptedException: sleep interrupted > [00:28:15]at java.lang.Thread.sleep(Native Method) > [00:28:15]at > org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest$InfiniteSource.run(StreamSourceOperatorTest.java:343) > [00:28:15]at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > [00:28:15]at > org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest.testNoMaxWatermarkOnAsyncStop(StreamSourceOperatorTest.java:176) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
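To make the hazard concrete, here is a hedged sketch of a looping test source that handles the interrupt safely; the class is illustrative and not the actual `InfiniteSource` from `StreamSourceOperatorTest`. If the interrupt arrives while `running` is still true, the source restores the interrupt flag and stops instead of swallowing the exception and falling back into `sleep()` again.
{code:java}
// Illustrative sketch only; not the actual InfiniteSource used in the Flink test.
public final class SleepingSource implements Runnable {

    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                if (!running) {
                    // expected: the test cancelled the source and interrupted the thread
                    return;
                }
                // unexpected interrupt while still running: restore the flag and stop,
                // instead of looping back into sleep() and throwing again immediately
                Thread.currentThread().interrupt();
                throw new RuntimeException("source interrupted while still running", e);
            }
        }
    }

    public void cancel() {
        running = false;
    }
}
{code}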
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348510#comment-16348510 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342341 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java --- @@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() { Assert.assertNull(taskStateManager.operatorStates(operatorID_3)); } + /** +* This tests if the {@link TaskStateManager} properly returns the the subtask local state dir from the +* corresponding {@link TaskLocalStateStore}. +*/ + @Test + public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws IOException { + JobID jobID = new JobID(42L, 43L); + JobVertexID jobVertexID = new JobVertexID(12L, 34L); + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); + TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); + + TemporaryFolder tmpFolder = new TemporaryFolder(); --- End diff -- Maybe `ClassRule`? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342341 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java --- @@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() { Assert.assertNull(taskStateManager.operatorStates(operatorID_3)); } + /** +* This tests if the {@link TaskStateManager} properly returns the the subtask local state dir from the +* corresponding {@link TaskLocalStateStore}. +*/ + @Test + public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws IOException { + JobID jobID = new JobID(42L, 43L); + JobVertexID jobVertexID = new JobVertexID(12L, 34L); + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); + TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); + + TemporaryFolder tmpFolder = new TemporaryFolder(); --- End diff -- Maybe `ClassRule`? ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348508#comment-16348508 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342139 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java --- @@ -25,17 +25,24 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.junit.Assert; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; import static org.mockito.Mockito.mock; public class TaskStateManagerImplTest { --- End diff -- `extends TestLogger`. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165342139 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java --- @@ -25,17 +25,24 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.junit.Assert; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; import static org.mockito.Mockito.mock; public class TaskStateManagerImplTest { --- End diff -- `extends TestLogger`. ---
[GitHub] flink issue #5380: [hotfix][connectors] Fix log format strings
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5380 Please merge for `master` and `release-1.4`... ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348506#comment-16348506 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341901 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; + +public class TaskExecutorLocalStateStoresManagerTest { + + /** +* This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory +* for the {@link TaskExecutorLocalStateStoresManager} with the configured root directory. +*/ + @Test + public void testCreationFromConfig() throws Exception { + + final Configuration config = new Configuration(); + + final String rootDirString = "localStateRoot"; + config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, rootDirString); + + final ResourceID tmResourceID = ResourceID.generate(); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, tmResourceID); + + TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskStateManager(); + + Assert.assertEquals( + new File(rootDirString, TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT), + taskStateManager.getLocalStateRootDirectory()); + + Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT); + } + + /** +* This tests that the creation of {@link TaskManagerServices} correctly falls back to the first tmp directory of +* the IOManager as default for the local state root directory. 
+*/ + @Test + public void testCreationFromConfigDefault() throws Exception { + + final Configuration config = new Configuration(); + + final ResourceID tmResourceID = ResourceID.generate(); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, tmResourceID); + + TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskStateManager(); + + Assert.assertEquals( + new File(taskManagerServicesConfiguration.getTmpDirPaths()[0], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT), + taskStateManager.getLocalStateRootDirectory()); + } + + /** +* This tests that the {@link TaskExecutorLocalStateStores
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348505#comment-16348505 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341653 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; + +public class TaskExecutorLocalStateStoresManagerTest { + + /** +* This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory +* for the {@link TaskExecutorLocalStateStoresManager} with the configured root directory. +*/ + @Test + public void testCreationFromConfig() throws Exception { + + final Configuration config = new Configuration(); + + final String rootDirString = "localStateRoot"; + config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, rootDirString); + + final ResourceID tmResourceID = ResourceID.generate(); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, tmResourceID); + + TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskStateManager(); + + Assert.assertEquals( + new File(rootDirString, TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT), + taskStateManager.getLocalStateRootDirectory()); + + Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT); --- End diff -- This assertion seems to be redundant. 
> Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341901 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; + +public class TaskExecutorLocalStateStoresManagerTest { + + /** +* This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory +* for the {@link TaskExecutorLocalStateStoresManager} with the configured root directory. +*/ + @Test + public void testCreationFromConfig() throws Exception { + + final Configuration config = new Configuration(); + + final String rootDirString = "localStateRoot"; + config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, rootDirString); + + final ResourceID tmResourceID = ResourceID.generate(); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, tmResourceID); + + TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskStateManager(); + + Assert.assertEquals( + new File(rootDirString, TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT), + taskStateManager.getLocalStateRootDirectory()); + + Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT); + } + + /** +* This tests that the creation of {@link TaskManagerServices} correctly falls back to the first tmp directory of +* the IOManager as default for the local state root directory. 
+*/ + @Test + public void testCreationFromConfigDefault() throws Exception { + + final Configuration config = new Configuration(); + + final ResourceID tmResourceID = ResourceID.generate(); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, tmResourceID); + + TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskStateManager(); + + Assert.assertEquals( + new File(taskManagerServicesConfiguration.getTmpDirPaths()[0], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT), + taskStateManager.getLocalStateRootDirectory()); + } + + /** +* This tests that the {@link TaskExecutorLocalStateStoresManager} creates {@link TaskLocalStateStore} that have +* a properly initialized local state base directory. +*/ + @Test + public void testSubtaskStateStoreDirectoryCreation() throws Exception { + + JobID jobI
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341653 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; + +public class TaskExecutorLocalStateStoresManagerTest { + + /** +* This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory +* for the {@link TaskExecutorLocalStateStoresManager} with the configured root directory. +*/ + @Test + public void testCreationFromConfig() throws Exception { + + final Configuration config = new Configuration(); + + final String rootDirString = "localStateRoot"; + config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, rootDirString); + + final ResourceID tmResourceID = ResourceID.generate(); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, tmResourceID); + + TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskStateManager(); + + Assert.assertEquals( + new File(rootDirString, TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT), + taskStateManager.getLocalStateRootDirectory()); + + Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT); --- End diff -- This assertion seems to be redundant. ---
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341397 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; + +public class TaskExecutorLocalStateStoresManagerTest { --- End diff -- `extends TestLogger` ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348503#comment-16348503 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341397 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.InetAddress; + +public class TaskExecutorLocalStateStoresManagerTest { --- End diff -- `extends TestLogger` > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348501#comment-16348501 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341071 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -197,7 +201,13 @@ public static TaskManagerServices fromConfiguration( final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(); + + final File taskExecutorLocalStateRootDir = + new File(taskManagerServicesConfiguration.getLocalStateRootDir(), LOCAL_STATE_SUB_DIRECTORY_ROOT); --- End diff -- Are we giving the `TaskExecutorLocalStateStoresManager` a unique directory? What happens if multiple TMs run on the same machine? Will they use different directories? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165341071 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -197,7 +201,13 @@ public static TaskManagerServices fromConfiguration( final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(); + + final File taskExecutorLocalStateRootDir = + new File(taskManagerServicesConfiguration.getLocalStateRootDir(), LOCAL_STATE_SUB_DIRECTORY_ROOT); --- End diff -- Are we giving the `TaskExecutorLocalStateStoresManager` a unique directory? What happens if multiple TMs run on the same machine? Will they use different directories? ---
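Purely to illustrate the concern about co-located TaskManagers, one conceivable way to keep the directories apart is to fold a per-TaskManager identifier such as the `ResourceID` into the local-state path. This is only a sketch of the idea, with illustrative names, and not necessarily how the pull request resolves the question.
{code:java}
import java.io.File;

// Sketch only: disambiguating the local-state root for multiple TaskManagers on one machine
// by appending the TM's unique resource id. Class and method names are illustrative.
final class LocalStateDirectories {

    static File localStateRootFor(File configuredRootDir, String localStateSubDirRoot, String tmResourceId) {
        // e.g. <configuredRootDir>/<localStateSubDirRoot>/tm_<resource-id>
        return new File(new File(configuredRootDir, localStateSubDirRoot), "tm_" + tmResourceId);
    }
}
{code}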
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348495#comment-16348495 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340438 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -501,4 +529,53 @@ public String toString() { "', asynchronous: " + asynchronousSnapshots + ", fileStateThreshold: " + fileStateThreshold + ")"; } + + /** +* This enum represents the different modes for local recovery. +*/ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED + } + + /** +* This class encapsulates the configuration for local recovery of this backend. +*/ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { + this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode); + this.localStateDirectory = localStateDirectory; + if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) { + throw new IllegalStateException("Local state directory must be specified if local recovery mode is " + + LocalRecoveryMode.ENABLE_FILE_BASED); + } + } + + public LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + public File getLocalStateDirectory() { + return localStateDirectory; --- End diff -- How does this play together with `ENABLE_HEAP_BASED` `LocalRecoveryMode`? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340438 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -501,4 +529,53 @@ public String toString() { "', asynchronous: " + asynchronousSnapshots + ", fileStateThreshold: " + fileStateThreshold + ")"; } + + /** +* This enum represents the different modes for local recovery. +*/ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED + } + + /** +* This class encapsulates the configuration for local recovery of this backend. +*/ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { + this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode); + this.localStateDirectory = localStateDirectory; + if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) { + throw new IllegalStateException("Local state directory must be specified if local recovery mode is " + + LocalRecoveryMode.ENABLE_FILE_BASED); + } + } + + public LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + public File getLocalStateDirectory() { + return localStateDirectory; --- End diff -- How does this play together with `ENABLE_HEAP_BASED` `LocalRecoveryMode`? ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348494#comment-16348494 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340246 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java --- @@ -60,4 +62,9 @@ void reportTaskStateSnapshots( * @return previous state for the operator. Null if no previous state exists. */ OperatorSubtaskState operatorStates(OperatorID operatorID); + + /** +* Returns the base directory for all file-based local state of the owning subtask. +*/ + File getSubtaskLocalStateBaseDirectory(); --- End diff -- I guess there will be other non file-based local states, right? What would this method return for these state objects? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340246 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java --- @@ -60,4 +62,9 @@ void reportTaskStateSnapshots( * @return previous state for the operator. Null if no previous state exists. */ OperatorSubtaskState operatorStates(OperatorID operatorID); + + /** +* Returns the base directory for all file-based local state of the owning subtask. +*/ + File getSubtaskLocalStateBaseDirectory(); --- End diff -- I guess there will be other non file-based local states, right? What would this method return for these state objects? ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348492#comment-16348492 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340082 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java --- @@ -46,26 +52,63 @@ /** */ private final int subtaskIndex; + /** */ + private final Map storedTaskStateByCheckpointID; + + /** This is the base directory for all local state of the subtask that owns this {@link TaskLocalStateStore}. */ + private final File subtaskLocalStateBaseDirectory; + public TaskLocalStateStore( JobID jobID, JobVertexID jobVertexID, - int subtaskIndex) { + int subtaskIndex, + File localStateRootDir) { this.jobID = jobID; this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; + this.storedTaskStateByCheckpointID = new HashMap<>(); + this.subtaskLocalStateBaseDirectory = + new File(localStateRootDir, createSubtaskPath(jobID, jobVertexID, subtaskIndex)); + } + + static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, int subtaskIndex) { + return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + subtaskIndex; } public void storeLocalState( @Nonnull CheckpointMetaData checkpointMetaData, @Nullable TaskStateSnapshot localState) { - if (localState != null) { - throw new UnsupportedOperationException("Implement this before actually providing local state!"); + TaskStateSnapshot previous = + storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState); + + if (previous != null) { + throw new IllegalStateException("Found previously registered local state for checkpoint with id " + + checkpointMetaData.getCheckpointId() + "! This indicated a problem."); } } - public void dispose() { - //TODO + public void dispose() throws Exception { + + Exception collectedException = null; + + for (TaskStateSnapshot snapshot : storedTaskStateByCheckpointID.values()) { + try { + snapshot.discardState(); + } catch (Exception discardEx) { + collectedException = ExceptionUtils.firstOrSuppressed(discardEx, collectedException); + } + } + + if (collectedException != null) { + throw collectedException; + } + + FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory); --- End diff -- Is there a way to retry the non discarded state handles based on this directory? If not, then we could delete it also in case of a failure. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165340082 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java --- @@ -46,26 +52,63 @@ /** */ private final int subtaskIndex; + /** */ + private final Map storedTaskStateByCheckpointID; + + /** This is the base directory for all local state of the subtask that owns this {@link TaskLocalStateStore}. */ + private final File subtaskLocalStateBaseDirectory; + public TaskLocalStateStore( JobID jobID, JobVertexID jobVertexID, - int subtaskIndex) { + int subtaskIndex, + File localStateRootDir) { this.jobID = jobID; this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; + this.storedTaskStateByCheckpointID = new HashMap<>(); + this.subtaskLocalStateBaseDirectory = + new File(localStateRootDir, createSubtaskPath(jobID, jobVertexID, subtaskIndex)); + } + + static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, int subtaskIndex) { + return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + subtaskIndex; } public void storeLocalState( @Nonnull CheckpointMetaData checkpointMetaData, @Nullable TaskStateSnapshot localState) { - if (localState != null) { - throw new UnsupportedOperationException("Implement this before actually providing local state!"); + TaskStateSnapshot previous = + storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), localState); + + if (previous != null) { + throw new IllegalStateException("Found previously registered local state for checkpoint with id " + + checkpointMetaData.getCheckpointId() + "! This indicated a problem."); } } - public void dispose() { - //TODO + public void dispose() throws Exception { + + Exception collectedException = null; + + for (TaskStateSnapshot snapshot : storedTaskStateByCheckpointID.values()) { + try { + snapshot.discardState(); + } catch (Exception discardEx) { + collectedException = ExceptionUtils.firstOrSuppressed(discardEx, collectedException); + } + } + + if (collectedException != null) { + throw collectedException; + } + + FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory); --- End diff -- Is there a way to retry the non discarded state handles based on this directory? If not, then we could delete it also in case of a failure. ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348467#comment-16348467 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165332129 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -316,6 +316,11 @@ */ public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause"; + /** +* The config parameter defining the root directories for storing file-based state for local recovery. +*/ + public static final String TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY = "taskmanager.local-state-root.dir"; --- End diff -- Using `ConfigConstants` is discouraged. Please introduce a `ConfigOption` if not already done in a later commit. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165332129 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -316,6 +316,11 @@ */ public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause"; + /** +* The config parameter defining the root directories for storing file-based state for local recovery. +*/ + public static final String TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY = "taskmanager.local-state-root.dir"; --- End diff -- Using `ConfigConstants` is discouraged. Please introduce a `ConfigOption` if not already done in a later commit. ---
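For context, a minimal sketch of what the requested `ConfigOption` could look like. The holder class name is an assumption and the option Flink eventually shipped may use a different key or defaults; only the `ConfigOptions` builder usage is the point here.
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Illustrative holder class; the actual option Flink ended up with may differ.
public final class LocalRecoveryOptions {

    /** Root directory for file-based local state used by local recovery. */
    public static final ConfigOption<String> LOCAL_STATE_ROOT_DIR =
        ConfigOptions.key("taskmanager.local-state-root.dir")
            .noDefaultValue();

    // Typical lookup site, e.g. in a services configuration class:
    static String localStateRootDir(Configuration config) {
        return config.getString(LOCAL_STATE_ROOT_DIR);
    }

    private LocalRecoveryOptions() {
    }
}
{code}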
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348463#comment-16348463 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165331645 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception { initField.setAccessible(true); initField.setBoolean(null, false); } + + /** +* This enum represents the different modes for local recovery. +*/ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED + } + + /** +* This class encapsulates the configuration for local recovery of this backend. +*/ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { + this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode); + this.localStateDirectory = localStateDirectory; + if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) { + throw new IllegalStateException("Local state directory must be specified if local recovery mode is " + + LocalRecoveryMode.ENABLE_FILE_BASED); + } + } + + public LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + public File getLocalStateDirectory() { --- End diff -- @Nullable > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348461#comment-16348461 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165331598 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception { initField.setAccessible(true); initField.setBoolean(null, false); } + + /** +* This enum represents the different modes for local recovery. +*/ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED + } + + /** +* This class encapsulates the configuration for local recovery of this backend. +*/ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { --- End diff -- @Nullable annotation missing > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165331598 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception { initField.setAccessible(true); initField.setBoolean(null, false); } + + /** +* This enum represents the different modes for local recovery. +*/ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED + } + + /** +* This class encapsulates the configuration for local recovery of this backend. +*/ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { --- End diff -- @Nullable annotation missing ---
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165331645 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception { initField.setAccessible(true); initField.setBoolean(null, false); } + + /** +* This enum represents the different modes for local recovery. +*/ + public enum LocalRecoveryMode { + DISABLED, ENABLE_FILE_BASED + } + + /** +* This class encapsulates the configuration for local recovery of this backend. +*/ + public static final class LocalRecoveryConfig implements Serializable { + + private static final long serialVersionUID = 1L; + private static final LocalRecoveryConfig DISABLED_SINGLETON = + new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null); + + private final LocalRecoveryMode localRecoveryMode; + private final File localStateDirectory; + + LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) { + this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode); + this.localStateDirectory = localStateDirectory; + if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) { + throw new IllegalStateException("Local state directory must be specified if local recovery mode is " + + LocalRecoveryMode.ENABLE_FILE_BASED); + } + } + + public LocalRecoveryMode getLocalRecoveryMode() { + return localRecoveryMode; + } + + public File getLocalStateDirectory() { --- End diff -- @Nullable ---
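For readers following the two review comments above, a minimal sketch of how the suggested annotations might be applied to the nested LocalRecoveryConfig class (using javax.annotation.Nullable); this is an illustration of the reviewer's suggestion, not the final committed code:
{code:java}
// Sketch of the nested class with the reviewer's @Nullable annotations applied.
public static final class LocalRecoveryConfig implements Serializable {

	private static final long serialVersionUID = 1L;

	private final LocalRecoveryMode localRecoveryMode;

	// null is a legal value when local recovery is DISABLED
	@Nullable
	private final File localStateDirectory;

	LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, @Nullable File localStateDirectory) {
		this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode);
		this.localStateDirectory = localStateDirectory;
		if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) {
			throw new IllegalStateException("Local state directory must be specified if local recovery mode is "
				+ LocalRecoveryMode.ENABLE_FILE_BASED);
		}
	}

	public LocalRecoveryMode getLocalRecoveryMode() {
		return localRecoveryMode;
	}

	@Nullable
	public File getLocalStateDirectory() {
		return localStateDirectory;
	}
}
{code}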
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348444#comment-16348444 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165329478 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -104,10 +107,8 @@ @Nullable private OptionsFactory optionsFactory; - /** True if incremental checkpointing is enabled. -* Null if not yet set, in which case the configuration values will be used. */ - @Nullable - private Boolean enableIncrementalCheckpointing; + /** True if incremental checkpointing is enabled. */ + private TernaryBoolean enableIncrementalCheckpointing; --- End diff -- what about renaming it to `isIncrementalCheckpointingEnabled`. This makes it clearer that this is a boolean. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348442#comment-16348442 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165329369 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -104,10 +107,8 @@ @Nullable private OptionsFactory optionsFactory; - /** True if incremental checkpointing is enabled. -* Null if not yet set, in which case the configuration values will be used. */ - @Nullable - private Boolean enableIncrementalCheckpointing; + /** True if incremental checkpointing is enabled. */ --- End diff -- Please update comment. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165329478 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -104,10 +107,8 @@ @Nullable private OptionsFactory optionsFactory; - /** True if incremental checkpointing is enabled. -* Null if not yet set, in which case the configuration values will be used. */ - @Nullable - private Boolean enableIncrementalCheckpointing; + /** True if incremental checkpointing is enabled. */ + private TernaryBoolean enableIncrementalCheckpointing; --- End diff -- What about renaming it to `isIncrementalCheckpointingEnabled`? This makes it clearer that this is a boolean. ---
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165329369 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java --- @@ -104,10 +107,8 @@ @Nullable private OptionsFactory optionsFactory; - /** True if incremental checkpointing is enabled. -* Null if not yet set, in which case the configuration values will be used. */ - @Nullable - private Boolean enableIncrementalCheckpointing; + /** True if incremental checkpointing is enabled. */ --- End diff -- Please update comment. ---
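Putting the two review suggestions above together (renaming the field and updating its comment for the TernaryBoolean type) might look roughly like the following sketch; the getOrDefault resolution against a configuration-derived default is an assumption for illustration, not code from the PR:
{code:java}
/** Whether incremental checkpointing is enabled; UNDEFINED means the value from the configuration is used. */
private TernaryBoolean isIncrementalCheckpointingEnabled = TernaryBoolean.UNDEFINED;

/** Resolves the tri-state flag against a default taken from the configuration (assumed shape, not the PR's code). */
public boolean isIncrementalCheckpointsEnabled(boolean configuredDefault) {
	// an explicit TRUE/FALSE wins, otherwise the configured default applies
	return isIncrementalCheckpointingEnabled.getOrDefault(configuredDefault);
}
{code}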
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348424#comment-16348424 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165326694 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; + +/** + * Test for forwarding of state reporting to and from {@link org.apache.flink.runtime.state.TaskStateManager}. + */ +public class LocalStateForwardingTest { + + /** +* This tests the forwarding of jm and tm-local state from the futures reported by the backends, through the +* async checkpointing thread to the {@link org.apache.flink.runtime.state.TaskStateManager}. 
+*/ + @Test + public void testForwardingFromSnapshotToTaskStateManager() throws Exception { + + TestTaskStateManager taskStateManager = new TestTaskStateManager(); + + StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1024*1024, + new MockInputSplitProvider(), + 0, + taskStateManager); + + StreamTask testStreamTask = new StreamTaskTest.NoOpStreamTask(streamMockEnvironment); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(0L, 0L); + CheckpointMetrics checkpointMetrics = new CheckpointMetrics(); + + Map snapshots = new HashMap<>(1); + OperatorSnapshotFutures osFuture = new OperatorSnapshotFutures(); + + osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class)); + osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class)); + osFuture.setOperatorSta
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165326694 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; + +/** + * Test for forwarding of state reporting to and from {@link org.apache.flink.runtime.state.TaskStateManager}. + */ +public class LocalStateForwardingTest { + + /** +* This tests the forwarding of jm and tm-local state from the futures reported by the backends, through the +* async checkpointing thread to the {@link org.apache.flink.runtime.state.TaskStateManager}. 
+*/ + @Test + public void testForwardingFromSnapshotToTaskStateManager() throws Exception { + + TestTaskStateManager taskStateManager = new TestTaskStateManager(); + + StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1024*1024, + new MockInputSplitProvider(), + 0, + taskStateManager); + + StreamTask testStreamTask = new StreamTaskTest.NoOpStreamTask(streamMockEnvironment); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(0L, 0L); + CheckpointMetrics checkpointMetrics = new CheckpointMetrics(); + + Map snapshots = new HashMap<>(1); + OperatorSnapshotFutures osFuture = new OperatorSnapshotFutures(); + + osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class)); + osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class)); + osFuture.setOperatorStateManagedFuture(createSnapshotResult(OperatorStateHandle.class)); + osFuture.setOperatorStateRawFuture(createSnapshotResult(OperatorStateHandle.class)); + + OperatorID operatorID = new OperatorID(); + snapshot
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348422#comment-16348422 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165325793 --- Diff: flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; + +import java.lang.reflect.Array; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper. + */ +public class MethodForwardingTestUtil { + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory) { + testMethodForwarding(delegateClass, wrapperFactory, Collections.emptySet()); + } + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the +* original method. Remapping to null skips the method. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param skipMethodSet set of methods to ignore. 
+* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory, + Set skipMethodSet) { + + Preconditions.checkNotNull(delegateClass); + Preconditions.checkNotNull(wrapperFactory); + Preconditions.checkNotNull(skipMethodSet); + + D delegate = spy(delegateClass); + W wrapper = wrapperFactory.apply(delegate); + + // ensure that wrapper is a subtype of delegate + Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass())); + + for (Method delegateMethod : delegateClass.getMethods()) { + + if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) { + continue; + } + + try { + // find the correct method to substitute the bridge for erased generic types. + // if this doesn't work, the user need to exclude the method and write an additional test. + Method wrapperMethod = wrapper.getClass().getDeclaredMethod( + delegateMethod.getName(), + delegateMethod.getParameterTypes()); + + // things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method. + if (wrapperMethod.isBridge()) {
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165325793 --- Diff: flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; + +import java.lang.reflect.Array; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper. + */ +public class MethodForwardingTestUtil { + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory) { + testMethodForwarding(delegateClass, wrapperFactory, Collections.emptySet()); + } + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the +* original method. Remapping to null skips the method. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param skipMethodSet set of methods to ignore. +* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory, + Set skipMethodSet) { + + Preconditions.checkNotNull(delegateClass); + Preconditions.checkNotNull(wrapperFactory); + Preconditions.checkNotNull(skipMethodSet); + + D delegate = spy(delegateClass); + W wrapper = wrapperFactory.apply(delegate); + + // ensure that wrapper is a subtype of delegate + Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass())); + + for (Method delegateMethod : delegateClass.getMethods()) { + + if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) { + continue; + } + + try { + // find the correct method to substitute the bridge for erased generic types. + // if this doesn't work, the user need to exclude the method and write an additional test. 
+ Method wrapperMethod = wrapper.getClass().getDeclaredMethod( + delegateMethod.getName(), + delegateMethod.getParameterTypes()); + + // things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method. + if (wrapperMethod.isBridge()) { + for (Method method : wrapper.getClass().getDeclaredMethods()) { + if (!method.isBridge() + && method.getName().equals(
[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart
[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348414#comment-16348414 ] ASF GitHub Bot commented on FLINK-8484: --- Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Good point. An ugly workaround would be to store a timestamp when the ending number is being set on a shard, and provide a configurable/sufficiently enough (eg. 7 days) window. It would exclude the dependency on the Kinesis API. > Kinesis consumer re-reads closed shards on job restart > -- > > Key: FLINK-8484 > URL: https://issues.apache.org/jira/browse/FLINK-8484 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2 >Reporter: Philip Luppens >Assignee: Philip Luppens >Priority: Blocker > Labels: bug, flink, kinesis > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > We’re using the connector to subscribe to streams varying from 1 to a 100 > shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis > stream up and down during peak times. What we’ve noticed is that, while we > were having closed shards, any Flink job restart with check- or save-point > would result in shards being re-read from the event horizon, duplicating our > events. > > We started checking the checkpoint state, and found that the shards were > stored correctly with the proper sequence number (including for closed > shards), but that upon restarts, the older closed shards would be read from > the event horizon, as if their restored state would be ignored. > > In the end, we believe that we found the problem: in the > FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned > from the KinesisDataFetcher against the shards’ metadata from the restoration > point, but we do this via a containsKey() call, which means we’ll use the > StreamShardMetadata’s equals() method. However, this checks for all > properties, including the endingSequenceNumber, which might have changed > between the restored state’s checkpoint and our data fetch, thus failing the > equality check, failing the containsKey() check, and resulting in the shard > being re-read from the event horizon, even though it was present in the > restored state. > > We’ve created a workaround where we only check for the shardId and stream > name to restore the state of the shards we’ve already seen, and this seems to > work correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
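The workaround described in the issue text above — matching restored shards only on the fields that never change, instead of relying on StreamShardMetadata.equals(), which also compares the mutable endingSequenceNumber — could be sketched as follows; the helper name is illustrative, not the actual patch:
{code:java}
// Sketch only: treat two shard metadata objects as the same shard if stream name and shard id match,
// ignoring endingSequenceNumber, which is set once the shard is closed and differs between
// checkpoint time and restore time.
private static boolean isSameShard(StreamShardMetadata restored, StreamShardMetadata discovered) {
	return Objects.equals(restored.getStreamName(), discovered.getStreamName())
		&& Objects.equals(restored.getShardId(), discovered.getShardId());
}
{code}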
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Good point. An ugly workaround would be to store a timestamp when the ending sequence number is set on a shard, and provide a configurable, sufficiently long (e.g. 7 days) retention window. It would avoid the dependency on the Kinesis API. ---
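The timestamp idea in the comment above — recording when a shard's ending sequence number was set and only keeping its state for a configurable retention window — could look roughly like this; names and the 7-day default are illustrative, not existing connector API:
{code:java}
// Hypothetical sketch of a retention check for closed-shard state.
private static final long CLOSED_SHARD_RETENTION_MILLIS = TimeUnit.DAYS.toMillis(7); // configurable in practice

static boolean shouldDropClosedShardState(long shardClosedAtMillis, long nowMillis) {
	// drop the closed shard's state once it has been closed longer than the retention window
	return nowMillis - shardClosedAtMillis > CLOSED_SHARD_RETENTION_MILLIS;
}
{code}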
[jira] [Commented] (FLINK-8242) ClassCastException in OrcTableSource.toOrcPredicate
[ https://issues.apache.org/jira/browse/FLINK-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348411#comment-16348411 ] ASF GitHub Bot commented on FLINK-8242: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5345 Thank you @fhueske. The code looks good now. I will merge this... > ClassCastException in OrcTableSource.toOrcPredicate > --- > > Key: FLINK-8242 > URL: https://issues.apache.org/jira/browse/FLINK-8242 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{OrcTableSource}} tries to cast all predicate literals to > {{Serializable}} in its {{toOrcPredicate()}} method. This fails with a > {{ClassCastException}} if a literal is not serializable. > Instead of failing, we should ignore the predicate and log a WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5345: [FLINK-8242] [orc] Fix predicate push-down of OrcTableSou...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5345 Thank you @fhueske. The code looks good now. I will merge this... ---
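For FLINK-8242 above, the fix described in the issue — ignoring a predicate whose literal is not serializable and logging a warning instead of casting blindly — can be sketched like this; method and logger names are illustrative, not the merged code:
{code:java}
// Sketch of the ignore-and-warn behavior for non-serializable predicate literals.
@Nullable
private Serializable toOrcLiteral(Object literal) {
	if (!(literal instanceof Serializable)) {
		LOG.warn("Encountered a non-serializable literal of type {}. Skipping predicate push-down for this predicate.",
			literal.getClass().getName());
		return null; // the caller drops the predicate instead of pushing it into ORC
	}
	return (Serializable) literal;
}
{code}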
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348408#comment-16348408 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165323982 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; + +/** + * Test for forwarding of state reporting to and from {@link org.apache.flink.runtime.state.TaskStateManager}. + */ +public class LocalStateForwardingTest { --- End diff -- Same here with `TestLogger`. 
> Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348405#comment-16348405 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165323575 --- Diff: flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; + +import java.lang.reflect.Array; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper. + */ +public class MethodForwardingTestUtil { + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory) { + testMethodForwarding(delegateClass, wrapperFactory, Collections.emptySet()); + } + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the +* original method. Remapping to null skips the method. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param skipMethodSet set of methods to ignore. 
+* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory, + Set skipMethodSet) { + + Preconditions.checkNotNull(delegateClass); + Preconditions.checkNotNull(wrapperFactory); + Preconditions.checkNotNull(skipMethodSet); + + D delegate = spy(delegateClass); + W wrapper = wrapperFactory.apply(delegate); + + // ensure that wrapper is a subtype of delegate + Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass())); + + for (Method delegateMethod : delegateClass.getMethods()) { + + if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) { + continue; + } + + try { + // find the correct method to substitute the bridge for erased generic types. + // if this doesn't work, the user need to exclude the method and write an additional test. + Method wrapperMethod = wrapper.getClass().getDeclaredMethod( + delegateMethod.getName(), + delegateMethod.getParameterTypes()); + + // things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method. + if (wrapperMethod.isBridge()) {
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165323982 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; + +/** + * Test for forwarding of state reporting to and from {@link org.apache.flink.runtime.state.TaskStateManager}. + */ +public class LocalStateForwardingTest { --- End diff -- Same here with `TestLogger`. ---
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165323823 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.MethodForwardingTestUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link StateObjectCollection}. + */ +public class StateObjectCollectionTest { --- End diff -- should extend `TestLogger`. ---
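The `TestLogger` suggestions in the two review comments above amount to a one-line change per test class; a sketch of the intended shape, with a placeholder test method that is not from the PR:
{code:java}
import org.apache.flink.util.TestLogger;
import org.junit.Test;

/** Tests for {@link StateObjectCollection}. */
public class StateObjectCollectionTest extends TestLogger {

	@Test
	public void testEmptyCollection() { // placeholder test method
		// extending TestLogger only adds logging of test progress and failures; test bodies stay unchanged
	}
}
{code}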
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348406#comment-16348406 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165323823 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.MethodForwardingTestUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link StateObjectCollection}. + */ +public class StateObjectCollectionTest { --- End diff -- should extend `TestLogger`. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165323575 --- Diff: flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; + +import java.lang.reflect.Array; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper. + */ +public class MethodForwardingTestUtil { + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory) { + testMethodForwarding(delegateClass, wrapperFactory, Collections.emptySet()); + } + + /** +* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper +* class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the +* original method. Remapping to null skips the method. This ignores methods that are inherited from Object. +* +* @param delegateClass the class for the delegate. +* @param wrapperFactory factory that produces a wrapper from a delegate. +* @param skipMethodSet set of methods to ignore. +* @param type of the delegate +* @param type of the wrapper +*/ + public static void testMethodForwarding( + Class delegateClass, + Function wrapperFactory, + Set skipMethodSet) { + + Preconditions.checkNotNull(delegateClass); + Preconditions.checkNotNull(wrapperFactory); + Preconditions.checkNotNull(skipMethodSet); + + D delegate = spy(delegateClass); + W wrapper = wrapperFactory.apply(delegate); + + // ensure that wrapper is a subtype of delegate + Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass())); + + for (Method delegateMethod : delegateClass.getMethods()) { + + if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) { + continue; + } + + try { + // find the correct method to substitute the bridge for erased generic types. + // if this doesn't work, the user need to exclude the method and write an additional test. 
+ Method wrapperMethod = wrapper.getClass().getDeclaredMethod( + delegateMethod.getName(), + delegateMethod.getParameterTypes()); + + // things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method. + if (wrapperMethod.isBridge()) { + for (Method method : wrapper.getClass().getDeclaredMethods()) { + if (!method.isBridge() + && method.getName().equals(
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348399#comment-16348399 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165322528 --- Diff: flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; + +import java.lang.reflect.Array; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper. --- End diff -- "attempts" or "with methods" > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165322528 --- Diff: flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.mockito.Mockito; + +import java.lang.reflect.Array; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +/** + * Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper. --- End diff -- "attempts" or "with methods" ---
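As a usage illustration for the utility under review: a test supplies the delegate class and a factory that builds the wrapper around the (internally spied) delegate, and the helper then asserts that every public method call on the wrapper is forwarded. `MyDelegate` and `MyWrapper` below are hypothetical stand-ins:
{code:java}
// Hypothetical call pattern; per the utility's contract, MyWrapper must be a subtype of MyDelegate.
public class MyWrapperForwardingTest {

	@Test
	public void testMethodForwarding() {
		MethodForwardingTestUtil.testMethodForwarding(MyDelegate.class, MyWrapper::new);
	}
}
{code}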
[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart
[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348395#comment-16348395 ] ASF GitHub Bot commented on FLINK-8484: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 Here it is: https://issues.apache.org/jira/browse/FLINK-8542 > Kinesis consumer re-reads closed shards on job restart > -- > > Key: FLINK-8484 > URL: https://issues.apache.org/jira/browse/FLINK-8484 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2 >Reporter: Philip Luppens >Assignee: Philip Luppens >Priority: Blocker > Labels: bug, flink, kinesis > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > We’re using the connector to subscribe to streams varying from 1 to a 100 > shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis > stream up and down during peak times. What we’ve noticed is that, while we > were having closed shards, any Flink job restart with check- or save-point > would result in shards being re-read from the event horizon, duplicating our > events. > > We started checking the checkpoint state, and found that the shards were > stored correctly with the proper sequence number (including for closed > shards), but that upon restarts, the older closed shards would be read from > the event horizon, as if their restored state would be ignored. > > In the end, we believe that we found the problem: in the > FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned > from the KinesisDataFetcher against the shards’ metadata from the restoration > point, but we do this via a containsKey() call, which means we’ll use the > StreamShardMetadata’s equals() method. However, this checks for all > properties, including the endingSequenceNumber, which might have changed > between the restored state’s checkpoint and our data fetch, thus failing the > equality check, failing the containsKey() check, and resulting in the shard > being re-read from the event horizon, even though it was present in the > restored state. > > We’ve created a workaround where we only check for the shardId and stream > name to restore the state of the shards we’ve already seen, and this seems to > work correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348392#comment-16348392 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165321013 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java --- @@ -67,8 +69,11 @@ public TestTaskStateManager( this.jobId = jobId; this.executionAttemptID = executionAttemptID; this.checkpointResponder = checkpointResponder; - this.taskStateSnapshotsByCheckpointId = new HashMap<>(); + this.jobManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); + this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); this.reportedCheckpointId = -1L; + this.subtaskLocalStateBaseDirectory = + new File(System.getProperty("java.io.tmpdir"), "testLocalState_" + UUID.randomUUID()); --- End diff -- Shouldn't this come from something like a `TemporaryFolder` such that the directory get's cleaned up in all cases. Consequently, maybe we should pass in the `subtaskLocalStateBaseDirectory` from the test using this class. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 Here it is: https://issues.apache.org/jira/browse/FLINK-8542 ---
[jira] [Created] (FLINK-8542) Do not indefinitely store closed shard's state in the FlinkKinesisConsumer
Tzu-Li (Gordon) Tai created FLINK-8542: -- Summary: Do not indefinitely store closed shard's state in the FlinkKinesisConsumer Key: FLINK-8542 URL: https://issues.apache.org/jira/browse/FLINK-8542 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Tzu-Li (Gordon) Tai See original discussion here: [https://github.com/apache/flink/pull/5337|https://github.com/apache/flink/pull/5337#issuecomment-362227711] Currently, the Kinesis consumer keeps a list of {{(StreamShardMetadata, SequenceNumber)}} as its state. That list also contains all shards that have been closed already, and is kept in the state indefinitely so that on restore, we know that a closed shard is already fully consumed. The downside of this is that the state size of the Kinesis consumer can basically grow without bounds, as the consumed Kinesis streams are resharded and more and more closed shards are present. Some possible solutions have been discussed in the linked PR comments. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
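As a rough illustration of the pruning ideas mentioned above (not the consumer's actual code), state entries for shards that the stream no longer reports could be dropped on restore; the map and set shapes below are simplified assumptions.
{code:java}
import java.util.Map;
import java.util.Set;

public final class ClosedShardStatePruning {

    private ClosedShardStatePruning() {
    }

    /**
     * Removes restored entries for shards that no longer exist in the stream,
     * so the restored state does not keep growing across reshardings.
     *
     * @param restoredSequenceNumbers shardId -> last consumed sequence number (mutable)
     * @param currentShardIds shard IDs currently reported for the stream
     */
    public static void pruneVanishedShards(
            Map<String, String> restoredSequenceNumbers,
            Set<String> currentShardIds) {

        restoredSequenceNumbers.keySet().removeIf(shardId -> !currentShardIds.contains(shardId));
    }
}
{code}
Whether listing all current shards on restore is acceptable is exactly the API rate-limit concern raised in the linked PR discussion.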
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165321013 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java --- @@ -67,8 +69,11 @@ public TestTaskStateManager( this.jobId = jobId; this.executionAttemptID = executionAttemptID; this.checkpointResponder = checkpointResponder; - this.taskStateSnapshotsByCheckpointId = new HashMap<>(); + this.jobManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); + this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); this.reportedCheckpointId = -1L; + this.subtaskLocalStateBaseDirectory = + new File(System.getProperty("java.io.tmpdir"), "testLocalState_" + UUID.randomUUID()); --- End diff -- Shouldn't this come from something like a `TemporaryFolder` such that the directory gets cleaned up in all cases? Consequently, maybe we should pass in the `subtaskLocalStateBaseDirectory` from the test using this class. ---
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348383#comment-16348383 ] ASF GitHub Bot commented on FLINK-5820: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165317663 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java --- @@ -19,58 +19,53 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +/** + * Unit tests for the {@link CompletedCheckpoint}. + */ public class CompletedCheckpointTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); - /** -* Tests that persistent checkpoints discard their header file. -*/ @Test - public void testDiscard() throws Exception { - File file = tmpFolder.newFile(); - assertEquals(true, file.exists()); - + public void registerStatesAtRegistry() { --- End diff -- What's the reason for this change? > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. 
In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storage and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". The latter only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. The StateBackend needs to have a hook to drop all checkpointed state up > to a specific checkpoint (for all previously discarded checkpoints). > 6. In the future, this must support periodic cleanup hooks that track > orphaned shared state from incremental checkpoints. > For the {{FsStateBackend}}, which stores most of the checkpointed state > currently (trans
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348384#comment-16348384 ] ASF GitHub Bot commented on FLINK-5820: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165295667 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -18,45 +18,50 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.JobStatus; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * The configuration of a checkpoint, such as whether + * The configuration of a checkpoint. This described whether --- End diff -- nit: type > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storabe and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". The later only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. The StateBackend needs to have a hook to drop all checkpointed state up > to a specific checkpoint (for all previously discarded checkpoints). > 6. In the future, this must support periodic cleanup hooks that track > orphaned shared state from incremental checkpoints. > For the {{FsStateBackend}}, which stores most of the checkpointes state > currently (transitively for RocksDB as well), this means a re-structuring of > the storage directories as follows: > {code} > ..//job1-id/ > /shared/<-- shared checkpoint data > /chk-1/... <-- data exclusive to checkpoint 1 > /chk-2/... <-- data exclusive to checkpoint 2 > /chk-3/... 
<-- data exclusive to checkpoint 3 > ..//job2-id/ > /shared/... > /chk-1/... > /chk-2/... > /chk-3/... > ..//savepoint-1/savepoint-root > /file-1-uid > /file-2-uid > /file-3-uid > /savepoint-2/savepoint-root > /file-1-uid > /file-2-uid > /file-3-uid > {code} > This is the umbrella issue for the individual steps needed to address this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
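Purely as an illustration of points 4 and 5 of the proposal above, a hypothetical hook interface could look like the sketch below; this is not Flink's actual StateBackend API, and the names are invented for this example.
{code:java}
/**
 * Hypothetical per-checkpoint cleanup hooks, sketching points 4 and 5 of the proposal.
 */
public interface CheckpointCleanupHooks {

    /** Drops all checkpointed state that belongs exclusively to the given checkpoint. */
    void disposeCheckpointState(long checkpointId) throws Exception;

    /** Drops all checkpointed state up to (and including) the given checkpoint. */
    void disposeCheckpointStateUpTo(long checkpointId) throws Exception;
}
{code}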
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348381#comment-16348381 ] ASF GitHub Bot commented on FLINK-5820: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165295768 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -18,45 +18,50 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.JobStatus; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * The configuration of a checkpoint, such as whether + * The configuration of a checkpoint. This described whether * - * The checkpoint should be persisted - * The checkpoint must be full, or may be incremental (not yet implemented) - * The checkpoint format must be the common (cross backend) format, - * or may be state-backend specific (not yet implemented) - * when the checkpoint should be garbage collected + * The checkpoint is s regular checkpoint or a savepoint + * When the checkpoint should be garbage collected * */ public class CheckpointProperties implements Serializable { - private static final long serialVersionUID = -8835900655844879470L; + private static final long serialVersionUID = 2L; - private final boolean forced; + /** Type - checkpoit / savepoint. */ --- End diff -- nit: typo > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storabe and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". The later only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. 
The StateBackend needs to have a hook to drop all checkpointed state up > to a specific checkpoint (for all previously discarded checkpoints). > 6. In the future, this must support periodic cleanup hooks that track > orphaned shared state from incremental checkpoints. > For the {{FsStateBackend}}, which stores most of the checkpointes state > currently (transitively for RocksDB as well), this means a re-structuring of > the storage directories as follows: > {code} > ..//job1-id/ > /shared/<-- shared checkpoint data > /chk-1/... <-- data exclusive to checkpoint 1 > /chk-2/... <-- data exclusive to checkpoint 2 > /chk-3/... <-- data exclusive to checkpoint 3 > ..//job2-id/ > /shared/... > /chk-1/... > /chk-2/... > /chk-3/... > ..
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165295667 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -18,45 +18,50 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.JobStatus; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * The configuration of a checkpoint, such as whether + * The configuration of a checkpoint. This described whether --- End diff -- nit: type ---
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348382#comment-16348382 ] ASF GitHub Bot commented on FLINK-5820: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165318529 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java --- @@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception { assertTrue(isDirectoryEmpty(directory)); } + // + // Not deleting parent directories + // + + /** +* This test checks that the stream does not check and clean the parent directory +* when encountering a write error. +*/ + @Test + public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception { + final File directory = tempDir.newFolder(); + + // prevent creation of files in that directory + assertTrue(directory.setWritable(false, true)); + checkDirectoryNotWritable(directory); + + FileSystem fs = spy(FileSystem.getLocalFileSystem()); + + FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + stream1.write(new byte[61]); + stream2.write(new byte[61]); + + try { + stream1.closeAndGetHandle(); + fail("this should fail with an exception"); + } catch (IOException ignored) {} + + stream2.close(); + + verify(fs, times(0)).delete(any(Path.class), anyBoolean()); --- End diff -- nit: This seems somewhat brittle because there could be another "delete" method that the handle uses to delete the parent dir. For "future proof-ness"... > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storabe and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". 
The latter only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. The StateBackend needs to have a hook to drop all checkpointed state up > to a specific checkpoint (for all previously discarded checkpoints). > 6. In the future, this must support periodic cleanup hooks that track > orphaned shared state from incremental checkpoints. > For the {{FsStateBackend}}, which stores most of the checkpoi
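One way to address the brittleness the comment points out, sketched under the assumption that the test keeps a handle on the parent directory: assert on the observable effect (the directory still exists) instead of verifying that one particular delete overload was never invoked.
{code:java}
import static org.junit.Assert.assertTrue;

import java.io.File;

public final class ParentDirectoryAssertions {

    private ParentDirectoryAssertions() {
    }

    /**
     * Fails if the parent directory was removed, no matter which delete method
     * (or future overload) a state handle might have used to clean it up.
     */
    public static void assertParentDirectoryStillExists(File parentDirectory) {
        assertTrue("parent directory must survive write errors", parentDirectory.exists());
    }
}
{code}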
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165317663 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java --- @@ -19,58 +19,53 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +/** + * Unit tests for the {@link CompletedCheckpoint}. + */ public class CompletedCheckpointTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); - /** -* Tests that persistent checkpoints discard their header file. -*/ @Test - public void testDiscard() throws Exception { - File file = tmpFolder.newFile(); - assertEquals(true, file.exists()); - + public void registerStatesAtRegistry() { --- End diff -- What's the reason for this change? ---
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165318529 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java --- @@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception { assertTrue(isDirectoryEmpty(directory)); } + // + // Not deleting parent directories + // + + /** +* This test checks that the stream does not check and clean the parent directory +* when encountering a write error. +*/ + @Test + public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception { + final File directory = tempDir.newFolder(); + + // prevent creation of files in that directory + assertTrue(directory.setWritable(false, true)); + checkDirectoryNotWritable(directory); + + FileSystem fs = spy(FileSystem.getLocalFileSystem()); + + FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream( + Path.fromLocalFile(directory), fs, 1024, 1); + + stream1.write(new byte[61]); + stream2.write(new byte[61]); + + try { + stream1.closeAndGetHandle(); + fail("this should fail with an exception"); + } catch (IOException ignored) {} + + stream2.close(); + + verify(fs, times(0)).delete(any(Path.class), anyBoolean()); --- End diff -- nit: This seems somewhat brittle because there could be another "delete" method that the handle uses to delete the parent dir. For "future proof-ness"... ---
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5396#discussion_r165295768 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -18,45 +18,50 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.JobStatus; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * The configuration of a checkpoint, such as whether + * The configuration of a checkpoint. This described whether * - * The checkpoint should be persisted - * The checkpoint must be full, or may be incremental (not yet implemented) - * The checkpoint format must be the common (cross backend) format, - * or may be state-backend specific (not yet implemented) - * when the checkpoint should be garbage collected + * The checkpoint is s regular checkpoint or a savepoint + * When the checkpoint should be garbage collected * */ public class CheckpointProperties implements Serializable { - private static final long serialVersionUID = -8835900655844879470L; + private static final long serialVersionUID = 2L; - private final boolean forced; + /** Type - checkpoit / savepoint. */ --- End diff -- nit: typo ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348376#comment-16348376 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165318377 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java --- @@ -227,21 +229,13 @@ protected void cleanup() throws Exception { // has been stopped CLEANUP_LATCH.trigger(); - // wait until handle async exception has been called to proceed with the termination of the - // StreamTask - HANDLE_ASYNC_EXCEPTION_LATCH.await(); + // wait until all async checkpoint threads are terminated, so that no more exceptions can be reported + Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS)); --- End diff -- Where do we trigger the shut down of the `getAsyncOperationsThreadpool`? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
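For context on the question above: ExecutorService.awaitTermination(...) only returns true once shutdown() or shutdownNow() has been called and all tasks have finished, so the assertion depends on a shutdown being triggered somewhere. A generic JDK sketch (unrelated to Flink's actual fields) of the required ordering:
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationSketch {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService asyncOperationsPool = Executors.newFixedThreadPool(2);

        asyncOperationsPool.execute(() -> System.out.println("asynchronous checkpoint part"));

        // Without an explicit shutdown, awaitTermination would simply time out and return false.
        asyncOperationsPool.shutdown();
        boolean terminated = asyncOperationsPool.awaitTermination(30L, TimeUnit.SECONDS);
        System.out.println("terminated cleanly: " + terminated);
    }
}
{code}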
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348375#comment-16348375 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165318246 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java --- @@ -227,21 +229,13 @@ protected void cleanup() throws Exception { // has been stopped CLEANUP_LATCH.trigger(); - // wait until handle async exception has been called to proceed with the termination of the - // StreamTask - HANDLE_ASYNC_EXCEPTION_LATCH.await(); --- End diff -- This handle should be deleted since it is no longer needed. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165318377 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java --- @@ -227,21 +229,13 @@ protected void cleanup() throws Exception { // has been stopped CLEANUP_LATCH.trigger(); - // wait until handle async exception has been called to proceed with the termination of the - // StreamTask - HANDLE_ASYNC_EXCEPTION_LATCH.await(); + // wait until all async checkpoint threads are terminated, so that no more exceptions can be reported + Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS)); --- End diff -- Where do we trigger the shut down of the `getAsyncOperationsThreadpool`? ---
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165318246 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java --- @@ -227,21 +229,13 @@ protected void cleanup() throws Exception { // has been stopped CLEANUP_LATCH.trigger(); - // wait until handle async exception has been called to proceed with the termination of the - // StreamTask - HANDLE_ASYNC_EXCEPTION_LATCH.await(); --- End diff -- This handle should be deleted since it is no longer needed. ---
[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart
[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348374#comment-16348374 ] ASF GitHub Bot commented on FLINK-8484: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens My only concern is that scanning the whole list of shards can be severely limited by AWS Kinesis's API invocation rate limits. Also, we would then only be cleaning up the state on restore, meaning we would kind of be encouraging (in a bad way) Kinesis users to snapshot and restore every once in a while. I think the best solution for that is probably to use a threshold constant as Stephan suggested, but we will need to investigate whether the Kinesis API supports enough information to implement this. I'll open a separate JIRA ticket for this, so we can properly discuss the issues of pruning closed shard states there. > Kinesis consumer re-reads closed shards on job restart > -- > > Key: FLINK-8484 > URL: https://issues.apache.org/jira/browse/FLINK-8484 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2 >Reporter: Philip Luppens >Assignee: Philip Luppens >Priority: Blocker > Labels: bug, flink, kinesis > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > We’re using the connector to subscribe to streams varying from 1 to a 100 > shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis > stream up and down during peak times. What we’ve noticed is that, while we > were having closed shards, any Flink job restart with check- or save-point > would result in shards being re-read from the event horizon, duplicating our > events. > > We started checking the checkpoint state, and found that the shards were > stored correctly with the proper sequence number (including for closed > shards), but that upon restarts, the older closed shards would be read from > the event horizon, as if their restored state would be ignored. > > In the end, we believe that we found the problem: in the > FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned > from the KinesisDataFetcher against the shards’ metadata from the restoration > point, but we do this via a containsKey() call, which means we’ll use the > StreamShardMetadata’s equals() method. However, this checks for all > properties, including the endingSequenceNumber, which might have changed > between the restored state’s checkpoint and our data fetch, thus failing the > equality check, failing the containsKey() check, and resulting in the shard > being re-read from the event horizon, even though it was present in the > restored state. > > We’ve created a workaround where we only check for the shardId and stream > name to restore the state of the shards we’ve already seen, and this seems to > work correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens My only concern is that scanning the whole list of shards can be severely limited by AWS Kinesis's API invocation rate limits. Also, we would then only be cleaning up the state on restore, meaning we would kind of be encouraging (in a bad way) Kinesis users to snapshot and restore every once in a while. I think the best solution for that is probably to use a threshold constant as Stephan suggested, but we will need to investigate whether the Kinesis API supports enough information to implement this. I'll open a separate JIRA ticket for this, so we can properly discuss the issues of pruning closed shard states there. ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348372#comment-16348372 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165317571 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -916,57 +916,78 @@ private void handleExecutionException(Exception e) { CheckpointingOperation.AsynCheckpointState.COMPLETED, CheckpointingOperation.AsynCheckpointState.RUNNING); - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } + if (asyncCheckpointState.compareAndSet( + CheckpointingOperation.AsynCheckpointState.RUNNING, + CheckpointingOperation.AsynCheckpointState.DISCARDED)) { --- End diff -- This could be `asyncCheckpointState.compareAndSet(currentAsyncCheckpointState, DISCARDED)`. Then we could remove the compare and set above this if condition. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
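A stripped-down sketch of the suggested simplification, using an AtomicReference and an illustrative enum rather than Flink's actual fields: a single compareAndSet from the state that was just observed to DISCARDED replaces the COMPLETED -> RUNNING reset followed by a second transition.
{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class AsyncCheckpointDiscardSketch {

    enum AsyncCheckpointState { RUNNING, COMPLETED, DISCARDED }

    private final AtomicReference<AsyncCheckpointState> asyncCheckpointState =
            new AtomicReference<>(AsyncCheckpointState.RUNNING);

    /** Tries to move whatever state we currently observe directly to DISCARDED. */
    boolean tryDiscard() {
        AsyncCheckpointState current = asyncCheckpointState.get();
        if (current == AsyncCheckpointState.DISCARDED) {
            // another thread already discarded; nothing left to clean up
            return false;
        }
        return asyncCheckpointState.compareAndSet(current, AsyncCheckpointState.DISCARDED);
    }
}
{code}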
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165317571 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -916,57 +916,78 @@ private void handleExecutionException(Exception e) { CheckpointingOperation.AsynCheckpointState.COMPLETED, CheckpointingOperation.AsynCheckpointState.RUNNING); - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } + if (asyncCheckpointState.compareAndSet( + CheckpointingOperation.AsynCheckpointState.RUNNING, + CheckpointingOperation.AsynCheckpointState.DISCARDED)) { --- End diff -- This could be `asyncCheckpointState.compareAndSet(currentAsyncCheckpointState, DISCARDED)`. Then we could remove the compare and set above this if condition. ---
[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart
[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348367#comment-16348367 ] ASF GitHub Bot commented on FLINK-8484: --- Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Regarding the remark from @StephanEwen: perhaps it would be ok to re-use the `KinesisProxy` to return a list of all shards and compare them to the `sequenceNumsToRestore` to prune any shards that no longer exist? It would delay the restoration, but you'd be sure the state wouldn't grow indefinitely (we were looking at around a 1000 closed shards with a 24 hour retention period, so 365k per year - that's not going to end well). Another option would be to kick off another task periodically to prune them, but that is likely to run into race conditions, so doing it at the safe point of restoration would make more sense to me. > Kinesis consumer re-reads closed shards on job restart > -- > > Key: FLINK-8484 > URL: https://issues.apache.org/jira/browse/FLINK-8484 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2 >Reporter: Philip Luppens >Assignee: Philip Luppens >Priority: Blocker > Labels: bug, flink, kinesis > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > We’re using the connector to subscribe to streams varying from 1 to a 100 > shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis > stream up and down during peak times. What we’ve noticed is that, while we > were having closed shards, any Flink job restart with check- or save-point > would result in shards being re-read from the event horizon, duplicating our > events. > > We started checking the checkpoint state, and found that the shards were > stored correctly with the proper sequence number (including for closed > shards), but that upon restarts, the older closed shards would be read from > the event horizon, as if their restored state would be ignored. > > In the end, we believe that we found the problem: in the > FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned > from the KinesisDataFetcher against the shards’ metadata from the restoration > point, but we do this via a containsKey() call, which means we’ll use the > StreamShardMetadata’s equals() method. However, this checks for all > properties, including the endingSequenceNumber, which might have changed > between the restored state’s checkpoint and our data fetch, thus failing the > equality check, failing the containsKey() check, and resulting in the shard > being re-read from the event horizon, even though it was present in the > restored state. > > We’ve created a workaround where we only check for the shardId and stream > name to restore the state of the shards we’ve already seen, and this seems to > work correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Regarding the remark from @StephanEwen: perhaps it would be ok to re-use the `KinesisProxy` to return a list of all shards and compare them to the `sequenceNumsToRestore` to prune any shards that no longer exist? It would delay the restoration, but you'd be sure the state wouldn't grow indefinitely (we were looking at around a 1000 closed shards with a 24 hour retention period, so 365k per year - that's not going to end well). Another option would be to kick off another task periodically to prune them, but that is likely to run into race conditions, so doing it at the safe point of restoration would make more sense to me. ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348364#comment-16348364 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165315816 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java --- @@ -46,20 +47,28 @@ public void testCancelAndCleanup() throws Exception { operatorSnapshotResult.cancel(); KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); - RunnableFuture keyedStateManagedFuture = mock(RunnableFuture.class); - when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); + RunnableFuture> keyedStateManagedFuture = mock(RunnableFuture.class); --- End diff -- Why not using a `DoneFuture` instead of mocking? > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
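The idea behind the comment, sketched with a tiny already-completed RunnableFuture; Flink ships a DoneFuture utility for this purpose, and the stand-in below only illustrates the concept rather than reproducing that class.
{code:java}
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/** A future that is complete from the start, so tests need no mocking of get(). */
public final class CompletedRunnableFuture<T> implements RunnableFuture<T> {

    private final T value;

    public CompletedRunnableFuture(T value) {
        this.value = value;
    }

    @Override
    public void run() {
        // nothing to do: the result already exists
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return true;
    }

    @Override
    public T get() {
        return value;
    }

    @Override
    public T get(long timeout, TimeUnit unit) {
        return value;
    }
}
{code}
A test can then hand `new CompletedRunnableFuture<>(keyedManagedStateHandle)` to the code under test instead of stubbing `RunnableFuture.get()`.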
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165315816 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java --- @@ -46,20 +47,28 @@ public void testCancelAndCleanup() throws Exception { operatorSnapshotResult.cancel(); KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); - RunnableFuture keyedStateManagedFuture = mock(RunnableFuture.class); - when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); + RunnableFuture> keyedStateManagedFuture = mock(RunnableFuture.class); --- End diff -- Why not using a `DoneFuture` instead of mocking? ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348362#comment-16348362 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165315625 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -859,56 +861,77 @@ public void run() { if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { - TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null; - - TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); - - // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state - // to stateless tasks on restore. This enables simple job modifications that only concern - // stateless without the need to assign them uids to match their (always empty) states. - taskStateManager.reportTaskStateSnapshot( - checkpointMetaData, - checkpointMetrics, - acknowledgedState); - - LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", - owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); - - LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", - owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState); + reportCompletedSnapshotStates( + jobManagerTaskOperatorSubtaskStates, + localTaskOperatorSubtaskStates, + asyncDurationMillis); } else { LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", owner.getName(), checkpointMetaData.getCheckpointId()); } } catch (Exception e) { - // the state is completed if an exception occurred in the acknowledgeCheckpoint call - // in order to clean up, we have to set it to RUNNING again. - asyncCheckpointState.compareAndSet( - CheckpointingOperation.AsynCheckpointState.COMPLETED, - CheckpointingOperation.AsynCheckpointState.RUNNING); - - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } - - Exception checkpointException = new Exception( - "Could not materialize checkpoint " + checkpointId + " for operator " + - owner.getName() + '.', - e); - - owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( - checkpointMetaData, - checkpointException); + handleExecutionException(e); } finally { owner.cancelables.unregisterCloseable(this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } + private void reportCompletedSnapshotStates( + TaskStateSnapshot acknowledgedTaskStateSnapshot, + TaskStateSnapshot localTaskStateSnapshot, + long asyncDurationMillis) { + + TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); + + boolean
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165315625 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -859,56 +861,77 @@ public void run() { if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { - TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null; - - TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); - - // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state - // to stateless tasks on restore. This enables simple job modifications that only concern - // stateless without the need to assign them uids to match their (always empty) states. - taskStateManager.reportTaskStateSnapshot( - checkpointMetaData, - checkpointMetrics, - acknowledgedState); - - LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", - owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); - - LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", - owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState); + reportCompletedSnapshotStates( + jobManagerTaskOperatorSubtaskStates, + localTaskOperatorSubtaskStates, + asyncDurationMillis); } else { LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", owner.getName(), checkpointMetaData.getCheckpointId()); } } catch (Exception e) { - // the state is completed if an exception occurred in the acknowledgeCheckpoint call - // in order to clean up, we have to set it to RUNNING again. - asyncCheckpointState.compareAndSet( - CheckpointingOperation.AsynCheckpointState.COMPLETED, - CheckpointingOperation.AsynCheckpointState.RUNNING); - - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } - - Exception checkpointException = new Exception( - "Could not materialize checkpoint " + checkpointId + " for operator " + - owner.getName() + '.', - e); - - owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( - checkpointMetaData, - checkpointException); + handleExecutionException(e); } finally { owner.cancelables.unregisterCloseable(this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } + private void reportCompletedSnapshotStates( + TaskStateSnapshot acknowledgedTaskStateSnapshot, + TaskStateSnapshot localTaskStateSnapshot, + long asyncDurationMillis) { + + TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); + + boolean hasAckState = acknowledgedTaskStateSnapshot.hasState(); + boolean hasLocalState = localTaskStateSnapshot.hasState(); + + Preconditions.checkState(hasAckState || !hasLocalState, +
[jira] [Comment Edited] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115 ] yanxiaobin edited comment on FLINK-5479 at 2/1/18 10:30 AM: I've also met this problem at the moment. This can cause serious delays! Is there a better solution to the problem? was (Author: backlight): I've also met this problem at the moment. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
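To make the blocking effect described above concrete: the consumer-wide watermark is the minimum over the per-partition watermarks, so a single idle partition stuck at Long.MIN_VALUE pins that minimum. A simplified illustration, not the connector's actual code:
{code:java}
public class MinWatermarkIllustration {

    /** The emitted watermark can only be as far along as the slowest partition. */
    static long overallWatermark(long[] perPartitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : perPartitionWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // two active partitions and one idle partition that never advanced
        long[] watermarks = {1_517_480_000_000L, 1_517_480_060_000L, Long.MIN_VALUE};
        System.out.println(overallWatermark(watermarks)); // prints Long.MIN_VALUE
    }
}
{code}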
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348340#comment-16348340 ] ASF GitHub Bot commented on FLINK-8360: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165309957 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.FutureUtil; + +import java.util.concurrent.ExecutionException; + +/** + * TODO write comment. + */ +public class OperatorSnapshotFinalizer { + + private final OperatorSubtaskState jobManagerOwnedState; + private final OperatorSubtaskState taskLocalState; + + public OperatorSnapshotFinalizer( + OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException { + + SnapshotResult keyedManaged = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture()); + + SnapshotResult keyedRaw = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture()); + + SnapshotResult operatorManaged = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture()); + + SnapshotResult operatorRaw = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture()); + + jobManagerOwnedState = new OperatorSubtaskState( + extractJobManagerOwned(operatorManaged), + extractJobManagerOwned(operatorRaw), + extractJobManagerOwned(keyedManaged), + extractJobManagerOwned(keyedRaw) + ); + + taskLocalState = new OperatorSubtaskState( + extractTaskLocal(operatorManaged), + extractTaskLocal(operatorRaw), + extractTaskLocal(keyedManaged), + extractTaskLocal(keyedRaw) + ); + } + + public OperatorSubtaskState getTaskLocalState() { + return taskLocalState; + } + + public OperatorSubtaskState getJobManagerOwnedState() { + return jobManagerOwnedState; + } + + private T extractJobManagerOwned(SnapshotResult snapshotResult) { + return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null; --- End diff -- We should add `@Nullable` annotation to make sure that this method can return a `null` value. 
> Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we
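A minimal sketch of what the requested annotation would look like, assuming the JSR-305 javax.annotation.Nullable annotation that the Flink codebase uses elsewhere; the class is a simplified stand-in, not SnapshotResult or OperatorSnapshotFinalizer.
{code:java}
import javax.annotation.Nullable;

public class NullableExtractionSketch<T> {

    @Nullable
    private final T jobManagerOwnedSnapshot;

    public NullableExtractionSketch(@Nullable T jobManagerOwnedSnapshot) {
        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
    }

    /** Callers see from the annotation that this accessor may legitimately return null. */
    @Nullable
    public T extractJobManagerOwned() {
        return jobManagerOwnedSnapshot;
    }
}
{code}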
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165309957 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.FutureUtil; + +import java.util.concurrent.ExecutionException; + +/** + * TODO write comment. + */ +public class OperatorSnapshotFinalizer { + + private final OperatorSubtaskState jobManagerOwnedState; + private final OperatorSubtaskState taskLocalState; + + public OperatorSnapshotFinalizer( + OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException { + + SnapshotResult keyedManaged = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture()); + + SnapshotResult keyedRaw = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture()); + + SnapshotResult operatorManaged = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture()); + + SnapshotResult operatorRaw = + FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture()); + + jobManagerOwnedState = new OperatorSubtaskState( + extractJobManagerOwned(operatorManaged), + extractJobManagerOwned(operatorRaw), + extractJobManagerOwned(keyedManaged), + extractJobManagerOwned(keyedRaw) + ); + + taskLocalState = new OperatorSubtaskState( + extractTaskLocal(operatorManaged), + extractTaskLocal(operatorRaw), + extractTaskLocal(keyedManaged), + extractTaskLocal(keyedRaw) + ); + } + + public OperatorSubtaskState getTaskLocalState() { + return taskLocalState; + } + + public OperatorSubtaskState getJobManagerOwnedState() { + return jobManagerOwnedState; + } + + private T extractJobManagerOwned(SnapshotResult snapshotResult) { + return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null; --- End diff -- We should add `@Nullable` annotation to make sure that this method can return a `null` value. ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348339#comment-16348339 ]

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r165309670

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java ---
@@ -0,0 +1,83 @@
[...]
+    private <T extends StateObject> T extractJobManagerOwned(SnapshotResult<T> snapshotResult) {
+        return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;
--- End diff --

Same here with the `null` values.
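Again purely for illustration: `extractTaskLocal` is called in the constructor of the quoted class but its body is not part of the quoted hunk. Assuming it mirrors `extractJobManagerOwned` and reads the task-local side of the `SnapshotResult`, the same `@Nullable` treatment would apply there as well.

{code:java}
// Hypothetical sketch of the task-local counterpart (its body is not shown in the
// quoted diff); assumes it mirrors extractJobManagerOwned.
import javax.annotation.Nullable;

import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;

final class TaskLocalExtractionSketch {

    @Nullable
    static <T extends StateObject> T extractTaskLocal(@Nullable SnapshotResult<T> snapshotResult) {
        // Null simply means no task-local copy exists for this piece of state.
        return snapshotResult != null ? snapshotResult.getTaskLocalSnapshot() : null;
    }
}
{code}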
> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
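To make the recovery idea described in the issue concrete, here is a small, purely illustrative sketch of a prioritized restore path. It is not Flink's actual implementation, and all names in it (`LocalStateStore`, `RemoteStateStore`, `StateSnapshot`, `stateForRestore`) are hypothetical: prefer the task-local secondary copy when the task was rescheduled to a slot that still holds it, and fall back to the primary copy in DFS otherwise.

{code:java}
// Illustrative sketch of "local copy first, DFS fallback" recovery.
// All types and method names here are hypothetical.
import java.util.Optional;

interface StateSnapshot {}

interface LocalStateStore {
    // Returns the locally kept secondary copy for the given checkpoint, if any.
    Optional<StateSnapshot> findLocalCopy(long checkpointId);
}

interface RemoteStateStore {
    // Reads the primary copy that was reported to the checkpoint coordinator.
    StateSnapshot readFromDfs(long checkpointId);
}

final class PrioritizedRestoreSketch {

    static StateSnapshot stateForRestore(
            long checkpointId,
            LocalStateStore localStore,
            RemoteStateStore remoteStore) {

        // Local copy first: saves network bandwidth, but only pays off if the
        // task-to-slot assignment was sticky enough for the local state to survive.
        return localStore.findLocalCopy(checkpointId)
            .orElseGet(() -> remoteStore.readFromDfs(checkpointId));
    }
}
{code}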