[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-20 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2648


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83822468
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 ---
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
}
 
/**
-* Calls
-* {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
 */
public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
setupCalled = true;
}
 
/**
-* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
-* calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
 * if it was not called before.
 */
-   public void open() throws Exception {
+   public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
if (!setupCalled) {
setup();
}
+   operator.initializeState(operatorStateHandles);
+   initializeCalled = true;
+   }
+
+   /**
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
 if it
+* was not called before.
+*/
+   public void open() throws Exception {
+   if (!initializeCalled) {
+   initializeState(null);
+   }
operator.open();
}
 
/**
 *
 */
-   public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
+   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long 
timestamp) throws Exception {
--- End diff --

I think the idea of having a single method is nice, and if there is no 
special reason why we should keep the old signature, I suggest doing it the 
other way around. `OperatorSnapshotResult` is already a container for all 
operator states (except the legacy state, which will be removed in the near 
future). Using it would remove the need for the multiplexing.

However, `OperatorSnapshotResult` does not contain the legacy state 
anymore, so for the time being, we might return a Tuple2 of both, or some 
special container class which could also strip away the `RunnableFuture` part.

What do you think?
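A minimal sketch of the "special container class" idea, assuming the harness simply runs each in-flight snapshot future synchronously on the calling thread. `CompletedSnapshot` and its methods are hypothetical names, and the type parameters stand in for the legacy, keyed, and operator state handle types. This is an illustration, not Flink code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;

/**
 * Hypothetical container for a test harness: it runs each in-flight snapshot future to
 * completion and keeps only the results, next to the legacy state handle.
 * L, K and O stand in for the legacy, keyed and operator state handle types.
 */
final class CompletedSnapshot<L, K, O> {

    private final L legacyHandle;    // null if the operator has no legacy state
    private final K keyedHandle;     // completed keyed-state result, or null
    private final O operatorHandle;  // completed operator-state result, or null

    private CompletedSnapshot(L legacyHandle, K keyedHandle, O operatorHandle) {
        this.legacyHandle = legacyHandle;
        this.keyedHandle = keyedHandle;
        this.operatorHandle = operatorHandle;
    }

    /** Runs the given futures on the calling thread and collects their results. */
    static <L, K, O> CompletedSnapshot<L, K, O> complete(
            L legacyHandle,
            RunnableFuture<K> keyedFuture,
            RunnableFuture<O> operatorFuture) throws InterruptedException, ExecutionException {
        return new CompletedSnapshot<>(
                legacyHandle, runIfPresent(keyedFuture), runIfPresent(operatorFuture));
    }

    private static <T> T runIfPresent(RunnableFuture<T> future)
            throws InterruptedException, ExecutionException {
        if (future == null) {
            return null; // nothing was snapshotted for this kind of state
        }
        future.run();        // for FutureTask, run() is a no-op once the task has completed
        return future.get(); // the result, or the task's exception wrapped in ExecutionException
    }

    L getLegacyHandle() { return legacyHandle; }

    K getKeyedHandle() { return keyedHandle; }

    O getOperatorHandle() { return operatorHandle; }
}
```

Compared with a plain `Tuple2`, a dedicated class like this lets tests read named getters instead of `f0`/`f1`, at the cost of one more type to maintain.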




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83819873
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class holds all state handles for one operator.
+ */
+public class OperatorStateHandles {
--- End diff --

Agreed about `@Internal`. For the name, however, I think it should somehow 
reflect how this is related to `TaskStateHandles`.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83807634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -38,8 +38,8 @@
private final StreamStateHandle delegateStateHandle;
 
public OperatorStateHandle(
-   StreamStateHandle delegateStateHandle,
-   Map stateNameToPartitionOffsets) {
+   Map stateNameToPartitionOffsets,
--- End diff --

Changing the order here had the purpose of having the same order in 
`OperatorStateHandle` as in `KeyGroupsStateHandle`, which was exactly the other 
way around :)




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83807199
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
 
LOG.info("Restoring from latest valid checkpoint: {}.", 
latest);
 
-   for (Map.Entry 
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
-   TaskState taskState = 
taskGroupStateEntry.getValue();
-   ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
-
-   if (executionJobVertex != null) {
-   // check that the number of key groups 
have not changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new 
IllegalStateException("The maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
-   }
-
-
-   int oldParallelism = 
taskState.getParallelism();
-   int newParallelism = 
executionJobVertex.getParallelism();
-   boolean parallelismChanged = 
oldParallelism != newParallelism;
-   boolean hasNonPartitionedState = 
taskState.hasNonPartitionedState();
-
-   if (hasNonPartitionedState && 
parallelismChanged) {
-   throw new 
IllegalStateException("Cannot restore the latest checkpoint because " +
-   "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
-   "state and its 
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-   " has parallelism " + 
newParallelism + " whereas the corresponding" +
-   "state object has a 
parallelism of " + oldParallelism);
-   }
-
-   List keyGroupPartitions 
= createKeyGroupPartitions(
-   
executionJobVertex.getMaxParallelism(),
-   newParallelism);
-   
-   // operator chain index -> list of the 
stored partitionables states from all parallel instances
-   @SuppressWarnings("unchecked")
-   List[] 
chainParallelStates =
-   new 
List[taskState.getChainLength()];
-
-   for (int i = 0; i < oldParallelism; 
++i) {
-
-   
ChainedStateHandle partitionableState =
-   
taskState.getPartitionableState(i);
-
-   if (partitionableState != null) 
{
-   for (int j = 0; j < 
partitionableState.getLength(); ++j) {
-   
OperatorStateHandle opParalleState = partitionableState.get(j);
-   if 
(opParalleState != null) {
-   
List opParallelStates =
-   
chainParallelStates[j];
-   if 
(opParallelStates == null) {
-   
opParallelStates = new ArrayList<>();
-   
chainParallelStates[j] = opParallelStates;
-   }
  

[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83680393
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -2132,24 +2132,36 @@ public void 
testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex
"non-partitioned state changed.");
}
 
+   @Test
--- End diff --

Very good additions! 👍 😺 




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83685681
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 ---
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
}
 
/**
-* Calls
-* {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
 */
public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
setupCalled = true;
}
 
/**
-* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
-* calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
 * if it was not called before.
 */
-   public void open() throws Exception {
+   public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
if (!setupCalled) {
setup();
}
+   operator.initializeState(operatorStateHandles);
+   initializeCalled = true;
+   }
+
+   /**
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
 if it
+* was not called before.
+*/
+   public void open() throws Exception {
+   if (!initializeCalled) {
+   initializeState(null);
+   }
operator.open();
}
 
/**
 *
 */
-   public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
+   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long 
timestamp) throws Exception {
--- End diff --

I think we can keep the old method signature by doing something like this:
```
/**
 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
 */
public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    synchronized (checkpointLock) {
        CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
                new JobID(),
                "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);

        if (operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) operator).snapshotState(
                    outStream,
                    checkpointId,
                    timestamp);
        }

        RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                checkpointId,
                timestamp,
                stateBackend.createStreamFactory(new JobID(), "test_op"));

        if (snapshotRunnable != null) {
            outStream.write(1);
            snapshotRunnable.run();
            OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

            InstantiationUtil.serializeObject(outStream, operatorStateHandle);
        } else {
            outStream.write(0);
        }

        snapshotToStream(checkpointId, timestamp, outStream);

        return outStream.closeAndGetHandle();
    }
}
```
This multiplexes the results of the different operator snapshotting 
methods into the same stream. The restore method then just picks the items out 
of the stream in the same order and hands them to the matching operator methods.

This would let all tests use the same method, and we could keep the 
name and signature stable as the operator/snapshot interfaces evolve.
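To make the stream layout above concrete: each optional section is preceded by a marker byte (1 = present, 0 = absent), so the restore side can tell whether to deserialize something before moving on to the next section. The JDK-only sketch below shows that round trip. `MultiplexedSnapshot` is a hypothetical name, and unlike the snippet it also length-prefixes each payload so that every section occupies an exact byte range.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper: writes and reads optional, marker-prefixed sections of one stream. */
final class MultiplexedSnapshot {

    private MultiplexedSnapshot() {}

    /** Writes 1 + length + serialized bytes if value is non-null, otherwise just 0. */
    static void writeOptional(DataOutputStream out, Serializable value) throws IOException {
        if (value == null) {
            out.writeByte(0);
            return;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(value);
        }
        byte[] bytes = buffer.toByteArray();
        out.writeByte(1);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    /** Reads one section written by writeOptional; returns null if it was marked absent. */
    static Object readOptional(DataInputStream in) throws IOException, ClassNotFoundException {
        byte marker = in.readByte();
        if (marker == 0) {
            return null;
        }
        if (marker != 1) {
            throw new IOException("Unexpected section marker: " + marker);
        }
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```

The reader must consume sections in exactly the order the writer produced them, which is the same constraint the restore method in the proposal would have.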



[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83673478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SnapshotInProgressSubtaskState.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Encapsulates all runnable futures draw snapshots for a single subtask 
state of an in-flight checkpointing operation.
+ */
+public class SnapshotInProgressSubtaskState {
--- End diff --

I think this could be changed to
```
/**
 * Result of {@link AbstractStreamOperator#snapshotState}.
 */
public class OperatorSnapshotResult { ... }
```

to make it clearer what it is supposed to be. It should probably also live 
in the same module/package as `AbstractStreamOperator`, but the code layout 
of the state classes seems a bit messy, so I'm not sure that's possible.





[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83682543
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -238,11 +294,51 @@ public void dispose() throws Exception {
}
 
@Override
-   public RunnableFuture snapshotState(
+   public SnapshotInProgressSubtaskState snapshotState(
--- End diff --

This should probably be `final`, similarly to how 
`initializeState(OperatorStateHandles)` is `final`.
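Making the base-class method `final` follows the usual template-method arrangement: the base class fixes the order of the snapshot steps, and subclasses customize only a protected hook. The sketch below is hypothetical; the class and method names are illustrative, not Flink's.

```java
/**
 * Hypothetical base class: the public lifecycle method is final, so subclasses cannot
 * skip or reorder the steps around the user-defined part of the snapshot.
 */
abstract class BaseOperatorSketch {

    private final StringBuilder trace = new StringBuilder();

    /** Final template method: fixed begin/end steps around an overridable hook. */
    public final String snapshotState(long checkpointId) {
        record("begin:" + checkpointId);
        snapshotUserState(checkpointId); // the only step subclasses can customize
        record("end:" + checkpointId);
        return trace.toString();
    }

    /** Hook for subclasses; the default snapshots nothing. */
    protected void snapshotUserState(long checkpointId) {}

    /** Appends one entry to the trace, standing in for real snapshot work. */
    protected final void record(String entry) {
        trace.append(entry).append(';');
    }
}

/** Example subclass: it can add its own step but cannot override snapshotState itself. */
final class CountingOperatorSketch extends BaseOperatorSketch {
    @Override
    protected void snapshotUserState(long checkpointId) {
        record("user:" + checkpointId);
    }
}
```

Here `new CountingOperatorSketch().snapshotState(7)` returns `"begin:7;user:7;end:7;"`; an attempt to override `snapshotState` in the subclass would not compile.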




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83673691
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 ---
@@ -34,10 +37,14 @@
 
private static final long serialVersionUID = -2394696997971923995L;
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(SubtaskState.class);
-
-   /** The state of the parallel operator */
-   private final ChainedStateHandle chainedStateHandle;
+   /**
+* The state of the parallel operator
+*/
+   private final ChainedStateHandle 
nonPartitionableOperatorState;
--- End diff --

I think these names don't match the names in the rest of the code base.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83683321
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class holds all state handles for one operator.
+ */
+public class OperatorStateHandles {
--- End diff --

This should be `@Internal` or at least `@PublicEvolving`. Also, the name 
clashes a bit with `OperatorStateHandle` which does something quite different.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83680077
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class NonClosingStreamDecorator extends InputStream {
--- End diff --

It's quite clear what it does but Javadocs would still be nice.
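As an example of the kind of Javadoc that would help, here is a hypothetical sketch of a non-closing input-stream decorator. It extends `java.io.FilterInputStream` for brevity, whereas the class under review extends `InputStream` directly, so treat it as an illustration of the behaviour rather than the PR's implementation.

```java
import java.io.FilterInputStream;
import java.io.InputStream;

/**
 * Decorator that forwards all reads to a delegate {@link InputStream} but ignores
 * {@link #close()}. This lets a reader that insists on closing its input (for example a
 * deserializer) consume one section of a shared stream without closing the underlying
 * stream, which its owner closes later.
 */
final class NonClosingInputStreamSketch extends FilterInputStream {

    NonClosingInputStreamSketch(InputStream delegate) {
        super(delegate);
    }

    /** Intentionally does not close the delegate. */
    @Override
    public void close() {
        // no-op: the owner of the delegate stream is responsible for closing it
    }
}
```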




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83676027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractPartitionedCheckpointOutputStream.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that allows to write state into several partitions.
+ * @param  type of the returned state handle.
+ */
+public abstract class AbstractPartitionedCheckpointOutputStream extends OutputStream {
--- End diff --

I think the javadoc and class name don't accurately describe what this does 
(possibly due to some refactoring). Now it should probably be called something 
like `NonClosingCheckpointOutputStream`.
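A minimal sketch of what the suggested name describes, assuming the class's remaining job is to keep user code from closing the underlying checkpoint stream. `NonClosingOutputStreamSketch` is hypothetical. In this sketch `close()` only flushes, so the owner can keep writing and close the real stream later.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical wrapper: user code may close it, but the delegate stays open. */
final class NonClosingOutputStreamSketch extends OutputStream {

    private final OutputStream delegate;

    NonClosingOutputStreamSketch(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        delegate.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    /** Flushes buffered data but deliberately leaves the delegate open. */
    @Override
    public void close() throws IOException {
        delegate.flush();
    }
}
```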




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83662569
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -80,38 +81,38 @@
// 

 
private final List topics;
-   
+
/** The schema to convert between Kafka's byte messages, and Flink's 
objects */
protected final KeyedDeserializationSchema deserializer;
 
/** The set of topic partitions that the source will read */
protected List subscribedPartitions;
-   
+
/** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
 * to exploit per-partition timestamp characteristics.
 * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
private SerializedValue 
periodicWatermarkAssigner;
-   
+
/** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
-* to exploit per-partition timestamp characteristics. 
+* to exploit per-partition timestamp characteristics.
 * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
private SerializedValue 
punctuatedWatermarkAssigner;
 
-   private transient OperatorStateStore stateStore;
+   private transient ListState offsetsStateForCheckpoint;
--- End diff --

This can have a more concrete type. You changed 
`OperatorStateStore.getSerializableListState` to this:
```
 ListState getSerializableListState(String stateName) throws Exception;
```
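The point about the concrete type can be illustrated with a self-contained stand-in. Assuming the quoted signature originally carried a type parameter (the `<...>` parts appear to have been lost in the email rendering), a generic getter lets the field's declared type choose the element type, so no cast is needed at the call site. `SimpleListState` and `SimpleOperatorStateStore` are hypothetical and only mimic the shape of the real API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal stand-in for a list state; not Flink's ListState. */
interface SimpleListState<T> {
    void add(T value);

    Iterable<T> get();
}

/** Hypothetical store whose getter is generic, so the caller's declared type fixes T. */
final class SimpleOperatorStateStore {

    private final Map<String, List<Object>> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T extends Serializable> SimpleListState<T> getSerializableListState(String stateName) {
        // Same backing list for the same name, so state survives repeated lookups.
        List<Object> backing = states.computeIfAbsent(stateName, name -> new ArrayList<>());
        return new SimpleListState<T>() {
            @Override
            public void add(T value) {
                backing.add(value);
            }

            @Override
            public Iterable<T> get() {
                return (Iterable<T>) (Iterable<?>) backing; // unchecked, like any erased store
            }
        };
    }
}
```

With this shape, a field can be declared as `SimpleListState<Long> offsets = store.getSerializableListState("offsets");` instead of using a raw or `Serializable` element type.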




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83677603
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -132,7 +133,7 @@ public DefaultOperatorStateBackend(ClassLoader 
userClassLoader) {
}
 
/**
-* @see SnapshotProvider
+* @see Snapshotable
--- End diff --

I think an otherwise empty Javadoc (containing only `@see`) simply prevents 
tools from displaying the Javadoc of the overridden method, so it's probably 
best to remove those. There are more instances of this in the file.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83679164
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsList.java 
---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface KeyGroupsList extends Iterable {
--- End diff --

This one could benefit from some Javadocs.
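For instance, Javadocs along these lines could work. The method names and the `Iterable<Integer>` element type are assumptions made for this sketch (the quoted declaration shows only a raw `Iterable`), and `KeyGroupRangeSketch` is a hypothetical contiguous-range implementation included to make the contract concrete.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * A list of key-group ids, for example those assigned to one parallel operator instance.
 * Iteration returns the ids in list order.
 */
interface KeyGroupsListSketch extends Iterable<Integer> {

    /** Returns the number of key groups in this list. */
    int getNumberOfKeyGroups();

    /** Returns the key-group id at position {@code idx}, {@code 0 <= idx < getNumberOfKeyGroups()}. */
    int getKeyGroupId(int idx);

    /** Returns true if the given key-group id is part of this list. */
    boolean contains(int keyGroupId);
}

/** Hypothetical contiguous range [start, end] of key groups, both bounds inclusive. */
final class KeyGroupRangeSketch implements KeyGroupsListSketch {

    private final int start;
    private final int end;

    KeyGroupRangeSketch(int start, int end) {
        if (start < 0 || end < start) {
            throw new IllegalArgumentException("Invalid range [" + start + ", " + end + "]");
        }
        this.start = start;
        this.end = end;
    }

    @Override
    public int getNumberOfKeyGroups() {
        return end - start + 1;
    }

    @Override
    public int getKeyGroupId(int idx) {
        if (idx < 0 || idx >= getNumberOfKeyGroups()) {
            throw new IndexOutOfBoundsException("Index: " + idx);
        }
        return start + idx;
    }

    @Override
    public boolean contains(int keyGroupId) {
        return keyGroupId >= start && keyGroupId <= end;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int next = start;

            @Override
            public boolean hasNext() {
                return next <= end;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return next++;
            }
        };
    }
}
```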




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83661373
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -348,6 +345,11 @@ public void prepareSnapshot(long checkpointId, long 
timestamp) throws Exception
}
}
 
+   @Override
--- End diff --

The methods don't need to be reordered here. Also, the state store is not 
used anywhere, as far as I can see.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83661983
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -55,20 +56,20 @@
 /**
  * Base class of all Flink Kafka Consumer data sources.
  * This implements the common behavior across all Kafka versions.
- * 
+ *
--- End diff --

This file contains a lot of whitespace changes. It would be good to remove 
them before we merge this.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83679673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.RunnableFuture;
+
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
--- End diff --

Some Javadocs would probably be helpful. 😉 




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83666257
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
 ---
@@ -18,14 +18,18 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 import java.util.Set;
 
 /**
- * Interface for a backend that manages operator state.
+ * This interface contains methods for registering operator state with a managed store.
  */
+@PublicEvolving
 public interface OperatorStateStore {
 
+   /** The default namespace for state in cases where no state name is provided */
String DEFAULT_OPERATOR_STATE_NAME = "_default_";
--- End diff --

This is an implementation detail that should not be exposed on this 
interface.
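A minimal sketch of that separation, using hypothetical simplified types rather than Flink's actual API (`SimpleOperatorStateStore`, `getDefaultListState`, and the `List<Object>` return type are assumptions for illustration): the interface only offers registration under an explicit name, and the default name becomes a detail of one implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Public-facing interface (simplified, hypothetical): callers always name the state they register. */
interface OperatorStateStore {
    List<Object> getListState(String stateName);
}

/** Hypothetical backend implementation that keeps the default-name detail to itself. */
class SimpleOperatorStateStore implements OperatorStateStore {

    /** Internal fallback name, no longer part of the public interface. */
    static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    private final Map<String, List<Object>> states = new HashMap<>();

    @Override
    public List<Object> getListState(String stateName) {
        // Create the list for this name on first access, then keep returning the same list.
        return states.computeIfAbsent(stateName, name -> new ArrayList<>());
    }

    /** Backend-internal convenience for callers that do not provide a state name. */
    List<Object> getDefaultListState() {
        return getListState(DEFAULT_OPERATOR_STATE_NAME);
    }
}
```

User code then only ever sees `getListState(name)`; whether a default exists, and what it is called, can change without touching the public interface.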




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83678862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This interface provides a context in which operators that use managed state (i.e. state that is managed by state
+ * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface
+ * mainly provides meta information about the checkpoint.
+ */
+@PublicEvolving
+public interface ManagedSnapshotContext {
--- End diff --

Same comments as for `ManagedInitializationContext` hold here.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83678698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
+
+/**
+ * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that
+ * is managed by state backends).
+ * 
+ *
+ * Operator state is available to all operators, while keyed state is only available for operators after keyBy.
+ * 
+ *
+ * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from
+ * a previous execution of this operator.
+ *
+ */
+public interface ManagedInitializationContext {
--- End diff --

I think this interface and its sub-interfaces/implementations should be in 
the same module as `AbstractStreamOperator` and somewhere in the api package 
space.

Also, the naming could be changed to something like 
`StateInitializationContext` -> `FunctionInitializationContext` -> 
`OperatorInitializationContext`. Or something else that reflects their purpose, but 
`StateInitializationContext` should be at the bottom of the hierarchy.
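One possible reading of the suggested hierarchy (treating `A -> B` as "B extends A", with `StateInitializationContext` as the root), sketched with hypothetical stand-in types. The method names and the raw-state accessor are assumptions for illustration, not the PR's actual signatures:

```java
import java.io.InputStream;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-ins for the managed state stores. */
interface OperatorStateStore {}

interface KeyedStateStore {}

/** Root: managed state plus the "was this restored?" signal. */
interface StateInitializationContext {
    boolean isRestored();

    OperatorStateStore getOperatorStateStore();

    KeyedStateStore getKeyedStateStore(); // null for operators that are not after a keyBy
}

/** The view handed to user functions: in this sketch, nothing beyond the root. */
interface FunctionInitializationContext extends StateInitializationContext {}

/** The operator-facing view: additionally exposes raw state streams (assumed accessor). */
interface OperatorInitializationContext extends FunctionInitializationContext {
    List<InputStream> getRawOperatorStateInputs();
}

/** Example implementation for a fresh (non-restored), non-keyed operator. */
class FreshOperatorInitializationContext implements OperatorInitializationContext {
    private final OperatorStateStore operatorStateStore = new OperatorStateStore() {};

    @Override
    public boolean isRestored() {
        return false;
    }

    @Override
    public OperatorStateStore getOperatorStateStore() {
        return operatorStateStore;
    }

    @Override
    public KeyedStateStore getKeyedStateStore() {
        return null;
    }

    @Override
    public List<InputStream> getRawOperatorStateInputs() {
        return Collections.emptyList();
    }
}
```

Because an `OperatorInitializationContext` is also a `FunctionInitializationContext`, an operator can hand its own context to a user function, and the function's static type hides the raw state streams.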




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83676729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
 ---
@@ -25,6 +25,13 @@
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
+ * 
--- End diff --

The correct way of separating paragraphs in Javadoc is this:
```
Paragraph one.

Paragraph two

...
```

I know it's not proper HTML nowadays but that's how it's supposed to be ... 
😜 
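For illustration, here is a hypothetical, simplified registry (not Flink's `ClosableRegistry`). Its doc comments put a `<p>` tag on its own line between paragraphs, as in the examples of Oracle's "How to Write Doc Comments" guide. Closing a resource that is registered after the registry itself was closed is an assumption of this sketch:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Registry of {@link Closeable} instances that are all closed when the registry is closed.
 * <p>
 * Resources registered after the registry was closed are closed immediately, so nothing leaks
 * if registration races with shutdown.
 */
class SimpleCloseableRegistry implements Closeable {

    private final List<Closeable> registered = new ArrayList<>();
    private boolean closed;

    /** Registers a resource, or closes it right away if the registry is already closed. */
    synchronized void register(Closeable closeable) throws IOException {
        if (closed) {
            closeable.close();
        } else {
            registered.add(closeable);
        }
    }

    /**
     * Closes all registered resources.
     * <p>
     * If several resources fail to close, the first exception is rethrown with the later ones
     * attached as suppressed exceptions.
     */
    @Override
    public synchronized void close() throws IOException {
        closed = true;
        IOException first = null;
        for (Closeable c : registered) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        registered.clear();
        if (first != null) {
            throw first;
        }
    }
}
```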




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83669355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
 
LOG.info("Restoring from latest valid checkpoint: {}.", latest);
 
-   for (Map.Entry taskGroupStateEntry: latest.getTaskStates().entrySet()) {
-       TaskState taskState = taskGroupStateEntry.getValue();
-       ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
-
-       if (executionJobVertex != null) {
-           // check that the number of key groups have not changed
-           if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
-               throw new IllegalStateException("The maximum parallelism (" +
-                   taskState.getMaxParallelism() + ") with which the latest " +
-                   "checkpoint of the execution job vertex " + executionJobVertex +
-                   " has been taken and the current maximum parallelism (" +
-                   executionJobVertex.getMaxParallelism() + ") changed. This " +
-                   "is currently not supported.");
-           }
-
-
-           int oldParallelism = taskState.getParallelism();
-           int newParallelism = executionJobVertex.getParallelism();
-           boolean parallelismChanged = oldParallelism != newParallelism;
-           boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
-
-           if (hasNonPartitionedState && parallelismChanged) {
-               throw new IllegalStateException("Cannot restore the latest checkpoint because " +
-                   "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
-                   "state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-                   " has parallelism " + newParallelism + " whereas the corresponding" +
-                   "state object has a parallelism of " + oldParallelism);
-           }
-
-           List keyGroupPartitions = createKeyGroupPartitions(
-               executionJobVertex.getMaxParallelism(),
-               newParallelism);
-
-           // operator chain index -> list of the stored partitionables states from all parallel instances
-           @SuppressWarnings("unchecked")
-           List[] chainParallelStates =
-               new List[taskState.getChainLength()];
-
-           for (int i = 0; i < oldParallelism; ++i) {
-
-               ChainedStateHandle partitionableState =
-                   taskState.getPartitionableState(i);
-
-               if (partitionableState != null) {
-                   for (int j = 0; j < partitionableState.getLength(); ++j) {
-                       OperatorStateHandle opParalleState = partitionableState.get(j);
-                       if (opParalleState != null) {
-                           List opParallelStates =
-                               chainParallelStates[j];
-                           if (opParallelStates == null) {
-                               opParallelStates = new ArrayList<>();
-                               chainParallelStates[j] = opParallelStates;
-                           }
-

[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83664806
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -48,7 +43,7 @@
  * {@link AbstractRichFunction#getRuntimeContext()}.
  */
 @Public
-public interface RuntimeContext {
+public interface RuntimeContext extends KeyedStateStore {
--- End diff --

I think it would be better to not have `RuntimeContext` be a 
`KeyedStateStore`. In the not-so-far future `RuntimeContext` will probably 
provide a `KeyedStateStore` or at least use one internally to implement the 
state methods. Properly separating the two now seems prudent.
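A minimal sketch of the separation, with hypothetical stand-in types (`ValueState`, `KeyedStateStore`, `SimpleKeyedStateStore`, and `SimpleRuntimeContext` here are simplified illustrations, not Flink's interfaces): the context holds a keyed state store and implements its state accessor by delegation instead of extending the store interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single-value state handle. */
interface ValueState<T> {
    T value();

    void update(T value);
}

/** Hypothetical keyed state store: creates or returns state by name. */
interface KeyedStateStore {
    <T> ValueState<T> getState(String name);
}

/** In-memory store for a single key; a real backend would scope state per key. */
class SimpleKeyedStateStore implements KeyedStateStore {
    private final Map<String, Object> values = new HashMap<>();

    @Override
    public <T> ValueState<T> getState(String name) {
        return new ValueState<T>() {
            @SuppressWarnings("unchecked")
            @Override
            public T value() {
                return (T) values.get(name);
            }

            @Override
            public void update(T value) {
                values.put(name, value);
            }
        };
    }
}

/** The context *has* a keyed state store instead of *being* one. */
class SimpleRuntimeContext {
    private final KeyedStateStore keyedStateStore; // null for non-keyed operators

    SimpleRuntimeContext(KeyedStateStore keyedStateStore) {
        this.keyedStateStore = keyedStateStore;
    }

    /** State accessor implemented by delegation to the store. */
    <T> ValueState<T> getState(String name) {
        if (keyedStateStore == null) {
            throw new UnsupportedOperationException("Keyed state is only available on keyed streams.");
        }
        return keyedStateStore.getState(name);
    }
}
```

Keeping the store behind a field also makes the non-keyed case explicit: the context can reject keyed-state access instead of inheriting methods it cannot honor.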




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83674157
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.util.CollectionUtil;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class encapsulates all state handles for a task.
+ */
+public class TaskStateHandles implements Serializable {
--- End diff --

Very good addition for simplifying the handling of all the state handles 
that are flying around. 👍 😄 




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83670574
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -38,8 +38,8 @@
private final StreamStateHandle delegateStateHandle;
 
public OperatorStateHandle(
-   StreamStateHandle delegateStateHandle,
-   Map stateNameToPartitionOffsets) {
+   Map stateNameToPartitionOffsets,
--- End diff --

This only changes the parameter order, but it triggers one-line changes in 
other files that make it hard to tell which changes are substantive. 
Could you maybe revert that and change it in a follow-up?




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83663696
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -305,39 +306,42 @@ public void close() throws Exception {
super.close();
}
}
-   
+
// 

//  Checkpoint and restore
// 

 
-   @Override
-   public void initializeState(OperatorStateStore stateStore) throws Exception {
 
-   this.stateStore = stateStore;
+   @Override
+   public void initializeState(FunctionInitializationContext context) throws Exception {
 
-   ListState offsets =
-   stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+   OperatorStateStore stateStore = context.getManagedOperatorStateStore();
+   offsetsStateForCheckpoint = stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
-   restoreToOffset = new HashMap<>();
+   if (context.isRestored()) {
+   restoreToOffset = new HashMap<>();
+   for (Serializable serializable : offsetsStateForCheckpoint.get()) {
+   @SuppressWarnings("unchecked")
+   Tuple2 kafkaOffset = (Tuple2) serializable;
+   restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+   }
 
-   for (Serializable serializable : offsets.get()) {
-   @SuppressWarnings("unchecked")
-   Tuple2 kafkaOffset = (Tuple2) serializable;
-   restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+   LOG.info("Setting restore state in the FlinkKafkaConsumer.");
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Using the following offsets: {}", restoreToOffset);
+   }
+   } else {
+   LOG.info("No restore state for FlinkKafkaConsumer.");
}
-
-   LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoreToOffset);
}
 
@Override
-   public void prepareSnapshot(long checkpointId, long timestamp) throws Exception {
+   public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("storeOperatorState() called on closed source");
} else {
 
-   ListState listState =
-   stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
-   listState.clear();
+   offsetsStateForCheckpoint.clear();
 
final AbstractFetcher fetcher = this.kafkaFetcher;
if (fetcher == null) {
--- End diff --

This is a workaround for the fact that we initialise the fetcher in `run()` 
and not in `open()`. Might be worthwhile to change that in a follow-up, if at 
all possible.




[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2648

[FLINK-4844] Partitionable Raw Keyed/Operator State

Partitionable operator and keyed state are currently only available through 
state backends. However, the serialization code of many operators is built 
around reading/writing their state to a stream for checkpointing. We want to 
also provide partitionable state through streams, so that migrating existing 
operators becomes easier.

This PR includes the following main changes:

# 1) `KeyedStateCheckpointOutputStream` and 
`OperatorStateCheckpointedOutputStream`
These classes allow writing partitionable keyed (and operator) state to a 
stream for checkpointing. They extend the basic stream interface with methods 
to signal the start of new partitions.
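To illustrate the idea of signalling partition starts, here is a hypothetical wrapper stream (not the PR's implementation; the class and method names are assumptions) that records the byte offset at which each partition begins:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stream that remembers where each partition starts, so that a restore can later
 * hand out individual partitions (byte ranges) to different parallel instances.
 */
class PartitionTrackingOutputStream extends OutputStream {

    private final OutputStream delegate;
    private final List<Long> partitionOffsets = new ArrayList<>();
    private long position; // number of bytes written so far

    PartitionTrackingOutputStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    /** Marks the current position as the start of a new partition. */
    void startNewPartition() {
        partitionOffsets.add(position);
    }

    /** Start offsets of all partitions written so far, in write order. */
    long[] getPartitionOffsets() {
        long[] result = new long[partitionOffsets.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = partitionOffsets.get(i);
        }
        return result;
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
        position++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        delegate.write(b, off, len);
        position += len;
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}
```

On restore, partition `i` would span bytes `[offsets[i], offsets[i + 1])`, with the last partition running to the end of the stream, so the partitions can be reassigned to a different number of parallel instances.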

# 2) Changes to `StreamTask` and `AbstractStreamOperator`

The lifecycle of `StreamTask` is slightly modified for the initialization of 
operator states. In `AbstractStreamOperator`, two new hooks have been added that 
new operators can override:

```
/**
 * Stream operators with state that want to participate in a snapshot need to override this hook method.
 *
 * @param context context that provides information and means required for taking a snapshot
 */
public void snapshotState(StateSnapshotContext context) throws Exception {

}

/**
 * Stream operators with state that can be restored need to override this hook method.
 *
 * @param context context that allows registering different states.
 */
public void initializeState(StateInitializationContext context) throws Exception {

}
```

Access to partitionable state for snapshotting and restoring is provided 
through the respective context.
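The intended call order can be sketched as a template method with no-op hooks. This is a hypothetical, simplified model: the contexts are reduced to single methods, and the order (initialize state, then `open()`, then snapshots) is an assumption of this sketch rather than a description of the actual `StreamTask` code:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced contexts; the real ones also expose state stores and raw streams. */
interface StateInitializationContext {
    boolean isRestored();
}

interface StateSnapshotContext {
    long getCheckpointId();
}

/** Base class with no-op hooks, mirroring the two hooks quoted above. */
abstract class SketchStreamOperator {

    public void initializeState(StateInitializationContext context) throws Exception {}

    public void open() throws Exception {}

    public void snapshotState(StateSnapshotContext context) throws Exception {}
}

/** A stateful operator overrides only the hooks it needs; here it just records the calls. */
class RecordingOperator extends SketchStreamOperator {
    final List<String> events = new ArrayList<>();

    @Override
    public void initializeState(StateInitializationContext context) {
        events.add(context.isRestored() ? "restore" : "fresh");
    }

    @Override
    public void open() {
        events.add("open");
    }

    @Override
    public void snapshotState(StateSnapshotContext context) {
        events.add("snapshot-" + context.getCheckpointId());
    }
}

/** Hypothetical task driver: state is initialized before open(), snapshots come later. */
class SketchTaskLifecycle {
    static void run(SketchStreamOperator operator, boolean restored, long checkpointId) throws Exception {
        operator.initializeState(() -> restored);
        operator.open();
        operator.snapshotState(() -> checkpointId);
    }
}
```

Because the base class provides empty defaults, stateless operators need no changes, and stateful ones override just the two hooks.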

# 3) Exposing partitionable states to UDFs

The interface `CheckpointedFunction` must be implemented by stateful UDFs:

```
/**
 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
 * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
 * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
 *
 * @param context the context for drawing a snapshot of the operator
 * @throws Exception
 */
void snapshotState(FunctionSnapshotContext context) throws Exception;

/**
 * This method is called when an operator is initialized, so that the function can set up its state through
 * the provided context. Initialization typically includes registering user states through the state stores
 * that the context offers.
 *
 * @param context the context for initializing the operator
 * @throws Exception
 */
void initializeState(FunctionInitializationContext context) throws Exception;
```

The contexts for initialization and snapshotting provide the subset of the 
functionality of the internal `AbstractStreamOperator` contexts that is safe 
to expose to user code.
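Here is a sketch of a stateful UDF that follows this pattern, modeled loosely on the Kafka consumer changes above. It registers list state once in `initializeState`, reads it back only if `isRestored()`, and rewrites it in `snapshotState`. All types are hypothetical, simplified stand-ins. Summing the restored entries is a design choice for a counter, not something the PR prescribes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-ins for the state types. */
interface ListState<T> {
    Iterable<T> get();

    void add(T value);

    void clear();
}

interface OperatorStateStore {
    <T> ListState<T> getListState(String name);
}

interface FunctionInitializationContext {
    boolean isRestored();

    OperatorStateStore getOperatorStateStore();
}

interface FunctionSnapshotContext {
    long getCheckpointId();
}

/** The two hooks from the interface quoted above. */
interface CheckpointedFunction {
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    void initializeState(FunctionInitializationContext context) throws Exception;
}

/** A counter UDF that keeps its count in partitionable operator list state. */
class CountingFunction implements CheckpointedFunction {
    private ListState<Long> checkpointedCount; // registered once, reused for every snapshot
    long count;

    void process() {
        count++;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        checkpointedCount = context.getOperatorStateStore().getListState("count");
        if (context.isRestored()) {
            // After rescaling, one instance may receive several list entries; sum them up.
            for (Long partial : checkpointedCount.get()) {
                count += partial;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }
}

/** In-memory store used to exercise the sketch; a real backend would persist the lists. */
class InMemoryOperatorStateStore implements OperatorStateStore {
    private final Map<String, List<?>> lists = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <T> ListState<T> getListState(String name) {
        // One backing list per name; the cast is safe as long as each name is used with one type.
        List<T> list = (List<T>) lists.computeIfAbsent(name, n -> new ArrayList<T>());
        return new ListState<T>() {
            @Override public Iterable<T> get() { return list; }
            @Override public void add(T value) { list.add(value); }
            @Override public void clear() { list.clear(); }
        };
    }
}
```

Keeping the reference returned at initialization avoids looking the state up again on every snapshot, which is also what the reworked Kafka consumer does with `offsetsStateForCheckpoint`.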

# 4) This PR also introduces several classes that bundle state handles
One example is `TaskStateHandles`. Their purpose is a) to reduce the number 
of parameters passed through several methods and b) to make adding/removing 
state handles simpler.
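A minimal sketch of such a bundle, with hypothetical names and fields (the actual `TaskStateHandles` may differ): a serializable parameter object that normalizes missing state to empty lists.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a handle that points at checkpointed state. */
class StateHandle implements Serializable {
    private static final long serialVersionUID = 1L;

    final String location;

    StateHandle(String location) {
        this.location = location;
    }
}

/**
 * Hypothetical bundle of all state handles of one task. Methods pass a single object instead of
 * one parameter per kind of state, and adding a new kind of handle mainly touches this class.
 */
final class TaskStateBundle implements Serializable {
    private static final long serialVersionUID = 1L;

    private final List<StateHandle> managedKeyedState;
    private final List<StateHandle> rawKeyedState;
    private final List<StateHandle> managedOperatorState;
    private final List<StateHandle> rawOperatorState;

    TaskStateBundle(
            List<StateHandle> managedKeyedState,
            List<StateHandle> rawKeyedState,
            List<StateHandle> managedOperatorState,
            List<StateHandle> rawOperatorState) {
        this.managedKeyedState = nullToEmpty(managedKeyedState);
        this.rawKeyedState = nullToEmpty(rawKeyedState);
        this.managedOperatorState = nullToEmpty(managedOperatorState);
        this.rawOperatorState = nullToEmpty(rawOperatorState);
    }

    // Normalizing null to an empty list spares every consumer a null check.
    private static List<StateHandle> nullToEmpty(List<StateHandle> handles) {
        return handles == null ? Collections.<StateHandle>emptyList() : Collections.unmodifiableList(handles);
    }

    List<StateHandle> getManagedKeyedState() { return managedKeyedState; }

    List<StateHandle> getRawKeyedState() { return rawKeyedState; }

    List<StateHandle> getManagedOperatorState() { return managedOperatorState; }

    List<StateHandle> getRawOperatorState() { return rawOperatorState; }

    /** True if the task has nothing to restore. */
    boolean isEmpty() {
        return managedKeyedState.isEmpty() && rawKeyedState.isEmpty()
                && managedOperatorState.isEmpty() && rawOperatorState.isEmpty();
    }
}

/** Hypothetical consumer: one parameter instead of four. */
class SketchTaskRestore {
    static int countHandles(TaskStateBundle state) {
        return state.getManagedKeyedState().size() + state.getRawKeyedState().size()
                + state.getManagedOperatorState().size() + state.getRawOperatorState().size();
    }
}
```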


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink stream-keyed-state-checkpointing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2648


commit 2f72bdfe9e39f76a766c5fad68500caa39e9a624
Author: Stefan Richter 
Date:   2016-10-04T08:59:38Z

StreamCheckpointed operator state WIP

commit f3a1ee0963ac8521cb9581dcb1ab6547c50c304f
Author: Stefan Richter 
Date:   2016-10-14T10:15:07Z

Added to One/TwoInputStreamOperatorTestHarness

commit b3d7b683b6fd3e6a7fb2158e67fe94ac6e28e730
Author: Stefan Richter 
Date:   2016-10-17T09:38:50Z

Small optimizations for GC friendliness.

commit 614592d475486c1a2eff8e9fc24a423ff18a78bf
Author: Stefan Richter 
Date:   2016-10-13T09:32:19Z

Added AbstractUdfStreamOperatorLifecycleTest

commit 886ca3e2d4362db2f5125b0c1ea1f2078042c456
Author: Stefan Richter 
Date:   2016-10-17T11:51:40Z

Rename KeyedStateCheckpointOutputStream



