[jira] [Created] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null

2018-02-01 Thread Bill Lee (JIRA)
Bill Lee created FLINK-8544:
---

 Summary: JSONKeyValueDeserializationSchema throws NPE when message 
key is null
 Key: FLINK-8544
 URL: https://issues.apache.org/jira/browse/FLINK-8544
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.0
Reporter: Bill Lee


JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key without validation.
If a message with key == null is read, Flink throws an NPE.
{code:java}
@Override
public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset) throws IOException {
if (mapper == null) {
mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
node.set("key", mapper.readValue(messageKey, JsonNode.class)); 
// messageKey is not validate against null.
node.set("value", mapper.readValue(message, JsonNode.class));
{code}

The fix is very straightforward.
{code:java}
if (messageKey == null) {
    node.set("key", null);
} else {
    node.set("key", mapper.readValue(messageKey, JsonNode.class));
}
{code}
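For reference, here is a minimal sketch of the whole patched method (the surrounding class, and any metadata handling that might follow in the actual implementation, are elided; only the null guard is new):
{code:java}
@Override
public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
	if (mapper == null) {
		mapper = new ObjectMapper();
	}
	ObjectNode node = mapper.createObjectNode();
	if (messageKey == null) {
		// Kafka does not require records to carry a key, so guard before handing the bytes to Jackson.
		// ObjectNode#set stores a NullNode when passed a null value, so "key" simply becomes JSON null.
		node.set("key", null);
	} else {
		node.set("key", mapper.readValue(messageKey, JsonNode.class));
	}
	node.set("value", mapper.readValue(message, JsonNode.class));
	return node;
}
{code}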
If it would be appreciated, I can send a pull request.





[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-01 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8543:
--
Description: 
I'm hitting an issue with my BucketingSink from a streaming job.

 
{code:java}
return new BucketingSink>(path)
 .setWriter(writer)
 .setBucketer(new DateTimeBucketer>(formatString));
{code}
 

I can see that a few files have run into issues with uploading to S3:

!Screen Shot 2018-01-30 at 18.34.51.png!   

The Flink console output is showing an exception being thrown by 
S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster and 
added some additional logging to the checkOpen() method to log the 'key' just 
before the exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
private final OutputStream backupStream;
private final File backupFile;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String key;
private final Progressable progress;
private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;

public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
Progressable progress) throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
}

void checkOpen() throws IOException {
if (!this.closed.get()) return;

// vv-- Additional logging --vvv

LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


throw new IOException("Output Stream closed");
}

@Override
public void flush() throws IOException {
this.checkOpen();
this.backupStream.flush();
}

@Override
public void close() throws IOException {
if (this.closed.getAndSet(true)) {
return;
}
this.backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
try {
ObjectMetadata om = 
this.fs.newObjectMetadata(this.backupFile.length());
Upload upload = 
this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
ProgressableProgressListener listener = new 
ProgressableProgressListener(this.fs, this.key, upload, this.progress);
upload.addProgressListener((ProgressListener)listener);
upload.waitForUploadResult();
listener.uploadCompleted();
this.fs.finishedWrite(this.key);
}
catch (InterruptedException e) {
throw (InterruptedIOException)new 
InterruptedIOException(e.toString()).initCause(e);
}
catch (AmazonClientException e) {
throw S3AUtils.translateException("saving output", this.key, e);
}
finally {
if (!this.backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", 
(Object)this.backupFile);
}
super.close();
}
LOG.debug("OutputStream for key '{}' upload complete", 
(Object)this.key);
}

@Override
public void write(int b) throws IOException {
this.checkOpen();
this.backupStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
this.checkOpen();
this.backupStream.write(b, off, len);
}

static {
}
}
{code}
 

You can see from this additional log output that the S3AOutputStream#close() 
method **appears** to be called before the S3AOutputStream#flush() method:

 
{code:java}
2018-02-01 1

[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-01 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8543:
--
 Attachment: Screen Shot 2018-01-30 at 18.34.51.png
Description: 
I'm hitting an issue with my BucketingSink from a streaming job.

 
{code:java}
return new BucketingSink>(path)
 .setWriter(writer)
 .setBucketer(new DateTimeBucketer>(formatString));
{code}
 

I can see that a few files have run into issues with uploading to S3:

!Screen Shot 2018-01-30 at 18.34.51.png!   

I've grabbed the S3AOutputStream class from my cluster and added some 
additional logging to the checkOpen() method to log the 'key' just before the 
exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
private final OutputStream backupStream;
private final File backupFile;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String key;
private final Progressable progress;
private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;

public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
Progressable progress) throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
}

void checkOpen() throws IOException {
if (!this.closed.get()) return;

// vv-- Additional logging --vvv

LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


throw new IOException("Output Stream closed");
}

@Override
public void flush() throws IOException {
this.checkOpen();
this.backupStream.flush();
}

@Override
public void close() throws IOException {
if (this.closed.getAndSet(true)) {
return;
}
this.backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
try {
ObjectMetadata om = 
this.fs.newObjectMetadata(this.backupFile.length());
Upload upload = 
this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
ProgressableProgressListener listener = new 
ProgressableProgressListener(this.fs, this.key, upload, this.progress);
upload.addProgressListener((ProgressListener)listener);
upload.waitForUploadResult();
listener.uploadCompleted();
this.fs.finishedWrite(this.key);
}
catch (InterruptedException e) {
throw (InterruptedIOException)new 
InterruptedIOException(e.toString()).initCause(e);
}
catch (AmazonClientException e) {
throw S3AUtils.translateException("saving output", this.key, e);
}
finally {
if (!this.backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", 
(Object)this.backupFile);
}
super.close();
}
LOG.debug("OutputStream for key '{}' upload complete", 
(Object)this.key);
}

@Override
public void write(int b) throws IOException {
this.checkOpen();
this.backupStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
this.checkOpen();
this.backupStream.write(b, off, len);
}

static {
}
}
{code}
 

You can see from this additional log output that the S3AOutputStream#close() 
method **appears** to be called before the S3AOutputStream#flush() method:

 
{code:java}
2018-02-01 12:42:20,698 DEBUG org.apache.h

[GitHub] flink pull request #:

2018-02-01 Thread tzulitai
Github user tzulitai commented on the pull request:


https://github.com/apache/flink/commit/a2533f406d46b1c5acb5f70c263f9afad839dffe#commitcomment-27262148
  
In flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java on line 74:
Question regarding this change:
Why do we already test for `v1_5`? I think we should / can only add that 
variant once we cut the `release-1.5` branch. Otherwise, strictly speaking, any 
savepoints taken with the current `master` snapshot are not proper 1.5 
savepoints.


---


[jira] [Created] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-01 Thread chris snow (JIRA)
chris snow created FLINK-8543:
-

 Summary: Output Stream closed at 
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
 Key: FLINK-8543
 URL: https://issues.apache.org/jira/browse/FLINK-8543
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.4.0
 Environment: IBM Analytics Engine - 
[https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]

The cluster is based on Hortonworks Data Platform 2.6.2. The following 
components are made available.

Apache Spark 2.1.1
Hadoop 2.7.3
Apache Livy 0.3.0
Knox 0.12.0
Ambari 2.5.2
Anaconda with Python 2.7.13 and 3.5.2 
Jupyter Enterprise Gateway 0.5.0 
HBase 1.1.2 * 
Hive 1.2.1 *
Oozie 4.2.0 *
Flume 1.5.2 * 
Tez 0.7.0 * 
Pig 0.16.0 * 
Sqoop 1.4.6 * 
Slider 0.92.0 * 
Reporter: chris snow


I'm hitting an issue with my BucketingSink from a streaming job.

 
{code:java}
return new BucketingSink>(path)
 .setWriter(writer)
 .setBucketer(new DateTimeBucketer>(formatString));
{code}
 

I can see that a few files have run into issues with uploading to S3:

  !Screen Shot 2018-01-30 at 18.34.51.png!

I've grabbed the S3AOutputStream class from my cluster and added some 
additional logging to the checkOpen() method to log the 'key' just before the 
exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
private final OutputStream backupStream;
private final File backupFile;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String key;
private final Progressable progress;
private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;

public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
Progressable progress) throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
}

void checkOpen() throws IOException {
if (!this.closed.get()) return;

// vv-- Additional logging --vvv

LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


throw new IOException("Output Stream closed");
}

@Override
public void flush() throws IOException {
this.checkOpen();
this.backupStream.flush();
}

@Override
public void close() throws IOException {
if (this.closed.getAndSet(true)) {
return;
}
this.backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
try {
ObjectMetadata om = 
this.fs.newObjectMetadata(this.backupFile.length());
Upload upload = 
this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
ProgressableProgressListener listener = new 
ProgressableProgressListener(this.fs, this.key, upload, this.progress);
upload.addProgressListener((ProgressListener)listener);
upload.waitForUploadResult();
listener.uploadCompleted();
this.fs.finishedWrite(this.key);
}
catch (InterruptedException e) {
throw (InterruptedIOException)new 
InterruptedIOException(e.toString()).initCause(e);
}
catch (AmazonClientException e) {
throw S3AUtils.translateException("saving output", this.key, e);
}
finally {
if (!this.backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", 
(Object)this.backupFile);
}
 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165349688
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture> 
snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
-   checkpointTimestamp + " . Returning 
null.");
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
-
-   snapshotOperation.takeSnapshot();
-
-   return new FutureTask>(
-   () -> snapshotOperation.materializeSnapshot()
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-   }
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
-   @Override
-   protected void done() {
-   
snapshotOperation.releaseResources(isCancelled());
+   try {
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
+   restoreOperation.doRestore(restoreState);
}
-   };
+   } catch (Exception ex) {
+   dispose();
+   throw ex;
+   }
}
 
-   private RunnableFuture> snapshotFully(
-   final long checkpointId,
-   final long timestamp,
-   final CheckpointStreamFactory streamFactory) throws Exception {
-
-   long startTime = System.currentTimeMillis();
-   final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-   final RocksDBFullSnapshotOperation snapshotOperation;
-
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
-   " . Returning null.");
-   }
+   @Override
+   public void notifyCheckpointComplete(long completedCheckpointId) {
 
-   return 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165349012
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture> 
snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
-   checkpointTimestamp + " . Returning 
null.");
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
-
-   snapshotOperation.takeSnapshot();
-
-   return new FutureTask>(
-   () -> snapshotOperation.materializeSnapshot()
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-   }
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
-   @Override
-   protected void done() {
-   
snapshotOperation.releaseResources(isCancelled());
+   try {
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
+   restoreOperation.doRestore(restoreState);
}
-   };
+   } catch (Exception ex) {
+   dispose();
+   throw ex;
+   }
}
 
-   private RunnableFuture> snapshotFully(
-   final long checkpointId,
-   final long timestamp,
-   final CheckpointStreamFactory streamFactory) throws Exception {
-
-   long startTime = System.currentTimeMillis();
-   final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-   final RocksDBFullSnapshotOperation snapshotOperation;
-
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
-   " . Returning null.");
-   }
+   @Override
+   public void notifyCheckpointComplete(long completedCheckpointId) {
 
-   return 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348543#comment-16348543
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165349012
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture> 
snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
-   checkpointTimestamp + " . Returning 
null.");
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
-
-   snapshotOperation.takeSnapshot();
-
-   return new FutureTask>(
-   () -> snapshotOperation.materializeSnapshot()
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-   }
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
-   @Override
-   protected void done() {
-   
snapshotOperation.releaseResources(isCancelled());
+   try {
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
+   restoreOperation.doRestore(restoreState);
}
-   };
+   } catch (Exception ex) {
+   dispose();
+   throw ex;
+   }
}
 
-   private RunnableFuture> snapshotFully(
-   final long checkpointId,
-   final long timestamp,
-   final CheckpointStreamFactory streamFactory) throws Exception {
-
-   long startTime = System.currentTimeMillis();
-   final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-   final RocksDBFullSnapshotOperation snapshotOperation;
-
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348525#comment-16348525
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165344648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) {
return sum;
}
 
-   public  StateTable 
newStateTable(RegisteredKeyedBackendStateMetaInfo newMetaInfo) {
-   return asynchronousSnapshots ?
-   new CopyOnWriteStateTable<>(this, newMetaInfo) :
-   new NestedMapsStateTable<>(this, newMetaInfo);
-   }
-
@Override
public boolean supportsAsynchronousSnapshots() {
-   return asynchronousSnapshots;
+   return snapshotStrategy.isAsynchronous();
}
 
@VisibleForTesting
public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() {
return localRecoveryConfig;
}
+
+   /**
+* Base class for the snapshots of the heap backend that outlines the 
algorithm and offers some hooks to realize
+* the concrete strategies.
+*/
+   private abstract class HeapSnapshotStrategy implements 
SnapshotStrategy> {
+
+   @Override
+   public RunnableFuture> 
performSnapshot(
+   long checkpointId,
+   long timestamp,
+   CheckpointStreamFactory streamFactory,
+   CheckpointOptions checkpointOptions) throws Exception {
+
+   if (!hasRegisteredState()) {
+   return DoneFuture.nullValue();
+   }
+
+   long syncStartTime = System.currentTimeMillis();
+
+   Preconditions.checkState(stateTables.size() <= 
Short.MAX_VALUE,
+   "Too many KV-States: " + stateTables.size() +
+   ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
+
+   List> metaInfoSnapshots =
+   new ArrayList<>(stateTables.size());
+
+   final Map kVStateToId = new 
HashMap<>(stateTables.size());
+
+   final Map, StateTableSnapshot> 
cowStateStableSnapshots =
+   new HashedMap(stateTables.size());
+
+   for (Map.Entry> kvState : 
stateTables.entrySet()) {
+   kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
+   StateTable stateTable = 
kvState.getValue();
+   if (null != stateTable) {
+   
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
+   cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
+   }
+   }
+
+   final KeyedBackendSerializationProxy 
serializationProxy =
+   new KeyedBackendSerializationProxy<>(
+   keySerializer,
+   metaInfoSnapshots,
+   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
+
+   //--- 
this becomes the end of sync part
+
+   // implementation of the async IO operation, based on 
FutureTask
+   final 
AbstractAsyncCallableWithResources> ioCallable 
=
+   new 
AbstractAsyncCallableWithResources>() {
+
+   
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+   @Override
+   protected void acquireResources() 
throws Exception {
+   stream = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+   
cancelStreamRegistry.registerCloseable(stream);
+   }
+
+   @Override
+   protected void releaseResources() 
throws Exception {
+
+   if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
+   
IOUtils.closeQuietly(str

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165344648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) {
return sum;
}
 
-   public  StateTable 
newStateTable(RegisteredKeyedBackendStateMetaInfo newMetaInfo) {
-   return asynchronousSnapshots ?
-   new CopyOnWriteStateTable<>(this, newMetaInfo) :
-   new NestedMapsStateTable<>(this, newMetaInfo);
-   }
-
@Override
public boolean supportsAsynchronousSnapshots() {
-   return asynchronousSnapshots;
+   return snapshotStrategy.isAsynchronous();
}
 
@VisibleForTesting
public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() {
return localRecoveryConfig;
}
+
+   /**
+* Base class for the snapshots of the heap backend that outlines the 
algorithm and offers some hooks to realize
+* the concrete strategies.
+*/
+   private abstract class HeapSnapshotStrategy implements 
SnapshotStrategy> {
+
+   @Override
+   public RunnableFuture> 
performSnapshot(
+   long checkpointId,
+   long timestamp,
+   CheckpointStreamFactory streamFactory,
+   CheckpointOptions checkpointOptions) throws Exception {
+
+   if (!hasRegisteredState()) {
+   return DoneFuture.nullValue();
+   }
+
+   long syncStartTime = System.currentTimeMillis();
+
+   Preconditions.checkState(stateTables.size() <= 
Short.MAX_VALUE,
+   "Too many KV-States: " + stateTables.size() +
+   ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
+
+   List> metaInfoSnapshots =
+   new ArrayList<>(stateTables.size());
+
+   final Map kVStateToId = new 
HashMap<>(stateTables.size());
+
+   final Map, StateTableSnapshot> 
cowStateStableSnapshots =
+   new HashedMap(stateTables.size());
+
+   for (Map.Entry> kvState : 
stateTables.entrySet()) {
+   kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
+   StateTable stateTable = 
kvState.getValue();
+   if (null != stateTable) {
+   
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
+   cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
+   }
+   }
+
+   final KeyedBackendSerializationProxy 
serializationProxy =
+   new KeyedBackendSerializationProxy<>(
+   keySerializer,
+   metaInfoSnapshots,
+   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
+
+   //--- 
this becomes the end of sync part
+
+   // implementation of the async IO operation, based on 
FutureTask
+   final 
AbstractAsyncCallableWithResources> ioCallable 
=
+   new 
AbstractAsyncCallableWithResources>() {
+
+   
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+   @Override
+   protected void acquireResources() 
throws Exception {
+   stream = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+   
cancelStreamRegistry.registerCloseable(stream);
+   }
+
+   @Override
+   protected void releaseResources() 
throws Exception {
+
+   if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
+   
IOUtils.closeQuietly(stream);
+   stream = null;
+   }
+
+   for (StateTableSnapshot 
tableSnapshot : cowStateStableSnapshots.values()) 

[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165344021
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 ---
@@ -19,58 +19,53 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Unit tests for the {@link CompletedCheckpoint}.
+ */
 public class CompletedCheckpointTest {
 
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-   /**
-* Tests that persistent checkpoints discard their header file.
-*/
@Test
-   public void testDiscard() throws Exception {
-   File file = tmpFolder.newFile();
-   assertEquals(true, file.exists());
-
+   public void registerStatesAtRegistry() {
--- End diff --

The test of whether state handles are correctly registered at the 
SharedStateRegistry was originally just sneakily added to a pre-existing 
metadata file cleanup test. That did not seem right ;-)

This factors the test out into a separate method. The test method should be 
called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. 
Will change that...


---


[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348522#comment-16348522
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165344021
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 ---
@@ -19,58 +19,53 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Unit tests for the {@link CompletedCheckpoint}.
+ */
 public class CompletedCheckpointTest {
 
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-   /**
-* Tests that persistent checkpoints discard their header file.
-*/
@Test
-   public void testDiscard() throws Exception {
-   File file = tmpFolder.newFile();
-   assertEquals(true, file.exists());
-
+   public void registerStatesAtRegistry() {
--- End diff --

The test of whether state handles are correctly registered at the 
SharedStateRegistry was originally just sneakily added to a pre-existing 
metadata file cleanup test. That did not seem right ;-)

This factors the test out into a separate method. The test method should be 
called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. 
Will change that...


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state backend, 
> not the "completed checkpoint store". The latter only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend nee

[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348516#comment-16348516
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165343453
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 ---
@@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws 
Exception {
assertTrue(isDirectoryEmpty(directory));
}
 
+   // 

+   //  Not deleting parent directories
+   // 

+
+   /**
+* This test checks that the stream does not check and clean the parent 
directory
+* when encountering a write error.
+*/
+   @Test
+   public void testStreamDoesNotTryToCleanUpParentOnError() throws 
Exception {
+   final File directory = tempDir.newFolder();
+
+   // prevent creation of files in that directory
+   assertTrue(directory.setWritable(false, true));
+   checkDirectoryNotWritable(directory);
+
+   FileSystem fs = spy(FileSystem.getLocalFileSystem());
+
+   FsCheckpointStateOutputStream stream1 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   FsCheckpointStateOutputStream stream2 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   stream1.write(new byte[61]);
+   stream2.write(new byte[61]);
+
+   try {
+   stream1.closeAndGetHandle();
+   fail("this should fail with an exception");
+   } catch (IOException ignored) {}
+
+   stream2.close();
+
+   verify(fs, times(0)).delete(any(Path.class), anyBoolean());
--- End diff --

Will add an additional check that the directory still exists
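For example (a rough sketch of such a check; the exact assertion is assumed here, not quoted from the PR):
{code:java}
// verify that the failed stream did not remove the parent checkpoint directory
assertTrue(directory.exists());
{code}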


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state backend, 
> not the "completed checkpoint store". The latter only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348518#comment-16348518
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165343607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Interface for different snapshot approaches in state backends. 
Implementing classes should ideally be stateless or at
+ * least threadsafe, i.e. this is a functional interface and can be 
called in parallel by multiple checkpoints.
+ *
+ * @param  type of the returned state object that represents the result 
of the snapshot operation.
+ */
+public interface SnapshotStrategy {
--- End diff --

If this interface is functional, then we could also annotate it with 
`@FunctionalInterface`
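For illustration, a sketch of how the annotated interface could look (the type parameter and method signature are reconstructed from the diff above, so treat them as assumptions rather than the actual file content):
{code:java}
@FunctionalInterface
public interface SnapshotStrategy<S extends StateObject> {

	/**
	 * Writes a snapshot for the given checkpoint; may be called in parallel for multiple checkpoints.
	 */
	RunnableFuture<S> performSnapshot(
		long checkpointId,
		long timestamp,
		CheckpointStreamFactory streamFactory,
		CheckpointOptions checkpointOptions) throws Exception;
}
{code}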


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.





[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165343607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Interface for different snapshot approaches in state backends. 
Implementing classes should ideally be stateless or at
+ * least threadsafe, i.e. this is a functional interface and can be 
called in parallel by multiple checkpoints.
+ *
+ * @param  type of the returned state object that represents the result 
of the snapshot operation.
+ */
+public interface SnapshotStrategy {
--- End diff --

If this interface is functional, then we could also annotate it with 
`@FunctionalInterface`


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165343453
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 ---
@@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws 
Exception {
assertTrue(isDirectoryEmpty(directory));
}
 
+   // 

+   //  Not deleting parent directories
+   // 

+
+   /**
+* This test checks that the stream does not check and clean the parent 
directory
+* when encountering a write error.
+*/
+   @Test
+   public void testStreamDoesNotTryToCleanUpParentOnError() throws 
Exception {
+   final File directory = tempDir.newFolder();
+
+   // prevent creation of files in that directory
+   assertTrue(directory.setWritable(false, true));
+   checkDirectoryNotWritable(directory);
+
+   FileSystem fs = spy(FileSystem.getLocalFileSystem());
+
+   FsCheckpointStateOutputStream stream1 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   FsCheckpointStateOutputStream stream2 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   stream1.write(new byte[61]);
+   stream2.write(new byte[61]);
+
+   try {
+   stream1.closeAndGetHandle();
+   fail("this should fail with an exception");
+   } catch (IOException ignored) {}
+
+   stream2.close();
+
+   verify(fs, times(0)).delete(any(Path.class), anyBoolean());
--- End diff --

Will add an additional check that the directory still exists


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348513#comment-16348513
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342796
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 ---
@@ -137,9 +138,12 @@ public static void main(String[] args) throws 
Exception {
 
System.err.println("creating task");
 
+   TemporaryFolder temporaryFolder = new TemporaryFolder();
--- End diff --

Making it a `ClassRule` or a `Rule` will take care of the clean up.
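For illustration, a minimal sketch of that suggestion (the field name is chosen for the example, not taken from the PR):
{code:java}
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
// JUnit creates the folder before the test class runs and deletes it afterwards,
// so no manual cleanup code is needed.
{code}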


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.





[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165343021
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -156,30 +157,38 @@ public void acknowledgeCheckpoint(
}
};
 
-   TaskLocalStateStore taskLocalStateStore = new 
TaskLocalStateStore(jobID, jobVertexID, subtaskIdx) {
-   @Override
-   public void storeLocalState(
-   @Nonnull CheckpointMetaData checkpointMetaData,
-   @Nullable TaskStateSnapshot localState) {
-
-   Assert.assertEquals(tm, localState);
-   tmReported.set(true);
-   }
-   };
+   TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-   TaskStateManagerImpl taskStateManager =
-   new TaskStateManagerImpl(
-   jobID,
-   executionAttemptID,
-   taskLocalStateStore,
-   null,
-   checkpointResponder);
-
-   taskStateManager.reportTaskStateSnapshots(
-   checkpointMetaData,
-   checkpointMetrics,
-   jm,
-   tm);
+   try {
+   TaskLocalStateStore taskLocalStateStore =
+   new TaskLocalStateStore(jobID, jobVertexID, 
subtaskIdx, temporaryFolder.newFolder()) {
+   @Override
+   public void storeLocalState(
+   @Nonnull CheckpointMetaData 
checkpointMetaData,
+   @Nullable TaskStateSnapshot 
localState) {
+
+   Assert.assertEquals(tm, 
localState);
+   tmReported.set(true);
+   }
+   };
+
+   TaskStateManagerImpl taskStateManager =
+   new TaskStateManagerImpl(
+   jobID,
+   executionAttemptID,
+   taskLocalStateStore,
+   null,
+   checkpointResponder);
+
+   taskStateManager.reportTaskStateSnapshots(
+   checkpointMetaData,
+   checkpointMetrics,
+   jm,
+   tm);
+   } catch (Exception ex) {
+   temporaryFolder.delete();
+   throw new RuntimeException(ex);
--- End diff --

Why do we throw a `RuntimeException`?
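A sketch of the alternative being hinted at, i.e. letting the original exception propagate (the enclosing test method can simply declare `throws Exception`) and cleaning up in a finally block; the structure is assumed from the quoted diff, not taken from the actual PR change:
{code:java}
TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
try {
	// ... build the TaskLocalStateStore / TaskStateManagerImpl and run the assertions ...
} finally {
	temporaryFolder.delete();
}
{code}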


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348514#comment-16348514
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165343021
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -156,30 +157,38 @@ public void acknowledgeCheckpoint(
}
};
 
-   TaskLocalStateStore taskLocalStateStore = new 
TaskLocalStateStore(jobID, jobVertexID, subtaskIdx) {
-   @Override
-   public void storeLocalState(
-   @Nonnull CheckpointMetaData checkpointMetaData,
-   @Nullable TaskStateSnapshot localState) {
-
-   Assert.assertEquals(tm, localState);
-   tmReported.set(true);
-   }
-   };
+   TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-   TaskStateManagerImpl taskStateManager =
-   new TaskStateManagerImpl(
-   jobID,
-   executionAttemptID,
-   taskLocalStateStore,
-   null,
-   checkpointResponder);
-
-   taskStateManager.reportTaskStateSnapshots(
-   checkpointMetaData,
-   checkpointMetrics,
-   jm,
-   tm);
+   try {
+   TaskLocalStateStore taskLocalStateStore =
+   new TaskLocalStateStore(jobID, jobVertexID, 
subtaskIdx, temporaryFolder.newFolder()) {
+   @Override
+   public void storeLocalState(
+   @Nonnull CheckpointMetaData 
checkpointMetaData,
+   @Nullable TaskStateSnapshot 
localState) {
+
+   Assert.assertEquals(tm, 
localState);
+   tmReported.set(true);
+   }
+   };
+
+   TaskStateManagerImpl taskStateManager =
+   new TaskStateManagerImpl(
+   jobID,
+   executionAttemptID,
+   taskLocalStateStore,
+   null,
+   checkpointResponder);
+
+   taskStateManager.reportTaskStateSnapshots(
+   checkpointMetaData,
+   checkpointMetrics,
+   jm,
+   tm);
+   } catch (Exception ex) {
+   temporaryFolder.delete();
+   throw new RuntimeException(ex);
--- End diff --

Why do we throw a `RuntimeException`?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342796
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 ---
@@ -137,9 +138,12 @@ public static void main(String[] args) throws 
Exception {
 
System.err.println("creating task");
 
+   TemporaryFolder temporaryFolder = new TemporaryFolder();
--- End diff --

Making it a `ClassRule` or a `Rule` will take care of the cleanup.
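
For illustration, a minimal sketch of the `Rule`-based variant (class and test names are placeholders, not the actual test code): JUnit then creates the folder before each test and deletes it afterwards, so no manual `create()`/`delete()` calls are needed.

{code:java}
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;

public class LocalStateDirectoryRuleSketch {

	// JUnit creates this folder before each test and deletes it afterwards.
	@Rule
	public final TemporaryFolder temporaryFolder = new TemporaryFolder();

	@Test
	public void usesManagedFolder() throws Exception {
		File localStateDir = temporaryFolder.newFolder();
		// pass localStateDir to the component under test instead of managing it by hand
	}
}
{code}

A `ClassRule` works the same way, but must be a public static field and is shared by all tests in the class.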


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() {

Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
}
 
+   /**
+* This tests if the {@link TaskStateManager} properly returns the 
subtask local state dir from the
+* corresponding {@link TaskLocalStateStore}.
+*/
+   @Test
+   public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() 
throws IOException {
+   JobID jobID = new JobID(42L, 43L);
+   JobVertexID jobVertexID = new JobVertexID(12L, 34L);
+   ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
+   TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
+
+   TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   try {
+   tmpFolder.create();
+   TaskLocalStateStore taskLocalStateStore =
+   new TaskLocalStateStore(jobID, jobVertexID, 13, 
tmpFolder.newFolder());
+
+   TaskStateManager taskStateManager = taskStateManager(
+   jobID,
+   executionAttemptID,
+   checkpointResponderMock,
+   null,
+   taskLocalStateStore);
+
+   Assert.assertEquals(
+   
taskLocalStateStore.getSubtaskLocalStateBaseDirectory(),
+   
taskStateManager.getSubtaskLocalStateBaseDirectory());
--- End diff --

Why does the `TaskStateManager` know about the subtask local state base 
directory?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348511#comment-16348511
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() {

Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
}
 
+   /**
+* This tests if the {@link TaskStateManager} properly returns the 
subtask local state dir from the
+* corresponding {@link TaskLocalStateStore}.
+*/
+   @Test
+   public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() 
throws IOException {
+   JobID jobID = new JobID(42L, 43L);
+   JobVertexID jobVertexID = new JobVertexID(12L, 34L);
+   ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
+   TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
+
+   TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   try {
+   tmpFolder.create();
+   TaskLocalStateStore taskLocalStateStore =
+   new TaskLocalStateStore(jobID, jobVertexID, 13, 
tmpFolder.newFolder());
+
+   TaskStateManager taskStateManager = taskStateManager(
+   jobID,
+   executionAttemptID,
+   checkpointResponderMock,
+   null,
+   taskLocalStateStore);
+
+   Assert.assertEquals(
+   
taskLocalStateStore.getSubtaskLocalStateBaseDirectory(),
+   
taskStateManager.getSubtaskLocalStateBaseDirectory());
--- End diff --

Why does the `TaskStateManager` know about the subtask local state base 
directory?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5394
  
It may not be a problem in this test, but I wanted to raise that this 
pattern is a bit dangerous.
If the thread ever gets interrupted while 'running' is still true, this 
goes into a hot loop that constantly throws exceptions: every time it falls 
through the loop and attempts to sleep again, it will immediately throw an 
InterruptedException.
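
A minimal, self-contained sketch of the safer shape (class and field names are illustrative, not the test's actual code): restore the interrupt status and leave the loop instead of swallowing the exception and sleeping again.

{code:java}
public class InterruptAwareLoopSketch implements Runnable {

	private volatile boolean running = true;

	@Override
	public void run() {
		while (running) {
			try {
				Thread.sleep(10);
			} catch (InterruptedException e) {
				// Re-set the interrupt flag for code further up the stack and stop looping,
				// instead of swallowing the exception and immediately sleeping again (hot loop).
				Thread.currentThread().interrupt();
				break;
			}
		}
	}

	public void cancel() {
		running = false;
	}
}
{code}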


---


[jira] [Commented] (FLINK-6571) InfiniteSource in SourceStreamOperatorTest should deal with InterruptedExceptions

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348512#comment-16348512
 ] 

ASF GitHub Bot commented on FLINK-6571:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5394
  
It may not be a problem in this test, but I wanted to raise that this 
pattern is a bit dangerous.
If the thread ever gets interrupted while 'running' is still true, this 
goes into a hot loop that constantly throws exceptions: every time it falls 
through the loop and attempts to sleep again, it will immediately throw an 
InterruptedException.


> InfiniteSource in SourceStreamOperatorTest should deal with 
> InterruptedExceptions
> -
>
> Key: FLINK-6571
> URL: https://issues.apache.org/jira/browse/FLINK-6571
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0, 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> So this is a new one: I got a failing test 
> ({{testNoMaxWatermarkOnAsyncStop}}) due to an uncaught InterruptedException.
> {code}
> [00:28:15] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
> 0.828 sec <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest
> [00:28:15] 
> testNoMaxWatermarkOnAsyncStop(org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest)
>   Time elapsed: 0 sec  <<< ERROR!
> [00:28:15] java.lang.InterruptedException: sleep interrupted
> [00:28:15]at java.lang.Thread.sleep(Native Method)
> [00:28:15]at 
> org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest$InfiniteSource.run(StreamSourceOperatorTest.java:343)
> [00:28:15]at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> [00:28:15]at 
> org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest.testNoMaxWatermarkOnAsyncStop(StreamSourceOperatorTest.java:176)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348510#comment-16348510
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342341
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() {

Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
}
 
+   /**
+* This tests if the {@link TaskStateManager} properly returns the 
subtask local state dir from the
+* corresponding {@link TaskLocalStateStore}.
+*/
+   @Test
+   public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() 
throws IOException {
+   JobID jobID = new JobID(42L, 43L);
+   JobVertexID jobVertexID = new JobVertexID(12L, 34L);
+   ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
+   TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
+
+   TemporaryFolder tmpFolder = new TemporaryFolder();
--- End diff --

Maybe `ClassRule`?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342341
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -97,6 +104,39 @@ public void testStateReportingAndRetrieving() {

Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
}
 
+   /**
+* This tests if the {@link TaskStateManager} properly returns the 
subtask local state dir from the
+* corresponding {@link TaskLocalStateStore}.
+*/
+   @Test
+   public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() 
throws IOException {
+   JobID jobID = new JobID(42L, 43L);
+   JobVertexID jobVertexID = new JobVertexID(12L, 34L);
+   ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
+   TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
+
+   TemporaryFolder tmpFolder = new TemporaryFolder();
--- End diff --

Maybe `ClassRule`?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348508#comment-16348508
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342139
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -25,17 +25,24 @@
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
 
 import static org.mockito.Mockito.mock;
 
 public class TaskStateManagerImplTest {
--- End diff --

`extends TestLogger`.
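
For context, a minimal sketch of what the suggestion amounts to (assuming `org.apache.flink.util.TestLogger` from flink-test-utils-junit is on the test classpath); the superclass logs the test name and result, which makes failing tests easier to spot in CI output.

{code:java}
import org.apache.flink.util.TestLogger;

import org.junit.Test;

public class TaskStateManagerImplTestSketch extends TestLogger {

	@Test
	public void example() {
		// existing test bodies stay unchanged; only the superclass is added
	}
}
{code}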


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165342139
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -25,17 +25,24 @@
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
 
 import static org.mockito.Mockito.mock;
 
 public class TaskStateManagerImplTest {
--- End diff --

`extends TestLogger`.


---


[GitHub] flink issue #5380: [hotfix][connectors] Fix log format strings

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5380
  
Please merge for `master` and `release-1.4`...


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348506#comment-16348506
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341901
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
+* for the {@link TaskExecutorLocalStateStoresManager} with the 
configured root directory.
+*/
+   @Test
+   public void testCreationFromConfig() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final String rootDirString = "localStateRoot";
+   
config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, 
rootDirString);
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new File(rootDirString, 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+
+   Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+   }
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly falls back to the first tmp directory of
+* the IOManager as default for the local state root directory.
+*/
+   @Test
+   public void testCreationFromConfigDefault() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new 
File(taskManagerServicesConfiguration.getTmpDirPaths()[0], 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+   }
+
+   /**
+* This tests that the {@link TaskExecutorLocalStateStores

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348505#comment-16348505
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341653
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
+* for the {@link TaskExecutorLocalStateStoresManager} with the 
configured root directory.
+*/
+   @Test
+   public void testCreationFromConfig() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final String rootDirString = "localStateRoot";
+   
config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, 
rootDirString);
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new File(rootDirString, 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+
+   Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
--- End diff --

This assertion seems to be redundant.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341901
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
+* for the {@link TaskExecutorLocalStateStoresManager} with the 
configured root directory.
+*/
+   @Test
+   public void testCreationFromConfig() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final String rootDirString = "localStateRoot";
+   
config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, 
rootDirString);
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new File(rootDirString, 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+
+   Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+   }
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly falls back to the first tmp directory of
+* the IOManager as default for the local state root directory.
+*/
+   @Test
+   public void testCreationFromConfigDefault() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new 
File(taskManagerServicesConfiguration.getTmpDirPaths()[0], 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+   }
+
+   /**
+* This tests that the {@link TaskExecutorLocalStateStoresManager} 
creates {@link TaskLocalStateStore} that have
+* a properly initialized local state base directory.
+*/
+   @Test
+   public void testSubtaskStateStoreDirectoryCreation() throws Exception {
+
+   JobID jobI

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341653
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
+* for the {@link TaskExecutorLocalStateStoresManager} with the 
configured root directory.
+*/
+   @Test
+   public void testCreationFromConfig() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final String rootDirString = "localStateRoot";
+   
config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, 
rootDirString);
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new File(rootDirString, 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+
+   Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
--- End diff --

This assertion seems to be redundant.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341397
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
--- End diff --

`extends TestLogger`


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348503#comment-16348503
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341397
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
--- End diff --

`extends TestLogger`


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348501#comment-16348501
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -197,7 +201,13 @@ public static TaskManagerServices fromConfiguration(
final JobManagerTable jobManagerTable = new JobManagerTable();
 
final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-   final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager();
+
+   final File taskExecutorLocalStateRootDir =
+   new 
File(taskManagerServicesConfiguration.getLocalStateRootDir(), 
LOCAL_STATE_SUB_DIRECTORY_ROOT);
--- End diff --

Are we giving the `TaskExecutorLocalStateStoresManager` a unique directory? 
What happens if multiple TMs run on the same machine? Will they use different 
directories?
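
One possible way to avoid collisions (an assumption for illustration, not the PR's actual resolution) is to scope the local-state root by a per-TaskManager identifier, e.g. the TM's `ResourceID`, so co-located TaskManagers end up in different subdirectories:

{code:java}
import java.io.File;

public class PerTaskManagerLocalStateDirSketch {

	// Scope the local-state root by a per-TaskManager identifier so that several
	// TaskManagers on the same host do not share the same directory.
	public static File perTaskManagerDir(File localStateRoot, String taskManagerId) {
		return new File(new File(localStateRoot, "localState"), "tm_" + taskManagerId);
	}

	public static void main(String[] args) {
		// in Flink the identifier would be the TaskManager's ResourceID; a literal stands in here
		System.out.println(perTaskManagerDir(new File("/tmp/flink"), "resource-id-1"));
	}
}
{code}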


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165341071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -197,7 +201,13 @@ public static TaskManagerServices fromConfiguration(
final JobManagerTable jobManagerTable = new JobManagerTable();
 
final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-   final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager();
+
+   final File taskExecutorLocalStateRootDir =
+   new 
File(taskManagerServicesConfiguration.getLocalStateRootDir(), 
LOCAL_STATE_SUB_DIRECTORY_ROOT);
--- End diff --

Are we giving the `TaskExecutorLocalStateStoresManager` a unique directory? 
What happens if multiple TMs run on the same machine? Will they use different 
directories?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348495#comment-16348495
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165340438
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -501,4 +529,53 @@ public String toString() {
"', asynchronous: " + asynchronousSnapshots +
", fileStateThreshold: " + fileStateThreshold + 
")";
}
+
+   /**
+* This enum represents the different modes for local recovery.
+*/
+   public enum LocalRecoveryMode {
+   DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED
+   }
+
+   /**
+* This class encapsulates the configuration for local recovery of this 
backend.
+*/
+   public static final class LocalRecoveryConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private static final LocalRecoveryConfig DISABLED_SINGLETON =
+   new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, 
null);
+
+   private final LocalRecoveryMode localRecoveryMode;
+   private final File localStateDirectory;
+
+   LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File 
localStateDirectory) {
+   this.localRecoveryMode = 
Preconditions.checkNotNull(localRecoveryMode);
+   this.localStateDirectory = localStateDirectory;
+   if 
(LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && 
localStateDirectory == null) {
+   throw new IllegalStateException("Local state 
directory must be specified if local recovery mode is " +
+   LocalRecoveryMode.ENABLE_FILE_BASED);
+   }
+   }
+
+   public LocalRecoveryMode getLocalRecoveryMode() {
+   return localRecoveryMode;
+   }
+
+   public File getLocalStateDirectory() {
+   return localStateDirectory;
--- End diff --

How does this play together with `ENABLE_HEAP_BASED` `LocalRecoveryMode`?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165340438
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -501,4 +529,53 @@ public String toString() {
"', asynchronous: " + asynchronousSnapshots +
", fileStateThreshold: " + fileStateThreshold + 
")";
}
+
+   /**
+* This enum represents the different modes for local recovery.
+*/
+   public enum LocalRecoveryMode {
+   DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED
+   }
+
+   /**
+* This class encapsulates the configuration for local recovery of this 
backend.
+*/
+   public static final class LocalRecoveryConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private static final LocalRecoveryConfig DISABLED_SINGLETON =
+   new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, 
null);
+
+   private final LocalRecoveryMode localRecoveryMode;
+   private final File localStateDirectory;
+
+   LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File 
localStateDirectory) {
+   this.localRecoveryMode = 
Preconditions.checkNotNull(localRecoveryMode);
+   this.localStateDirectory = localStateDirectory;
+   if 
(LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && 
localStateDirectory == null) {
+   throw new IllegalStateException("Local state 
directory must be specified if local recovery mode is " +
+   LocalRecoveryMode.ENABLE_FILE_BASED);
+   }
+   }
+
+   public LocalRecoveryMode getLocalRecoveryMode() {
+   return localRecoveryMode;
+   }
+
+   public File getLocalStateDirectory() {
+   return localStateDirectory;
--- End diff --

How does this play together with `ENABLE_HEAP_BASED` `LocalRecoveryMode`?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348494#comment-16348494
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165340246
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 ---
@@ -60,4 +62,9 @@ void reportTaskStateSnapshots(
 * @return previous state for the operator. Null if no previous state 
exists.
 */
OperatorSubtaskState operatorStates(OperatorID operatorID);
+
+   /**
+* Returns the base directory for all file-based local state of the 
owning subtask.
+*/
+   File getSubtaskLocalStateBaseDirectory();
--- End diff --

I guess there will be other non-file-based local states, right? What would 
this method return for these state objects?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165340246
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 ---
@@ -60,4 +62,9 @@ void reportTaskStateSnapshots(
 * @return previous state for the operator. Null if no previous state 
exists.
 */
OperatorSubtaskState operatorStates(OperatorID operatorID);
+
+   /**
+* Returns the base directory for all file-based local state of the 
owning subtask.
+*/
+   File getSubtaskLocalStateBaseDirectory();
--- End diff --

I guess there will be other non-file-based local states, right? What would 
this method return for these state objects?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348492#comment-16348492
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165340082
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -46,26 +52,63 @@
/** */
private final int subtaskIndex;
 
+   /** */
+   private final Map 
storedTaskStateByCheckpointID;
+
+   /** This is the base directory for all local state of the subtask that 
owns this {@link TaskLocalStateStore}. */
+   private final File subtaskLocalStateBaseDirectory;
+
public TaskLocalStateStore(
JobID jobID,
JobVertexID jobVertexID,
-   int subtaskIndex) {
+   int subtaskIndex,
+   File localStateRootDir) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
+   this.storedTaskStateByCheckpointID = new HashMap<>();
+   this.subtaskLocalStateBaseDirectory =
+   new File(localStateRootDir, createSubtaskPath(jobID, 
jobVertexID, subtaskIndex));
+   }
+
+   static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, 
int subtaskIndex) {
+   return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + 
subtaskIndex;
}
 
public void storeLocalState(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nullable TaskStateSnapshot localState) {
 
-   if (localState != null) {
-   throw new UnsupportedOperationException("Implement this 
before actually providing local state!");
+   TaskStateSnapshot previous =
+   
storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), 
localState);
+
+   if (previous != null) {
+   throw new IllegalStateException("Found previously 
registered local state for checkpoint with id " +
+   checkpointMetaData.getCheckpointId() + "! This 
indicates a problem.");
}
}
 
-   public void dispose() {
-   //TODO
+   public void dispose() throws Exception {
+
+   Exception collectedException = null;
+
+   for (TaskStateSnapshot snapshot : 
storedTaskStateByCheckpointID.values()) {
+   try {
+   snapshot.discardState();
+   } catch (Exception discardEx) {
+   collectedException = 
ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
+   }
+   }
+
+   if (collectedException != null) {
+   throw collectedException;
+   }
+
+   
FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
--- End diff --

Is there a way to retry the non-discarded state handles based on this 
directory? If not, then we could also delete it in case of a failure.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165340082
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -46,26 +52,63 @@
/** */
private final int subtaskIndex;
 
+   /** */
+   private final Map 
storedTaskStateByCheckpointID;
+
+   /** This is the base directory for all local state of the subtask that 
owns this {@link TaskLocalStateStore}. */
+   private final File subtaskLocalStateBaseDirectory;
+
public TaskLocalStateStore(
JobID jobID,
JobVertexID jobVertexID,
-   int subtaskIndex) {
+   int subtaskIndex,
+   File localStateRootDir) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
+   this.storedTaskStateByCheckpointID = new HashMap<>();
+   this.subtaskLocalStateBaseDirectory =
+   new File(localStateRootDir, createSubtaskPath(jobID, 
jobVertexID, subtaskIndex));
+   }
+
+   static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, 
int subtaskIndex) {
+   return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + 
subtaskIndex;
}
 
public void storeLocalState(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nullable TaskStateSnapshot localState) {
 
-   if (localState != null) {
-   throw new UnsupportedOperationException("Implement this 
before actually providing local state!");
+   TaskStateSnapshot previous =
+   
storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), 
localState);
+
+   if (previous != null) {
+   throw new IllegalStateException("Found previously 
registered local state for checkpoint with id " +
+   checkpointMetaData.getCheckpointId() + "! This 
indicates a problem.");
}
}
 
-   public void dispose() {
-   //TODO
+   public void dispose() throws Exception {
+
+   Exception collectedException = null;
+
+   for (TaskStateSnapshot snapshot : 
storedTaskStateByCheckpointID.values()) {
+   try {
+   snapshot.discardState();
+   } catch (Exception discardEx) {
+   collectedException = 
ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
+   }
+   }
+
+   if (collectedException != null) {
+   throw collectedException;
+   }
+
+   
FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
--- End diff --

Is there a way to retry the non-discarded state handles based on this 
directory? If not, then we could also delete it in case of a failure.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348467#comment-16348467
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165332129
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -316,6 +316,11 @@
 */
public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = 
"taskmanager.refused-registration-pause";
 
+   /**
+* The config parameter defining the root directories for storing 
file-based state for local recovery.
+*/
+   public static final String TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY = 
"taskmanager.local-state-root.dir";
--- End diff --

Using `ConfigConstants` is discouraged. Please introduce a `ConfigOption` 
if not already done in a later commit.
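
A minimal sketch of what a `ConfigOption` for this key could look like (class and field names are illustrative; the merged option may differ):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class LocalRecoveryOptionsSketch {

	// Same key as the ConfigConstants entry from the diff, expressed as a typed option.
	public static final ConfigOption<String> LOCAL_STATE_ROOT_DIR =
		ConfigOptions.key("taskmanager.local-state-root.dir")
			.noDefaultValue();
}
{code}

Call sites would then read it via `configuration.getString(LOCAL_STATE_ROOT_DIR)` instead of the raw string key.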


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165332129
  
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -316,6 +316,11 @@
     */
    public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
 
+   /**
+    * The config parameter defining the root directories for storing file-based state for local recovery.
+    */
+   public static final String TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY = "taskmanager.local-state-root.dir";
--- End diff --

Using `ConfigConstants` is discouraged. Please introduce a `ConfigOption` 
if not already done in a later commit.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348463#comment-16348463
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165331645
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception {
        initField.setAccessible(true);
        initField.setBoolean(null, false);
    }
+
+   /**
+    * This enum represents the different modes for local recovery.
+    */
+   public enum LocalRecoveryMode {
+       DISABLED, ENABLE_FILE_BASED
+   }
+
+   /**
+    * This class encapsulates the configuration for local recovery of this backend.
+    */
+   public static final class LocalRecoveryConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final LocalRecoveryConfig DISABLED_SINGLETON =
+           new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null);
+
+       private final LocalRecoveryMode localRecoveryMode;
+       private final File localStateDirectory;
+
+       LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) {
+           this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode);
+           this.localStateDirectory = localStateDirectory;
+           if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) {
+               throw new IllegalStateException("Local state directory must be specified if local recovery mode is " +
+                   LocalRecoveryMode.ENABLE_FILE_BASED);
+           }
+       }
+
+       public LocalRecoveryMode getLocalRecoveryMode() {
+           return localRecoveryMode;
+       }
+
+       public File getLocalStateDirectory() {
--- End diff --

@Nullable
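
That is, a sketch of the annotated getter:

{code:java}
@Nullable
public File getLocalStateDirectory() {
    return localStateDirectory;
}
{code}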


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348461#comment-16348461
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165331598
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception {
        initField.setAccessible(true);
        initField.setBoolean(null, false);
    }
+
+   /**
+    * This enum represents the different modes for local recovery.
+    */
+   public enum LocalRecoveryMode {
+       DISABLED, ENABLE_FILE_BASED
+   }
+
+   /**
+    * This class encapsulates the configuration for local recovery of this backend.
+    */
+   public static final class LocalRecoveryConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final LocalRecoveryConfig DISABLED_SINGLETON =
+           new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null);
+
+       private final LocalRecoveryMode localRecoveryMode;
+       private final File localStateDirectory;
+
+       LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) {
--- End diff --

@Nullable annotation missing
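
That is, a sketch of the annotated constructor signature (body unchanged from the diff above):

{code:java}
LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, @Nullable File localStateDirectory) {
    // body as in the diff above
}
{code}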


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165331598
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception {
        initField.setAccessible(true);
        initField.setBoolean(null, false);
    }
+
+   /**
+    * This enum represents the different modes for local recovery.
+    */
+   public enum LocalRecoveryMode {
+       DISABLED, ENABLE_FILE_BASED
+   }
+
+   /**
+    * This class encapsulates the configuration for local recovery of this backend.
+    */
+   public static final class LocalRecoveryConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final LocalRecoveryConfig DISABLED_SINGLETON =
+           new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null);
+
+       private final LocalRecoveryMode localRecoveryMode;
+       private final File localStateDirectory;
+
+       LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) {
--- End diff --

@Nullable annotation missing


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165331645
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception {
        initField.setAccessible(true);
        initField.setBoolean(null, false);
    }
+
+   /**
+    * This enum represents the different modes for local recovery.
+    */
+   public enum LocalRecoveryMode {
+       DISABLED, ENABLE_FILE_BASED
+   }
+
+   /**
+    * This class encapsulates the configuration for local recovery of this backend.
+    */
+   public static final class LocalRecoveryConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final LocalRecoveryConfig DISABLED_SINGLETON =
+           new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, null);
+
+       private final LocalRecoveryMode localRecoveryMode;
+       private final File localStateDirectory;
+
+       LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File localStateDirectory) {
+           this.localRecoveryMode = Preconditions.checkNotNull(localRecoveryMode);
+           this.localStateDirectory = localStateDirectory;
+           if (LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && localStateDirectory == null) {
+               throw new IllegalStateException("Local state directory must be specified if local recovery mode is " +
+                   LocalRecoveryMode.ENABLE_FILE_BASED);
+           }
+       }
+
+       public LocalRecoveryMode getLocalRecoveryMode() {
+           return localRecoveryMode;
+       }
+
+       public File getLocalStateDirectory() {
--- End diff --

@Nullable


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348444#comment-16348444
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165329478
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -104,10 +107,8 @@
    @Nullable
    private OptionsFactory optionsFactory;
 
-   /** True if incremental checkpointing is enabled.
-    * Null if not yet set, in which case the configuration values will be used. */
-   @Nullable
-   private Boolean enableIncrementalCheckpointing;
+   /** True if incremental checkpointing is enabled. */
+   private TernaryBoolean enableIncrementalCheckpointing;
--- End diff --

What about renaming it to `isIncrementalCheckpointingEnabled`? This makes it clearer that this is a boolean.
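
For context, the tri-state field is meant to be resolved against the configuration default at the point of use; a hedged sketch (assuming a `getOrDefault(boolean)` accessor on `TernaryBoolean`, with an illustrative default value):

{code:java}
// illustrative only: resolve the tri-state flag against a default taken from the configuration
boolean defaultFromConfiguration = false; // stand-in for the configured default
boolean incrementalCheckpoints = enableIncrementalCheckpointing.getOrDefault(defaultFromConfiguration);
{code}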


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348442#comment-16348442
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165329369
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -104,10 +107,8 @@
    @Nullable
    private OptionsFactory optionsFactory;
 
-   /** True if incremental checkpointing is enabled.
-    * Null if not yet set, in which case the configuration values will be used. */
-   @Nullable
-   private Boolean enableIncrementalCheckpointing;
+   /** True if incremental checkpointing is enabled. */
--- End diff --

Please update comment.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165329478
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -104,10 +107,8 @@
    @Nullable
    private OptionsFactory optionsFactory;
 
-   /** True if incremental checkpointing is enabled.
-    * Null if not yet set, in which case the configuration values will be used. */
-   @Nullable
-   private Boolean enableIncrementalCheckpointing;
+   /** True if incremental checkpointing is enabled. */
+   private TernaryBoolean enableIncrementalCheckpointing;
--- End diff --

What about renaming it to `isIncrementalCheckpointingEnabled`? This makes it clearer that this is a boolean.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165329369
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---
@@ -104,10 +107,8 @@
    @Nullable
    private OptionsFactory optionsFactory;
 
-   /** True if incremental checkpointing is enabled.
-    * Null if not yet set, in which case the configuration values will be used. */
-   @Nullable
-   private Boolean enableIncrementalCheckpointing;
+   /** True if incremental checkpointing is enabled. */
--- End diff --

Please update comment.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348424#comment-16348424
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165326694
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest {
+
+   /**
+* This tests the forwarding of jm and tm-local state from the futures 
reported by the backends, through the
+* async checkpointing thread to the {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+*/
+   @Test
+   public void testForwardingFromSnapshotToTaskStateManager() throws 
Exception {
+
+   TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
+
+   StreamMockEnvironment streamMockEnvironment = new 
StreamMockEnvironment(
+   new Configuration(),
+   new Configuration(),
+   new ExecutionConfig(),
+   1024*1024,
+   new MockInputSplitProvider(),
+   0,
+   taskStateManager);
+
+   StreamTask testStreamTask = new 
StreamTaskTest.NoOpStreamTask(streamMockEnvironment);
+   CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(0L, 0L);
+   CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
+
+   Map snapshots = new 
HashMap<>(1);
+   OperatorSnapshotFutures osFuture = new 
OperatorSnapshotFutures();
+
+   
osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class));
+   
osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class));
+   
osFuture.setOperatorSta

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165326694
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest {
+
+   /**
+* This tests the forwarding of jm and tm-local state from the futures 
reported by the backends, through the
+* async checkpointing thread to the {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+*/
+   @Test
+   public void testForwardingFromSnapshotToTaskStateManager() throws 
Exception {
+
+   TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
+
+   StreamMockEnvironment streamMockEnvironment = new 
StreamMockEnvironment(
+   new Configuration(),
+   new Configuration(),
+   new ExecutionConfig(),
+   1024*1024,
+   new MockInputSplitProvider(),
+   0,
+   taskStateManager);
+
+   StreamTask testStreamTask = new 
StreamTaskTest.NoOpStreamTask(streamMockEnvironment);
+   CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(0L, 0L);
+   CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
+
+   Map snapshots = new 
HashMap<>(1);
+   OperatorSnapshotFutures osFuture = new 
OperatorSnapshotFutures();
+
+   
osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class));
+   
osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class));
+   
osFuture.setOperatorStateManagedFuture(createSnapshotResult(OperatorStateHandle.class));
+   
osFuture.setOperatorStateRawFuture(createSnapshotResult(OperatorStateHandle.class));
+
+   OperatorID operatorID = new OperatorID();
+   snapshot

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348422#comment-16348422
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165325793
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
+ */
+public class MethodForwardingTestUtil {
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory) {
+   testMethodForwarding(delegateClass, wrapperFactory, 
Collections.emptySet());
+   }
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. Methods can be remapped in case 
that the implementation does not call the
+* original method. Remapping to null skips the method. This ignores 
methods that are inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param skipMethodSet set of methods to ignore.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory,
+   Set skipMethodSet) {
+
+   Preconditions.checkNotNull(delegateClass);
+   Preconditions.checkNotNull(wrapperFactory);
+   Preconditions.checkNotNull(skipMethodSet);
+
+   D delegate = spy(delegateClass);
+   W wrapper = wrapperFactory.apply(delegate);
+
+   // ensure that wrapper is a subtype of delegate
+   
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));
+
+   for (Method delegateMethod : delegateClass.getMethods()) {
+
+   if (checkSkipMethodForwardCheck(delegateMethod, 
skipMethodSet)) {
+   continue;
+   }
+
+   try {
+   // find the correct method to substitute the 
bridge for erased generic types.
+   // if this doesn't work, the user need to 
exclude the method and write an additional test.
+   Method wrapperMethod = 
wrapper.getClass().getDeclaredMethod(
+   delegateMethod.getName(),
+   delegateMethod.getParameterTypes());
+
+   // things get a bit fuzzy here, best effort to 
find a match but this might end up with a wrong method.
+   if (wrapperMethod.isBridge()) {

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165325793
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
+ */
+public class MethodForwardingTestUtil {
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory) {
+   testMethodForwarding(delegateClass, wrapperFactory, 
Collections.emptySet());
+   }
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. Methods can be remapped in case 
that the implementation does not call the
+* original method. Remapping to null skips the method. This ignores 
methods that are inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param skipMethodSet set of methods to ignore.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory,
+   Set skipMethodSet) {
+
+   Preconditions.checkNotNull(delegateClass);
+   Preconditions.checkNotNull(wrapperFactory);
+   Preconditions.checkNotNull(skipMethodSet);
+
+   D delegate = spy(delegateClass);
+   W wrapper = wrapperFactory.apply(delegate);
+
+   // ensure that wrapper is a subtype of delegate
+   
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));
+
+   for (Method delegateMethod : delegateClass.getMethods()) {
+
+   if (checkSkipMethodForwardCheck(delegateMethod, 
skipMethodSet)) {
+   continue;
+   }
+
+   try {
+   // find the correct method to substitute the 
bridge for erased generic types.
+   // if this doesn't work, the user need to 
exclude the method and write an additional test.
+   Method wrapperMethod = 
wrapper.getClass().getDeclaredMethod(
+   delegateMethod.getName(),
+   delegateMethod.getParameterTypes());
+
+   // things get a bit fuzzy here, best effort to 
find a match but this might end up with a wrong method.
+   if (wrapperMethod.isBridge()) {
+   for (Method method : 
wrapper.getClass().getDeclaredMethods()) {
+   if (!method.isBridge()
+   && 
method.getName().equals(

[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348414#comment-16348414
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Good point. An ugly workaround would be to store a timestamp when the ending sequence number is set on a shard, and to use a configurable, sufficiently long (e.g. 7 days) window. It would avoid the dependency on the Kinesis API.
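
A hedged sketch of that idea (class and method names are hypothetical, not part of the connector):

{code:java}
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: decide whether the state of a closed shard is old enough to be dropped. */
public final class ClosedShardRetention {

    private static final long RETENTION_MILLIS = TimeUnit.DAYS.toMillis(7);

    public static boolean canDropClosedShardState(long shardClosedTimestampMillis, long nowMillis) {
        return nowMillis - shardClosedTimestampMillis > RETENTION_MILLIS;
    }
}
{code}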


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 
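
A minimal sketch of the described workaround (helper class is illustrative; getter names are assumed from the shard metadata class; the actual fix lives in the consumer's restore path):

{code:java}
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

/** Illustrative helper: two shard metadata entries describe the same shard if stream name and shard id match. */
public final class ShardIdentityMatcher {

    public static boolean isSameShard(StreamShardMetadata a, StreamShardMetadata b) {
        return a.getStreamName().equals(b.getStreamName())
            && a.getShardId().equals(b.getShardId());
    }
}
{code}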



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Good point. An ugly workaround would be to store a timestamp when the ending sequence number is set on a shard, and to use a configurable, sufficiently long (e.g. 7 days) window. It would avoid the dependency on the Kinesis API.


---


[jira] [Commented] (FLINK-8242) ClassCastException in OrcTableSource.toOrcPredicate

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348411#comment-16348411
 ] 

ASF GitHub Bot commented on FLINK-8242:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5345
  
Thank you @fhueske. The code looks good now. I will merge this...


> ClassCastException in OrcTableSource.toOrcPredicate
> ---
>
> Key: FLINK-8242
> URL: https://issues.apache.org/jira/browse/FLINK-8242
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{OrcTableSource}} tries to cast all predicate literals to 
> {{Serializable}} in its {{toOrcPredicate()}} method. This fails with a 
> {{ClassCastException}} if a literal is not serializable.
> Instead of failing, we should ignore the predicate and log a WARN message.
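
A minimal sketch of that behavior (helper class and method are illustrative; the actual change belongs in `OrcTableSource#toOrcPredicate()`):

{code:java}
import java.io.Serializable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PredicateLiterals {

    private static final Logger LOG = LoggerFactory.getLogger(PredicateLiterals.class);

    /** Returns the literal as Serializable, or null to signal that the predicate should be ignored. */
    public static Serializable asOrcLiteral(Object literal) {
        if (literal instanceof Serializable) {
            return (Serializable) literal;
        }
        LOG.warn("Encountered a non-serializable literal {}. Predicate is not pushed into the OrcTableSource.", literal);
        return null;
    }
}
{code}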



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5345: [FLINK-8242] [orc] Fix predicate push-down of OrcTableSou...

2018-02-01 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5345
  
Thank you @fhueske. The code looks good now. I will merge this...


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348408#comment-16348408
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165323982
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest {
--- End diff --

Same here with `TestLogger`.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348405#comment-16348405
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165323575
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
+ */
+public class MethodForwardingTestUtil {
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory) {
+   testMethodForwarding(delegateClass, wrapperFactory, 
Collections.emptySet());
+   }
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. Methods can be remapped in case 
that the implementation does not call the
+* original method. Remapping to null skips the method. This ignores 
methods that are inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param skipMethodSet set of methods to ignore.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory,
+   Set skipMethodSet) {
+
+   Preconditions.checkNotNull(delegateClass);
+   Preconditions.checkNotNull(wrapperFactory);
+   Preconditions.checkNotNull(skipMethodSet);
+
+   D delegate = spy(delegateClass);
+   W wrapper = wrapperFactory.apply(delegate);
+
+   // ensure that wrapper is a subtype of delegate
+   
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));
+
+   for (Method delegateMethod : delegateClass.getMethods()) {
+
+   if (checkSkipMethodForwardCheck(delegateMethod, 
skipMethodSet)) {
+   continue;
+   }
+
+   try {
+   // find the correct method to substitute the 
bridge for erased generic types.
+   // if this doesn't work, the user need to 
exclude the method and write an additional test.
+   Method wrapperMethod = 
wrapper.getClass().getDeclaredMethod(
+   delegateMethod.getName(),
+   delegateMethod.getParameterTypes());
+
+   // things get a bit fuzzy here, best effort to 
find a match but this might end up with a wrong method.
+   if (wrapperMethod.isBridge()) {

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165323982
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest {
--- End diff --

Same here with `TestLogger`.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165323823
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.MethodForwardingTestUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link StateObjectCollection}.
+ */
+public class StateObjectCollectionTest {
--- End diff --

should extend `TestLogger`.
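
That is, simply:

{code:java}
public class StateObjectCollectionTest extends TestLogger {
    // existing tests unchanged
}
{code}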


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348406#comment-16348406
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165323823
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.MethodForwardingTestUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link StateObjectCollection}.
+ */
+public class StateObjectCollectionTest {
--- End diff --

should extend `TestLogger`.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165323575
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
+ */
+public class MethodForwardingTestUtil {
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory) {
+   testMethodForwarding(delegateClass, wrapperFactory, 
Collections.emptySet());
+   }
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. Methods can be remapped in case 
that the implementation does not call the
+* original method. Remapping to null skips the method. This ignores 
methods that are inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param skipMethodSet set of methods to ignore.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory,
+   Set skipMethodSet) {
+
+   Preconditions.checkNotNull(delegateClass);
+   Preconditions.checkNotNull(wrapperFactory);
+   Preconditions.checkNotNull(skipMethodSet);
+
+   D delegate = spy(delegateClass);
+   W wrapper = wrapperFactory.apply(delegate);
+
+   // ensure that wrapper is a subtype of delegate
+   
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));
+
+   for (Method delegateMethod : delegateClass.getMethods()) {
+
+   if (checkSkipMethodForwardCheck(delegateMethod, 
skipMethodSet)) {
+   continue;
+   }
+
+   try {
+   // find the correct method to substitute the 
bridge for erased generic types.
+   // if this doesn't work, the user need to 
exclude the method and write an additional test.
+   Method wrapperMethod = 
wrapper.getClass().getDeclaredMethod(
+   delegateMethod.getName(),
+   delegateMethod.getParameterTypes());
+
+   // things get a bit fuzzy here, best effort to 
find a match but this might end up with a wrong method.
+   if (wrapperMethod.isBridge()) {
+   for (Method method : 
wrapper.getClass().getDeclaredMethods()) {
+   if (!method.isBridge()
+   && 
method.getName().equals(

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348399#comment-16348399
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165322528
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
--- End diff --

"attempts" or "with methods"


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165322528
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
--- End diff --

"attempts" or "with methods"


---


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348395#comment-16348395
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
Here it is: https://issues.apache.org/jira/browse/FLINK-8542


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348392#comment-16348392
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165321013
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -67,8 +69,11 @@ public TestTaskStateManager(
this.jobId = jobId;
this.executionAttemptID = executionAttemptID;
this.checkpointResponder = checkpointResponder;
-   this.taskStateSnapshotsByCheckpointId = new HashMap<>();
+   this.jobManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
+   this.taskManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
this.reportedCheckpointId = -1L;
+   this.subtaskLocalStateBaseDirectory =
+   new File(System.getProperty("java.io.tmpdir"), 
"testLocalState_" + UUID.randomUUID());
--- End diff --

Shouldn't this come from something like a `TemporaryFolder`, so that the 
directory gets cleaned up in all cases? Consequently, maybe we should pass in 
the `subtaskLocalStateBaseDirectory` from the test using this class.
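
A minimal sketch of that suggestion: the test owns the directory through JUnit's 
`TemporaryFolder` rule and hands it in. The constructor overload shown in the comment 
below is an assumption for illustration, not the PR's actual code.
{code:java}
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class LocalStateDirectoryOwnershipExample {

	// JUnit removes this folder after each test, whether it passes or fails.
	@Rule
	public final TemporaryFolder temporaryFolder = new TemporaryFolder();

	@Test
	public void localStateDirectoryIsOwnedByTheTest() throws Exception {
		File localStateDir = temporaryFolder.newFolder("testLocalState");

		// Hypothetical overload that accepts the base directory from the test instead
		// of deriving it from java.io.tmpdir inside TestTaskStateManager:
		// TestTaskStateManager stateManager =
		//     new TestTaskStateManager(jobId, executionAttemptID, checkpointResponder, localStateDir);
	}
}
{code}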


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
Here it is: https://issues.apache.org/jira/browse/FLINK-8542


---


[jira] [Created] (FLINK-8542) Do not indefinitely store closed shard's state in the FlinkKinesisConsumer

2018-02-01 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8542:
--

 Summary: Do not indefinitely store closed shard's state in the 
FlinkKinesisConsumer
 Key: FLINK-8542
 URL: https://issues.apache.org/jira/browse/FLINK-8542
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai


See original discussion here: 
[https://github.com/apache/flink/pull/5337|https://github.com/apache/flink/pull/5337#issuecomment-362227711]

Currently, the Kinesis consumer keeps a list of {{(StreamShardMetadata, 
SequenceNumber)}} as its state. That list also contains all shards that have 
been closed already, and is kept in the state indefinitely so that on restore, 
we know that a closed shard is already fully consumed,

The downside of this, is that the state size of the Kinesis consumer can 
basically grow without bounds, as the consumed Kinesis streams are resharded 
and more and more closed shards are present.

Some possible solutions have been discussed in the linked PR comments.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165321013
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -67,8 +69,11 @@ public TestTaskStateManager(
this.jobId = jobId;
this.executionAttemptID = executionAttemptID;
this.checkpointResponder = checkpointResponder;
-   this.taskStateSnapshotsByCheckpointId = new HashMap<>();
+   this.jobManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
+   this.taskManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
this.reportedCheckpointId = -1L;
+   this.subtaskLocalStateBaseDirectory =
+   new File(System.getProperty("java.io.tmpdir"), 
"testLocalState_" + UUID.randomUUID());
--- End diff --

Shouldn't this come from something like a `TemporaryFolder`, so that the 
directory gets cleaned up in all cases? Consequently, maybe we should pass in 
the `subtaskLocalStateBaseDirectory` from the test using this class.


---


[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348383#comment-16348383
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165317663
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 ---
@@ -19,58 +19,53 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Unit tests for the {@link CompletedCheckpoint}.
+ */
 public class CompletedCheckpointTest {
 
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-   /**
-* Tests that persistent checkpoints discard their header file.
-*/
@Test
-   public void testDiscard() throws Exception {
-   File file = tmpFolder.newFile();
-   assertEquals(true, file.exists());
-
+   public void registerStatesAtRegistry() {
--- End diff --

What's the reason for this change?


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match for example RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The latter only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (trans

[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348384#comment-16348384
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165295667
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,45 +18,50 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This described whether
--- End diff --

nit: typo


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match for example RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The latter only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ..//job1-id/
>   /shared/<-- shared checkpoint data
>   /chk-1/...  <-- data exclusive to checkpoint 1
>   /chk-2/...  <-- data exclusive to checkpoint 2
>   /chk-3/...  <-- data exclusive to checkpoint 3
> ..//job2-id/
>   /shared/...
>   /chk-1/...
>   /chk-2/...
>   /chk-3/...
> ..//savepoint-1/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
>  /savepoint-2/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348381#comment-16348381
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165295768
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,45 +18,50 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This described whether
  * 
- * The checkpoint should be persisted
- * The checkpoint must be full, or may be incremental (not yet 
implemented)
- * The checkpoint format must be the common (cross backend) format,
- * or may be state-backend specific (not yet implemented)
- * when the checkpoint should be garbage collected
+ * The checkpoint is s regular checkpoint or a savepoint
+ * When the checkpoint should be garbage collected
  * 
  */
 public class CheckpointProperties implements Serializable {
 
-   private static final long serialVersionUID = -8835900655844879470L;
+   private static final long serialVersionUID = 2L;
 
-   private final boolean forced;
+   /** Type - checkpoit / savepoint. */
--- End diff --

nit: typo


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match for example RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The latter only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ..//job1-id/
>   /shared/<-- shared checkpoint data
>   /chk-1/...  <-- data exclusive to checkpoint 1
>   /chk-2/...  <-- data exclusive to checkpoint 2
>   /chk-3/...  <-- data exclusive to checkpoint 3
> ..//job2-id/
>   /shared/...
>   /chk-1/...
>   /chk-2/...
>   /chk-3/...
> ..

[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165295667
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,45 +18,50 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This described whether
--- End diff --

nit: typo


---


[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348382#comment-16348382
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165318529
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 ---
@@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws 
Exception {
assertTrue(isDirectoryEmpty(directory));
}
 
+   // 

+   //  Not deleting parent directories
+   // 

+
+   /**
+* This test checks that the stream does not check and clean the parent 
directory
+* when encountering a write error.
+*/
+   @Test
+   public void testStreamDoesNotTryToCleanUpParentOnError() throws 
Exception {
+   final File directory = tempDir.newFolder();
+
+   // prevent creation of files in that directory
+   assertTrue(directory.setWritable(false, true));
+   checkDirectoryNotWritable(directory);
+
+   FileSystem fs = spy(FileSystem.getLocalFileSystem());
+
+   FsCheckpointStateOutputStream stream1 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   FsCheckpointStateOutputStream stream2 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   stream1.write(new byte[61]);
+   stream2.write(new byte[61]);
+
+   try {
+   stream1.closeAndGetHandle();
+   fail("this should fail with an exception");
+   } catch (IOException ignored) {}
+
+   stream2.close();
+
+   verify(fs, times(0)).delete(any(Path.class), anyBoolean());
--- End diff --

nit: This seems somewhat brittle because there could be another "delete" 
method that the handle uses to delete the parent dir. For "future proof-ness"...
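
A hedged alternative, sketched with the test's own `directory` variable and plain 
JUnit asserts: check the observable effect instead of a specific mock interaction, 
so a different delete path cannot slip past the verification.
{code:java}
// Instead of verify(fs, times(0)).delete(...), assert the effect directly: the parent
// directory created by the test must still exist after the failed closeAndGetHandle().
assertTrue("a write error must not clean up the parent directory", directory.exists());
{code}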


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match for example RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The latter only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpoi

[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165317663
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 ---
@@ -19,58 +19,53 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Unit tests for the {@link CompletedCheckpoint}.
+ */
 public class CompletedCheckpointTest {
 
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-   /**
-* Tests that persistent checkpoints discard their header file.
-*/
@Test
-   public void testDiscard() throws Exception {
-   File file = tmpFolder.newFile();
-   assertEquals(true, file.exists());
-
+   public void registerStatesAtRegistry() {
--- End diff --

What's the reason for this change?


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165318529
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 ---
@@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws 
Exception {
assertTrue(isDirectoryEmpty(directory));
}
 
+   // 

+   //  Not deleting parent directories
+   // 

+
+   /**
+* This test checks that the stream does not check and clean the parent 
directory
+* when encountering a write error.
+*/
+   @Test
+   public void testStreamDoesNotTryToCleanUpParentOnError() throws 
Exception {
+   final File directory = tempDir.newFolder();
+
+   // prevent creation of files in that directory
+   assertTrue(directory.setWritable(false, true));
+   checkDirectoryNotWritable(directory);
+
+   FileSystem fs = spy(FileSystem.getLocalFileSystem());
+
+   FsCheckpointStateOutputStream stream1 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   FsCheckpointStateOutputStream stream2 = new 
FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   stream1.write(new byte[61]);
+   stream2.write(new byte[61]);
+
+   try {
+   stream1.closeAndGetHandle();
+   fail("this should fail with an exception");
+   } catch (IOException ignored) {}
+
+   stream2.close();
+
+   verify(fs, times(0)).delete(any(Path.class), anyBoolean());
--- End diff --

nit: This seems somewhat brittle because there could be another "delete" 
method that the handle uses to delete the parent dir. For "future proof-ness"...


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165295768
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,45 +18,50 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This described whether
  * 
- * The checkpoint should be persisted
- * The checkpoint must be full, or may be incremental (not yet 
implemented)
- * The checkpoint format must be the common (cross backend) format,
- * or may be state-backend specific (not yet implemented)
- * when the checkpoint should be garbage collected
+ * The checkpoint is s regular checkpoint or a savepoint
+ * When the checkpoint should be garbage collected
  * 
  */
 public class CheckpointProperties implements Serializable {
 
-   private static final long serialVersionUID = -8835900655844879470L;
+   private static final long serialVersionUID = 2L;
 
-   private final boolean forced;
+   /** Type - checkpoit / savepoint. */
--- End diff --

nit: typo


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348376#comment-16348376
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165318377
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ---
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
 
-   // wait until handle async exception has been called to 
proceed with the termination of the
-   // StreamTask
-   HANDLE_ASYNC_EXCEPTION_LATCH.await();
+   // wait until all async checkpoint threads are 
terminated, so that no more exceptions can be reported
+   
Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, 
TimeUnit.SECONDS));
--- End diff --

Where do we trigger the shut down of the `getAsyncOperationsThreadpool`?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348375#comment-16348375
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165318246
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ---
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
 
-   // wait until handle async exception has been called to 
proceed with the termination of the
-   // StreamTask
-   HANDLE_ASYNC_EXCEPTION_LATCH.await();
--- End diff --

This handle should be deleted since it is no longer needed.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165318377
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ---
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
 
-   // wait until handle async exception has been called to 
proceed with the termination of the
-   // StreamTask
-   HANDLE_ASYNC_EXCEPTION_LATCH.await();
+   // wait until all async checkpoint threads are 
terminated, so that no more exceptions can be reported
+   
Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, 
TimeUnit.SECONDS));
--- End diff --

Where do we trigger the shut down of the `getAsyncOperationsThreadpool`?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165318246
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ---
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
 
-   // wait until handle async exception has been called to 
proceed with the termination of the
-   // StreamTask
-   HANDLE_ASYNC_EXCEPTION_LATCH.await();
--- End diff --

This handle should be deleted since it is no longer needed.


---


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348374#comment-16348374
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
@pluppens 
My only concern is that scanning the whole list of shards can be heavily 
constrained by AWS Kinesis's API invocation rate limits. Also, we would then only 
be cleaning up the state on restore, meaning we would kind of be encouraging 
(in a bad way) Kinesis users to snapshot and restore every once in a while.

I think the best solution for that is probably to use a threshold constant 
as Stephan suggested, but we will need to investigate whether the Kinesis API 
supports enough information to implement this.

I'll open a separate JIRA ticket for this, so we can properly discuss the 
issues of pruning closed shard states there.


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
@pluppens 
My only concern is that scanning the whole list of shards can be heavily 
constrained by AWS Kinesis's API invocation rate limits. Also, we would then only 
be cleaning up the state on restore, meaning we would kind of be encouraging 
(in a bad way) Kinesis users to snapshot and restore every once in a while.

I think the best solution for that is probably to use a threshold constant 
as Stephan suggested, but we will need to investigate whether the Kinesis API 
supports enough information to implement this.

I'll open a separate JIRA ticket for this, so we can properly discuss the 
issues of pruning closed shard states there.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348372#comment-16348372
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165317571
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -916,57 +916,78 @@ private void handleExecutionException(Exception e) {

CheckpointingOperation.AsynCheckpointState.COMPLETED,

CheckpointingOperation.AsynCheckpointState.RUNNING);
 
-   try {
-   cleanup();
-   } catch (Exception cleanupException) {
-   e.addSuppressed(cleanupException);
-   }
+   if (asyncCheckpointState.compareAndSet(
+   
CheckpointingOperation.AsynCheckpointState.RUNNING,
+   
CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
--- End diff --

This could be 
`asyncCheckpointState.compareAndSet(currentAsyncCheckpointState, DISCARDED)`. 
Then we could remove the compare and set above this if condition.
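
A sketch of that simplification (identifiers follow the diff, the surrounding method 
is elided, and `currentState` stands for whatever state was read before the exception 
handling started):
{code:java}
// A single transition straight to DISCARDED replaces the COMPLETED -> RUNNING reset
// followed by a second compareAndSet; on success this branch owns the cleanup.
if (asyncCheckpointState.compareAndSet(
		currentState,
		CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
	try {
		cleanup();
	} catch (Exception cleanupException) {
		e.addSuppressed(cleanupException);
	}
}
{code}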


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165317571
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -916,57 +916,78 @@ private void handleExecutionException(Exception e) {

CheckpointingOperation.AsynCheckpointState.COMPLETED,

CheckpointingOperation.AsynCheckpointState.RUNNING);
 
-   try {
-   cleanup();
-   } catch (Exception cleanupException) {
-   e.addSuppressed(cleanupException);
-   }
+   if (asyncCheckpointState.compareAndSet(
+   
CheckpointingOperation.AsynCheckpointState.RUNNING,
+   
CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
--- End diff --

This could be 
`asyncCheckpointState.compareAndSet(currentAsyncCheckpointState, DISCARDED)`. 
Then we could remove the compare and set above this if condition.


---


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348367#comment-16348367
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Regarding the remark from @StephanEwen: perhaps it would be ok to re-use 
the `KinesisProxy` to return a list of all shards and compare them to the 
`sequenceNumsToRestore` to prune any shards that no longer exist? It would 
delay the restoration, but you'd be sure the state wouldn't grow indefinitely 
(we were looking at around 1,000 closed shards with a 24-hour retention 
period, so 365k per year - that's not going to end well). Another option would 
be to kick off another task periodically to prune them, but that is likely to 
run into race conditions, so doing it at the safe point of restoration would 
make more sense to me.
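
A self-contained sketch of the pruning idea under simplified types. The real consumer 
keys its restored state by StreamShardMetadata and would obtain the live shard listing 
from Kinesis; the types and names below are illustrative only, not the connector's API.
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class RestoredShardStatePruning {

	/** Drops restored sequence numbers for shards that no longer exist in the stream. */
	static void pruneVanishedShards(
			Map<String, String> restoredShardIdToSeqNum,  // shardId -> last sequence number
			Set<String> currentlyListedShardIds) {        // e.g. obtained once on restore
		restoredShardIdToSeqNum.keySet()
				.removeIf(shardId -> !currentlyListedShardIds.contains(shardId));
	}

	public static void main(String[] args) {
		Map<String, String> restored = new HashMap<>();
		restored.put("shardId-000000000000", "seq-placeholder-1");
		restored.put("shardId-000000000042", "seq-placeholder-2");

		Set<String> live = new HashSet<>();
		live.add("shardId-000000000042");

		pruneVanishedShards(restored, live);
		System.out.println(restored); // only the still-existing shard remains
	}
}
{code}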


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread pluppens
Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Regarding the remark from @StephanEwen: perhaps it would be ok to re-use 
the `KinesisProxy` to return a list of all shards and compare them to the 
`sequenceNumsToRestore` to prune any shards that no longer exist? It would 
delay the restoration, but you'd be sure the state wouldn't grow indefinitely 
(we were looking at around 1,000 closed shards with a 24-hour retention 
period, so 365k per year - that's not going to end well). Another option would 
be to kick off another task periodically to prune them, but that is likely to 
run into race conditions, so doing it at the safe point of restoration would 
make more sense to me.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348364#comment-16348364
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165315816
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
 ---
@@ -46,20 +47,28 @@ public void testCancelAndCleanup() throws Exception {
operatorSnapshotResult.cancel();
 
KeyedStateHandle keyedManagedStateHandle = 
mock(KeyedStateHandle.class);
-   RunnableFuture keyedStateManagedFuture = 
mock(RunnableFuture.class);
-   
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
+   RunnableFuture> 
keyedStateManagedFuture = mock(RunnableFuture.class);
--- End diff --

Why not use a `DoneFuture` instead of mocking?
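
A sketch of that suggestion, assuming `DoneFuture` offers a single-argument constructor 
wrapping an already-available value (the generic type is simplified compared to the diff):
{code:java}
// Instead of mock(RunnableFuture.class) plus when(...).thenReturn(...), hand the test
// an already-completed future that simply carries the handle.
RunnableFuture<KeyedStateHandle> keyedStateManagedFuture =
		new DoneFuture<>(keyedManagedStateHandle);
{code}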


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165315816
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
 ---
@@ -46,20 +47,28 @@ public void testCancelAndCleanup() throws Exception {
operatorSnapshotResult.cancel();
 
KeyedStateHandle keyedManagedStateHandle = 
mock(KeyedStateHandle.class);
-   RunnableFuture keyedStateManagedFuture = 
mock(RunnableFuture.class);
-   
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
+   RunnableFuture> 
keyedStateManagedFuture = mock(RunnableFuture.class);
--- End diff --

Why not use a `DoneFuture` instead of mocking?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348362#comment-16348362
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165315625
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -859,56 +861,77 @@ public void run() {
if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,

CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
 
-   TaskStateSnapshot acknowledgedState = 
hasState ? taskOperatorSubtaskStates : null;
-
-   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
-
-   // we signal stateless tasks by 
reporting null, so that there are no attempts to assign empty state
-   // to stateless tasks on restore. This 
enables simple job modifications that only concern
-   // stateless without the need to assign 
them uids to match their (always empty) states.
-   
taskStateManager.reportTaskStateSnapshot(
-   checkpointMetaData,
-   checkpointMetrics,
-   acknowledgedState);
-
-   LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms",
-   owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-
-   LOG.trace("{} - reported the following 
states in snapshot for checkpoint {}: {}.",
-   owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedState);
+   reportCompletedSnapshotStates(
+   
jobManagerTaskOperatorSubtaskStates,
+   localTaskOperatorSubtaskStates,
+   asyncDurationMillis);
 
} else {
LOG.debug("{} - asynchronous part of 
checkpoint {} could not be completed because it was closed before.",
owner.getName(),

checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
-   // the state is completed if an exception 
occurred in the acknowledgeCheckpoint call
-   // in order to clean up, we have to set it to 
RUNNING again.
-   asyncCheckpointState.compareAndSet(
-   
CheckpointingOperation.AsynCheckpointState.COMPLETED,
-   
CheckpointingOperation.AsynCheckpointState.RUNNING);
-
-   try {
-   cleanup();
-   } catch (Exception cleanupException) {
-   e.addSuppressed(cleanupException);
-   }
-
-   Exception checkpointException = new Exception(
-   "Could not materialize checkpoint " + 
checkpointId + " for operator " +
-   owner.getName() + '.',
-   e);
-
-   
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-   checkpointMetaData,
-   checkpointException);
+   handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);

FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
 
+   private void reportCompletedSnapshotStates(
+   TaskStateSnapshot acknowledgedTaskStateSnapshot,
+   TaskStateSnapshot localTaskStateSnapshot,
+   long asyncDurationMillis) {
+
+   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
+
+   boolean 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165315625
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -859,56 +861,77 @@ public void run() {
if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,

CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
 
-   TaskStateSnapshot acknowledgedState = 
hasState ? taskOperatorSubtaskStates : null;
-
-   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
-
-   // we signal stateless tasks by 
reporting null, so that there are no attempts to assign empty state
-   // to stateless tasks on restore. This 
enables simple job modifications that only concern
-   // stateless without the need to assign 
them uids to match their (always empty) states.
-   
taskStateManager.reportTaskStateSnapshot(
-   checkpointMetaData,
-   checkpointMetrics,
-   acknowledgedState);
-
-   LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms",
-   owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-
-   LOG.trace("{} - reported the following 
states in snapshot for checkpoint {}: {}.",
-   owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedState);
+   reportCompletedSnapshotStates(
+   
jobManagerTaskOperatorSubtaskStates,
+   localTaskOperatorSubtaskStates,
+   asyncDurationMillis);
 
} else {
LOG.debug("{} - asynchronous part of 
checkpoint {} could not be completed because it was closed before.",
owner.getName(),

checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
-   // the state is completed if an exception 
occurred in the acknowledgeCheckpoint call
-   // in order to clean up, we have to set it to 
RUNNING again.
-   asyncCheckpointState.compareAndSet(
-   
CheckpointingOperation.AsynCheckpointState.COMPLETED,
-   
CheckpointingOperation.AsynCheckpointState.RUNNING);
-
-   try {
-   cleanup();
-   } catch (Exception cleanupException) {
-   e.addSuppressed(cleanupException);
-   }
-
-   Exception checkpointException = new Exception(
-   "Could not materialize checkpoint " + 
checkpointId + " for operator " +
-   owner.getName() + '.',
-   e);
-
-   
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-   checkpointMetaData,
-   checkpointException);
+   handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);

FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
 
+   private void reportCompletedSnapshotStates(
+   TaskStateSnapshot acknowledgedTaskStateSnapshot,
+   TaskStateSnapshot localTaskStateSnapshot,
+   long asyncDurationMillis) {
+
+   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
+
+   boolean hasAckState = 
acknowledgedTaskStateSnapshot.hasState();
+   boolean hasLocalState = 
localTaskStateSnapshot.hasState();
+
+   Preconditions.checkState(hasAckState || !hasLocalState,
+

[jira] [Comment Edited] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-02-01 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115
 ] 

yanxiaobin edited comment on FLINK-5479 at 2/1/18 10:30 AM:


I am also hitting this problem at the moment. It can cause serious delays! Is there a better solution to the problem?


was (Author: backlight):
I've also met this problem at the moment.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.
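
A simplified illustration of why an idle partition blocks progress (this is not the actual {{AbstractFetcher}} code, just the min-over-partitions rule it follows):

{code:java}
// The watermark emitted by a consumer subtask is the minimum over its
// per-partition watermarks. An idle partition never advances past
// Long.MIN_VALUE, so the overall watermark is stuck there.
long[] perPartitionWatermarks = {1_500L, 2_000L, Long.MIN_VALUE}; // third partition is idle

long overall = Long.MAX_VALUE;
for (long w : perPartitionWatermarks) {
    overall = Math.min(overall, w);
}
// overall == Long.MIN_VALUE, so downstream event-time progress is blocked.
{code}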



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348340#comment-16348340
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165309957
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * TODO write comment.
+ */
+public class OperatorSnapshotFinalizer {
+
+   private final OperatorSubtaskState jobManagerOwnedState;
+   private final OperatorSubtaskState taskLocalState;
+
+   public OperatorSnapshotFinalizer(
+   OperatorSnapshotFutures snapshotFutures) throws 
ExecutionException, InterruptedException {
+
+   SnapshotResult<KeyedStateHandle> keyedManaged =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
+
+   SnapshotResult<KeyedStateHandle> keyedRaw =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
+
+   SnapshotResult<OperatorStateHandle> operatorManaged =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
+
+   SnapshotResult<OperatorStateHandle> operatorRaw =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
+
+   jobManagerOwnedState = new OperatorSubtaskState(
+   extractJobManagerOwned(operatorManaged),
+   extractJobManagerOwned(operatorRaw),
+   extractJobManagerOwned(keyedManaged),
+   extractJobManagerOwned(keyedRaw)
+   );
+
+   taskLocalState = new OperatorSubtaskState(
+   extractTaskLocal(operatorManaged),
+   extractTaskLocal(operatorRaw),
+   extractTaskLocal(keyedManaged),
+   extractTaskLocal(keyedRaw)
+   );
+   }
+
+   public OperatorSubtaskState getTaskLocalState() {
+   return taskLocalState;
+   }
+
+   public OperatorSubtaskState getJobManagerOwnedState() {
+   return jobManagerOwnedState;
+   }
+
+   private <T extends StateObject> T extractJobManagerOwned(SnapshotResult<T> snapshotResult) {
+   return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;
--- End diff --

We should add the `@Nullable` annotation to indicate that this method can return a `null` value.
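
For example, the signature above could be annotated like this (a sketch based on the quoted lines; Flink typically uses {{javax.annotation.Nullable}}):

{code:java}
@Nullable
private <T extends StateObject> T extractJobManagerOwned(SnapshotResult<T> snapshotResult) {
    // A null SnapshotResult means there is no state of this kind to report.
    return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;
}
{code}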


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165309957
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * TODO write comment.
+ */
+public class OperatorSnapshotFinalizer {
+
+   private final OperatorSubtaskState jobManagerOwnedState;
+   private final OperatorSubtaskState taskLocalState;
+
+   public OperatorSnapshotFinalizer(
+   OperatorSnapshotFutures snapshotFutures) throws 
ExecutionException, InterruptedException {
+
+   SnapshotResult<KeyedStateHandle> keyedManaged =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
+
+   SnapshotResult<KeyedStateHandle> keyedRaw =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
+
+   SnapshotResult<OperatorStateHandle> operatorManaged =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
+
+   SnapshotResult<OperatorStateHandle> operatorRaw =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
+
+   jobManagerOwnedState = new OperatorSubtaskState(
+   extractJobManagerOwned(operatorManaged),
+   extractJobManagerOwned(operatorRaw),
+   extractJobManagerOwned(keyedManaged),
+   extractJobManagerOwned(keyedRaw)
+   );
+
+   taskLocalState = new OperatorSubtaskState(
+   extractTaskLocal(operatorManaged),
+   extractTaskLocal(operatorRaw),
+   extractTaskLocal(keyedManaged),
+   extractTaskLocal(keyedRaw)
+   );
+   }
+
+   public OperatorSubtaskState getTaskLocalState() {
+   return taskLocalState;
+   }
+
+   public OperatorSubtaskState getJobManagerOwnedState() {
+   return jobManagerOwnedState;
+   }
+
+   private <T extends StateObject> T extractJobManagerOwned(SnapshotResult<T> snapshotResult) {
+   return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;
--- End diff --

We should add the `@Nullable` annotation to indicate that this method can return a `null` value.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348339#comment-16348339
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r165309670
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * TODO write comment.
+ */
+public class OperatorSnapshotFinalizer {
+
+   private final OperatorSubtaskState jobManagerOwnedState;
+   private final OperatorSubtaskState taskLocalState;
+
+   public OperatorSnapshotFinalizer(
+   OperatorSnapshotFutures snapshotFutures) throws 
ExecutionException, InterruptedException {
+
+   SnapshotResult<KeyedStateHandle> keyedManaged =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
+
+   SnapshotResult<KeyedStateHandle> keyedRaw =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
+
+   SnapshotResult<OperatorStateHandle> operatorManaged =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
+
+   SnapshotResult<OperatorStateHandle> operatorRaw =
+   FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
+
+   jobManagerOwnedState = new OperatorSubtaskState(
+   extractJobManagerOwned(operatorManaged),
+   extractJobManagerOwned(operatorRaw),
+   extractJobManagerOwned(keyedManaged),
+   extractJobManagerOwned(keyedRaw)
+   );
+
+   taskLocalState = new OperatorSubtaskState(
+   extractTaskLocal(operatorManaged),
+   extractTaskLocal(operatorRaw),
+   extractTaskLocal(keyedManaged),
+   extractTaskLocal(keyedRaw)
+   );
+   }
+
+   public OperatorSubtaskState getTaskLocalState() {
+   return taskLocalState;
+   }
+
+   public OperatorSubtaskState getJobManagerOwnedState() {
+   return jobManagerOwnedState;
+   }
+
+   private <T extends StateObject> T extractJobManagerOwned(SnapshotResult<T> snapshotResult) {
+   return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;
--- End diff --

Same here with the `null` values.
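
Assuming the sibling method mirrors the one above and that {{SnapshotResult}} exposes a {{getTaskLocalSnapshot()}} accessor (neither is shown in this hunk, so this is only a sketch):

{code:java}
@Nullable
private <T extends StateObject> T extractTaskLocal(SnapshotResult<T> snapshotResult) {
    // Returns null when no task-local copy of this state exists.
    return snapshotResult != null ? snapshotResult.getTaskLocalSnapshot() : null;
}
{code}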


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
