[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-25 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r251053803
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
 ##
 @@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+class StateSnapshotTransformerTest {
 
 Review comment:
   Test classes should always extend `TestLogger`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250218492
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+   final StateSnapshotTransformFactory snapshotTransformFactory;
+
+   RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory snapshotTransformFactory) {
+   this.snapshotTransformFactory = snapshotTransformFactory;
+   }
+
+   @Override
+   public Optional> createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory wrapStateSnapshotTransformFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   TypeSerializer elementSerializer = ((ListStateDescriptor) stateDesc).getElementSerializer();
 
 Review comment:
   Also, if the `TypeSerializer` is stateful, we might run into problems because we are sharing the `elementSerializer` across all `StateSnapshotTransformer` instances created by the wrapped factory.
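   To illustrate the concern, here is a minimal sketch (not the PR's code) of giving each transformer its own serializer copy. `SketchSerializer`, `BufferingStringSerializer`, `ListElementTransformerSketch` and `PerTransformerSerializerFactory` are hypothetical stand-ins; the pattern mirrors the contract of Flink's `TypeSerializer#duplicate()`, under which a stateful serializer returns a new instance and a stateless one may return itself.

```java
// Hypothetical stand-in for a serializer; mirrors the TypeSerializer#duplicate() idea.
abstract class SketchSerializer<T> {
    abstract byte[] serialize(T value);

    // Stateful serializers must return a fresh copy; stateless ones may return this.
    abstract SketchSerializer<T> duplicate();
}

// Stateful: reuses a scratch buffer, so one instance must not be used by two threads at once.
final class BufferingStringSerializer extends SketchSerializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    byte[] serialize(String value) {
        scratch.setLength(0);
        scratch.append(value);
        return scratch.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    @Override
    SketchSerializer<String> duplicate() {
        return new BufferingStringSerializer();
    }
}

// Hypothetical transformer that owns a private serializer copy.
final class ListElementTransformerSketch<T> {
    final SketchSerializer<T> elementSerializer;

    ListElementTransformerSketch(SketchSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }
}

// Keeps the descriptor's serializer only as a template and duplicates it per transformer.
final class PerTransformerSerializerFactory<T> {
    private final SketchSerializer<T> template;

    PerTransformerSerializerFactory(SketchSerializer<T> template) {
        this.template = template;
    }

    ListElementTransformerSketch<T> create() {
        return new ListElementTransformerSketch<>(template.duplicate());
    }
}
```

   Duplicating once per transformer keeps mutable serializer state confined to the snapshot that uses it; a stateless serializer whose `duplicate()` returns `this` pays nothing extra.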


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250212255
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+   final StateSnapshotTransformFactory snapshotTransformFactory;
+
+   RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory snapshotTransformFactory) {
+   this.snapshotTransformFactory = snapshotTransformFactory;
+   }
+
+   @Override
+   public Optional> createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory wrapStateSnapshotTransformFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   TypeSerializer elementSerializer = ((ListStateDescriptor) stateDesc).getElementSerializer();
 
 Review comment:
   I would like to hear @tzulitai's opinion on the way we obtain the element serializer here. If I understand it correctly, we should obtain it from the meta state information, because it might have been reconfigured.


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250210160
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+class StateSnapshotTransformerTest {
+   private final AbstractKeyedStateBackend backend;
+   private final BlockerCheckpointStreamFactory streamFactory;
+   private final StateSnapshotTransformFactory snapshotTransformFactory;
+
+   StateSnapshotTransformerTest(
+   AbstractKeyedStateBackend backend,
+   BlockerCheckpointStreamFactory streamFactory) {
+
+   this.backend = backend;
+   this.streamFactory = streamFactory;
+   this.snapshotTransformFactory = SingleThreadAccessCheckingsnapshotTransformFactory.create();
+   }
+
+   void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+   List testStates = Arrays.asList(
+   new TestValueState(),
+   new TestListState(),
+   new TestMapState()
+   );
+
+   for (TestState state : testStates) {
+   for (int i = 0; i < 100; i++) {
+   backend.setCurrentKey(i);
+   state.setToRandomValue();
+   }
+
+   CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
+
+   RunnableFuture> snapshot1 =
+   backend.snapshot(1L, 0L, streamFactory, checkpointOptions);
+
+   RunnableFuture> snapshot2 =
+   backend.snapshot(2L, 0L, streamFactory, checkpointOptions);
+
+   Thread runner1 = new Thread(snapshot1, "snapshot1");
+   runner1.start();
+   Thread runner2 = new Thread(snapshot2, "snapshot2");
+   runner2.start();
+
+   runner1.join();
+   runner2.join();
+
+   snapshot1.get();
+   snapshot2.get();
+   }
+   }
+
+   private abstract class TestState {
+   final Random rnd;
+
+   private TestState() {
+   this.rnd = new Random();
+   }
+
+   abstract void setToRandomValue() throws Exception;
+
+   String getRandomString() {
+   return StringUtils.getRandomString(rnd, 5, 10);
+   }
+   }
+
+   private class TestValueState extends TestState {
+   private final InternalValueState state;
+
+   private TestValueState() throws Exception {
+   this.state = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("TestValueState", StringSerializer.INSTANCE),
+

[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250206990
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED;
+
+/** Collection of common state snapshot transformers and their factories. */
+public class StateSnapshotTransformers {
 
 Review comment:
   Alright, makes sense. Thanks for the clarification.


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247968937
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##
 @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception {
}
}
 
+   @Test
+   public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+   Random rnd = new Random();
+   BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+   AbstractKeyedStateBackend backend = null;
+   try {
+   backend = createKeyedBackend(IntSerializer.INSTANCE);
+   InternalValueState valueState = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("test", StringSerializer.INSTANCE),
+   createSingleThreadAccessCheckingStateSnapshotTransformFactory());
+
+   valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+   for (int i = 0; i < 100; i++) {
+   backend.setCurrentKey(i);
+   valueState.update(StringUtils.getRandomString(rnd, 5, 10));
+   }
+
+   CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
+
+   RunnableFuture> snapshot1 =
+   backend.snapshot(1L, 0L, streamFactory, checkpointOptions);
+
+   RunnableFuture> snapshot2 =
+   backend.snapshot(2L, 0L, streamFactory, checkpointOptions);
+
+   Thread runner1 = new Thread(snapshot1, "snapshot1");
+   runner1.start();
+   Thread runner2 = new Thread(snapshot2, "snapshot2");
+   runner2.start();
+
+   runner1.join();
+   runner2.join();
+
+   snapshot1.get();
+   snapshot2.get();
+   } finally {
+   if (backend != null) {
+   IOUtils.closeQuietly(backend);
+   backend.dispose();
+   }
+   }
+   }
+
+   private static  StateSnapshotTransformFactory
+   createSingleThreadAccessCheckingStateSnapshotTransformFactory() {
+   return new StateSnapshotTransformFactory() {
+   @Override
+   public Optional> createForDeserializedState() {
+   return createStateSnapshotTransformer();
+   }
+
+   @Override
+   public Optional> createForSerializedState() {
+   return createStateSnapshotTransformer();
+   }
+
+   private  Optional> createStateSnapshotTransformer() {
+   return Optional.of(new StateSnapshotTransformer() {
+   private Thread currentThread = null;
+
+   @Nullable
+   @Override
+   public T1 filterOrTransform(@Nullable T1 value) {
+   if (currentThread == null) {
+   currentThread = Thread.currentThread();
+   } else {
+   assertEquals("Concurrent access from another thread",
+   currentThread, Thread.currentThread());
+   }
 
 Review comment:
   The current implementation assumes that there is only a single element to 
transform, right? Otherwise it should fail with the second element.
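   For comparison, a minimal sketch of a check that accepts any number of elements from the thread that first calls it and fails only when a different thread calls in. `ElementTransformer` and `SingleThreadCheckingTransformer` are hypothetical names, not Flink API. Claiming ownership with `AtomicReference#compareAndSet` also closes a gap in the plain-field version, where two threads could both observe `null` and both pass their first call unchecked.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the transformer callback.
interface ElementTransformer<T> {
    T filterOrTransform(T value);
}

// Records the first calling thread; later calls from the same thread pass,
// calls from any other thread fail.
final class SingleThreadCheckingTransformer<T> implements ElementTransformer<T> {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    @Override
    public T filterOrTransform(T value) {
        Thread current = Thread.currentThread();
        // compareAndSet claims ownership atomically on the first call; the owner is never reset.
        if (!owner.compareAndSet(null, current) && owner.get() != current) {
            throw new IllegalStateException("Concurrent access from " + current.getName()
                + ", first accessed by " + owner.get().getName());
        }
        return value; // identity transform; only the access pattern is checked
    }
}
```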


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247980006
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##
 @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception {
}
}
 
+   @Test
+   public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+   Random rnd = new Random();
+   BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+   AbstractKeyedStateBackend backend = null;
+   try {
+   backend = createKeyedBackend(IntSerializer.INSTANCE);
+   InternalValueState valueState = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("test", StringSerializer.INSTANCE),
 
 Review comment:
   It would be good to test not only with the `ValueStateDescriptor`. I think there might be a bug in the RocksDB list and map transformers.


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247967726
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import javax.annotation.Nullable;
 
 Review comment:
   This import should be separated from the `java.*` imports by a blank line, according to Flink's import guidelines: https://flink.apache.org/contribute-code.html#code-style


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247977733
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+   @Override
+   public Optional> createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   Optional> original = snapshotTransformFactory.createForDeserializedState();
+   return new RocksDBSnapshotTransformFactoryAdaptor() {
+   @Override
+   public Optional> createForSerializedState() {
+   return original.map(est -> (StateSnapshotTransformer) createRocksDBListStateTransformer(stateDesc, est));
+   }
+   };
+   } else if (stateDesc instanceof MapStateDescriptor) {
+   Optional> original = snapshotTransformFactory.createForSerializedState();
+   return new RocksDBSnapshotTransformFactoryAdaptor() {
+   @Override
+   public Optional> createForSerializedState() {
+   return original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new);
+   }
+   };
+   } else {
+   return new RocksDBSnapshotTransformFactoryAdaptor() {
+   @Override
+   public Optional> createForSerializedState() {
+   return snapshotTransformFactory.createForSerializedState();
+   }
+   };
+   }
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  StateSnapshotTransformer createRocksDBListStateTransformer(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformer elementTransformer) {
+   return (StateSnapshotTransformer) new RocksDBListState.StateSnapshotTransformerWrapper<>(
+   elementTransformer, ((ListStateDescriptor) stateDesc).getElementSerializer());
+   }
+}
 
 Review comment:
   I think there is no need for all this type casting:
   ```
   return new RocksDBSnapshotTransformFactoryAdaptor() {
       @Override
       public Optional> createForSerializedState() {
           return original.map(est -> createRocksDBListStateTransformer(((ListStateDescriptor) stateDesc), est));
       }

       private  StateSnapshotTransformer createRocksDBListStateTransformer(
               ListStateDescriptor stateDesc,
               StateSnapshotTransformer elementTransformer) {
           return new RocksDBListState.StateSnapshotTransformerWrapper<>(
               elementTransformer, stateDesc.getElementSerializer());
       }
   };
   ```

   This also includes changes to the anonymous class.
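   A self-contained sketch of why the casts become unnecessary: when a single type parameter `T` ties the descriptor and the element transformer together, `Optional#map` infers `Optional<Transformer<byte[]>>` on its own. All types below (`Transformer`, `ElementSerializer`, `ListDescriptor`, `ListTransformerWrapper`, `CastFreeWrapping`) are hypothetical stand-ins for the Flink classes in the snippet above, and the wrapper's `filterOrTransform` is only a placeholder.

```java
import java.util.Optional;

// Hypothetical stand-ins for StateSnapshotTransformer, TypeSerializer and ListStateDescriptor.
interface Transformer<T> {
    T filterOrTransform(T value);
}

interface ElementSerializer<T> {
}

final class ListDescriptor<T> {
    private final ElementSerializer<T> elementSerializer;

    ListDescriptor(ElementSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    ElementSerializer<T> getElementSerializer() {
        return elementSerializer;
    }
}

// Turns an element-level transformer into one over serialized bytes.
final class ListTransformerWrapper<T> implements Transformer<byte[]> {
    final Transformer<T> elementTransformer;
    final ElementSerializer<T> elementSerializer;

    ListTransformerWrapper(Transformer<T> elementTransformer, ElementSerializer<T> elementSerializer) {
        this.elementTransformer = elementTransformer;
        this.elementSerializer = elementSerializer;
    }

    @Override
    public byte[] filterOrTransform(byte[] value) {
        // Placeholder: real code would deserialize each element, transform it and re-serialize.
        return value;
    }
}

final class CastFreeWrapping {
    // T links descriptor and element transformer, so the compiler checks the pairing.
    static <T> Optional<Transformer<byte[]>> wrapForList(
            ListDescriptor<T> desc, Optional<Transformer<T>> original) {
        return original.map(est -> createListTransformer(desc, est));
    }

    private static <T> Transformer<byte[]> createListTransformer(
            ListDescriptor<T> desc, Transformer<T> elementTransformer) {
        return new ListTransformerWrapper<>(elementTransformer, desc.getElementSerializer());
    }
}
```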


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247979598
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+   @Override
+   public Optional> createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   Optional> original = snapshotTransformFactory.createForDeserializedState();
 
 Review comment:
   Is it safe to share this instance of the `StateSnapshotTransformer` in the 
`RocksDBSnapshotTransformFactoryAdaptor`? `original` is created once, when the 
factory is wrapped, and every transformer the adaptor creates afterwards reuses 
it, so I think this can lead to the same problem we are trying to fix here. If 
I'm not wrong, adding a test for this would be good.
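To make the concern concrete, here is a minimal sketch with hypothetical stand-in types (`Transformer`, `TransformerFactory`, `EagerWrappingFactory`, `LazyWrappingFactory`; none of them are Flink's API). The eager variant calls the original factory once, at construction time, like `wrapStateSnapshotTransformerFactory` does with `original`, so every `create()` call wraps the same underlying instance. The lazy variant asks the original factory again on every call.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical minimal stand-ins for a snapshot transformer and its factory.
// They are NOT the real Flink interfaces.
interface Transformer<T> {
    T filterOrTransform(T value);
}

interface TransformerFactory<T> {
    Optional<Transformer<T>> create();
}

// Pattern questioned in the review: the wrapped transformer is created exactly once,
// when the adaptor is built, and every create() call wraps that same instance.
// Concurrent snapshots would then share its (possibly mutable) state.
final class EagerWrappingFactory<T> implements TransformerFactory<T> {
    private final Optional<Transformer<T>> shared;
    private final Function<Transformer<T>, Transformer<T>> wrap;

    EagerWrappingFactory(TransformerFactory<T> original, Function<Transformer<T>, Transformer<T>> wrap) {
        this.shared = original.create(); // called once, result reused forever
        this.wrap = wrap;
    }

    @Override
    public Optional<Transformer<T>> create() {
        return shared.map(wrap);
    }
}

// Alternative: delegate to the original factory on every call,
// so each snapshot gets its own underlying transformer.
final class LazyWrappingFactory<T> implements TransformerFactory<T> {
    private final TransformerFactory<T> original;
    private final Function<Transformer<T>, Transformer<T>> wrap;

    LazyWrappingFactory(TransformerFactory<T> original, Function<Transformer<T>, Transformer<T>> wrap) {
        this.original = original;
        this.wrap = wrap;
    }

    @Override
    public Optional<Transformer<T>> create() {
        return original.create().map(wrap);
    }
}
```

Even when the original factory returns a fresh transformer per call, the eager variant hands the same underlying instance to every caller, which is the kind of sharing the PR sets out to remove.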


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247968390
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED;
+
+/** Collection of common state snapshot transformers and their factories. */
+public class StateSnapshotTransformers {
 
 Review comment:
   Why did you group the implementations under `StateSnapshotTransformers`? 
They could also simply live in their own package, right?



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247969937
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##
 @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws 
Exception {
}
}
 
+   @Test
+   public void testNonConcurrentSnapshotTransformerAccess() throws 
Exception {
+   Random rnd = new Random();
+   BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
+   AbstractKeyedStateBackend backend = null;
+   try {
+   backend = createKeyedBackend(IntSerializer.INSTANCE);
+   InternalValueState 
valueState = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("test", 
StringSerializer.INSTANCE),
+   
createSingleThreadAccessCheckingStateSnapshotTransformFactory());
+
+   valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+   for (int i = 0; i < 100; i++) {
+   backend.setCurrentKey(i);
+   
valueState.update(StringUtils.getRandomString(rnd,5, 10));
 
 Review comment:
   Whitespace missing after the comma in `getRandomString(rnd,5, 10)`.



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247966443
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##
 @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws 
Exception {
}
}
 
+   @Test
+   public void testNonConcurrentSnapshotTransformerAccess() throws 
Exception {
+   Random rnd = new Random();
+   BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
+   AbstractKeyedStateBackend backend = null;
+   try {
+   backend = createKeyedBackend(IntSerializer.INSTANCE);
+   InternalValueState 
valueState = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("test", 
StringSerializer.INSTANCE),
+   
createSingleThreadAccessCheckingStateSnapshotTransformFactory());
+
+   valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+   for (int i = 0; i < 100; i++) {
+   backend.setCurrentKey(i);
+   
valueState.update(StringUtils.getRandomString(rnd,5, 10));
+   }
+
+   CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
+
+   RunnableFuture> 
snapshot1 =
+   backend.snapshot(1L, 0L, streamFactory, 
checkpointOptions);
+
+   RunnableFuture> 
snapshot2 =
+   backend.snapshot(2L, 0L, streamFactory, 
checkpointOptions);
+
+   Thread runner1 = new Thread(snapshot1, "snapshot1");
+   runner1.start();
+   Thread runner2 = new Thread(snapshot2, "snapshot2");
+   runner2.start();
+
+   runner1.join();
+   runner2.join();
+
+   snapshot1.get();
+   snapshot2.get();
+   } finally {
+   if (backend != null) {
+   IOUtils.closeQuietly(backend);
+   backend.dispose();
+   }
+   }
+   }
+
+   private static  StateSnapshotTransformFactory
+   createSingleThreadAccessCheckingStateSnapshotTransformFactory() {
 
 Review comment:
   This is only my personal taste, but I prefer having the return type and 
the method name on the same line.



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247960807
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
 ##
 @@ -145,13 +146,13 @@ private RegisteredKeyValueStateBackendMetaInfo(
return stateSerializerProvider.previousSchemaSerializer();
}
 
-   @Nullable
-   public StateSnapshotTransformer getSnapshotTransformer() {
-   return snapshotTransformer;
+   @Nonnull
+   public StateSnapshotTransformFactory 
getStateSnapshotTransformFactory() {
+   return stateSnapshotTransformFactory;
}
 
-   public void updateSnapshotTransformer(StateSnapshotTransformer 
snapshotTransformer) {
-   this.snapshotTransformer = snapshotTransformer;
+   public void 
updateSnapshotTransformerFactory(StateSnapshotTransformFactory 
stateSnapshotTransformFactory) {
 
 Review comment:
   The `er` is not needed: `updateSnapshotTransformFactory` would match the 
`StateSnapshotTransformFactory` type.



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247968937
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##
 @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws 
Exception {
}
}
 
+   @Test
+   public void testNonConcurrentSnapshotTransformerAccess() throws 
Exception {
+   Random rnd = new Random();
+   BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
+   AbstractKeyedStateBackend backend = null;
+   try {
+   backend = createKeyedBackend(IntSerializer.INSTANCE);
+   InternalValueState 
valueState = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("test", 
StringSerializer.INSTANCE),
+   
createSingleThreadAccessCheckingStateSnapshotTransformFactory());
+
+   valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+   for (int i = 0; i < 100; i++) {
+   backend.setCurrentKey(i);
+   
valueState.update(StringUtils.getRandomString(rnd,5, 10));
+   }
+
+   CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
+
+   RunnableFuture> 
snapshot1 =
+   backend.snapshot(1L, 0L, streamFactory, 
checkpointOptions);
+
+   RunnableFuture> 
snapshot2 =
+   backend.snapshot(2L, 0L, streamFactory, 
checkpointOptions);
+
+   Thread runner1 = new Thread(snapshot1, "snapshot1");
+   runner1.start();
+   Thread runner2 = new Thread(snapshot2, "snapshot2");
+   runner2.start();
+
+   runner1.join();
+   runner2.join();
+
+   snapshot1.get();
+   snapshot2.get();
+   } finally {
+   if (backend != null) {
+   IOUtils.closeQuietly(backend);
+   backend.dispose();
+   }
+   }
+   }
+
+   private static  StateSnapshotTransformFactory
+   createSingleThreadAccessCheckingStateSnapshotTransformFactory() {
+   return new StateSnapshotTransformFactory() {
+   @Override
+   public Optional> 
createForDeserializedState() {
+   return createStateSnapshotTransformer();
+   }
+
+   @Override
+   public Optional> 
createForSerializedState() {
+   return createStateSnapshotTransformer();
+   }
+
+   private  Optional> 
createStateSnapshotTransformer() {
+   return Optional.of(new 
StateSnapshotTransformer() {
+   private Thread currentThread = null;
+
+   @Nullable
+   @Override
+   public T1 filterOrTransform(@Nullable 
T1 value) {
+   if (currentThread == null) {
+   currentThread = 
Thread.currentThread();
+   } else {
+   
assertEquals("Concurrent access from another thread",
+   currentThread, 
Thread.currentThread());
+   }
 
 Review comment:
   The current implementation assumes that there is only a single element to 
transform, right? Otherwise it should fail with the second element.
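For reference, a minimal sketch of a thread-confinement check with the same intent as the test transformer. `SingleThreadAccessChecker` is a hypothetical helper, not part of Flink: it records the first calling thread, lets repeated calls from that thread pass, and rejects calls from any other thread. It uses an `AtomicReference` so that recording the owner is itself race-free; with the plain `currentThread` field in the test, two threads arriving at the same time could both see `null` and the overlap would go undetected.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not Flink API): fails if it is ever called
// from a thread other than the first one that called it.
final class SingleThreadAccessChecker {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void check() {
        Thread current = Thread.currentThread();
        // compareAndSet makes "first caller becomes the owner" atomic.
        if (!owner.compareAndSet(null, current) && owner.get() != current) {
            throw new IllegalStateException(
                "Concurrent access from " + current.getName()
                    + ", owner is " + owner.get().getName());
        }
    }
}
```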



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247973376
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements 
StateSnapshotTransformFactory {
+   @Override
+   public Optional> 
createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state 
filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory 
wrapStateSnapshotTransformerFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   Optional> original = 
snapshotTransformFactory.createForDeserializedState();
+   return new RocksDBSnapshotTransformFactoryAdaptor() 
{
+   @Override
+   public 
Optional> createForSerializedState() {
+   return original.map(est -> 
(StateSnapshotTransformer) createRocksDBListStateTransformer(stateDesc, 
est));
+   }
+   };
+   } else if (stateDesc instanceof MapStateDescriptor) {
+   Optional> original = 
snapshotTransformFactory.createForSerializedState();
+   return new RocksDBSnapshotTransformFactoryAdaptor() 
{
+   @Override
+   public 
Optional> createForSerializedState() {
+   return 
original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new);
+   }
+   };
+   } else {
+   return new RocksDBSnapshotTransformFactoryAdaptor() 
{
+   @Override
+   public 
Optional> createForSerializedState() {
+   return 
snapshotTransformFactory.createForSerializedState();
+   }
+   };
+   }
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  StateSnapshotTransformer 
createRocksDBListStateTransformer(
 
 Review comment:
   This method could be moved into the inner class which actually uses it.
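A minimal sketch of that suggestion, using hypothetical simplified types (`ListTransformFactory`, with a `Predicate` as the element filter and a `UnaryOperator` as the list transformer) rather than Flink's RocksDB classes: the helper that turns an element filter into a list transformer becomes a private static method of the only class that calls it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in: builds a whole-list transformer from an optional element filter.
final class ListTransformFactory<E> {
    private final Optional<Predicate<E>> elementFilter;

    ListTransformFactory(Optional<Predicate<E>> elementFilter) {
        this.elementFilter = elementFilter;
    }

    Optional<UnaryOperator<List<E>>> create() {
        return elementFilter.map(ListTransformFactory::toListTransformer);
    }

    // Only used by create(); keeping it here makes that dependency obvious
    // instead of leaving it in the enclosing adaptor.
    private static <X> UnaryOperator<List<X>> toListTransformer(Predicate<X> keep) {
        return list -> {
            List<X> result = new ArrayList<>();
            for (X element : list) {
                if (keep.test(element)) {
                    result.add(element);
                }
            }
            return result;
        };
    }
}
```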



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-15 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247973273
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements 
StateSnapshotTransformFactory {
+   @Override
+   public Optional> 
createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state 
filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory 
wrapStateSnapshotTransformerFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   Optional> original = 
snapshotTransformFactory.createForDeserializedState();
+   return new RocksDBSnapshotTransformFactoryAdaptor() 
{
 
 Review comment:
   Why use anonymous classes here instead of giving each class a proper name?
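One concrete benefit of naming the classes, sketched with hypothetical simplified types (`TransformFactoryAdaptor`, `ListStateTransformFactoryAdaptor`; not the PR's code): an anonymous subclass has an empty simple name and appears under a compiler-generated name such as `TransformFactoryAdaptor$1` in stack traces and logs, while a named static nested class can be referenced, documented and tested directly.

```java
// Hypothetical, simplified adaptor illustrating anonymous vs named subclasses.
abstract class TransformFactoryAdaptor {
    abstract String describe();

    // Anonymous variant: the subclass has no name of its own.
    static TransformFactoryAdaptor anonymousForList() {
        return new TransformFactoryAdaptor() {
            @Override
            String describe() {
                return "list";
            }
        };
    }

    // Named variant: same behaviour, but the class can be referred to by name.
    static TransformFactoryAdaptor namedForList() {
        return new ListStateTransformFactoryAdaptor();
    }

    static final class ListStateTransformFactoryAdaptor extends TransformFactoryAdaptor {
        @Override
        String describe() {
            return "list";
        }
    }
}
```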



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-09 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r246327200
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformer.java
 ##
 @@ -183,4 +207,17 @@ public 
MapStateSnapshotTransformer(StateSnapshotTransformer entryValueTransfo
 
Optional> 
createForSerializedState();
}
+
+   abstract class StateSnapshotTransformFactoryWrapAdaptor 
implements StateSnapshotTransformFactory {
 
 Review comment:
   But maybe we could find a more meaningful name for this class.



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-04 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r245370394
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformer.java
 ##
 @@ -183,4 +207,17 @@ public 
MapStateSnapshotTransformer(StateSnapshotTransformer entryValueTransfo
 
Optional> 
createForSerializedState();
}
+
+   abstract class StateSnapshotTransformFactoryWrapAdaptor 
implements StateSnapshotTransformFactory {
 
 Review comment:
   Maybe `CollectionStateSnapshotTransformerFactory` would be a better name.



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-04 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r245358031
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ##
 @@ -229,7 +227,7 @@ public HeapKeyedStateBackend(
private  StateTable tryRegisterStateTable(
TypeSerializer namespaceSerializer,
StateDescriptor stateDesc,
-   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws StateMigrationException {
+   @Nonnull StateSnapshotTransformFactory 
snapshotTransformer) throws StateMigrationException {
 
 Review comment:
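   Please change the parameter name: it is still called `snapshotTransformer`, 
but it now holds a `StateSnapshotTransformFactory`.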



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-04 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r245370513
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformer.java
 ##
 @@ -120,6 +122,17 @@ public 
ListStateSnapshotTransformer(StateSnapshotTransformer entryValueTransf
}
}
 
+   class ListStateSnapshotTransformFactory extends 
StateSnapshotTransformFactoryWrapAdaptor> {
 
 Review comment:
   Why do all these factory classes live within the `StateSnapshotTransformer`?



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-04 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r245368875
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
 ##
 @@ -147,7 +147,8 @@ public StateKeyGroupWriter getKeyGroupWriter() {

localKeySerializer.serialize(element.key, dov);

localStateSerializer.serialize(element.state, dov);
};
-   StateSnapshotTransformer stateSnapshotTransformer = 
owningStateTable.metaInfo.getSnapshotTransformer();
+   StateSnapshotTransformer stateSnapshotTransformer = 
owningStateTable.metaInfo.
+   
getStateSnapshotTransformFactory().createForDeserializedState().orElse(null);
 
 Review comment:
   The code below this line could be refactored so that it does not check 
`stateSnapshotTransformer != null` again.
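One possible shape for that refactoring, as a sketch with hypothetical names (`SnapshotWriterSketch`, `ElementWriter`, `createWriter`) and a `UnaryOperator` standing in for the transformer: branch on the nullable transformer once, when the writer is created, so the per-element code never checks it again. Treating a `null` result as "filtered out" is an assumption that mirrors the `@Nullable` return of `filterOrTransform` in the test quoted earlier.

```java
import java.util.List;
import java.util.function.UnaryOperator;

final class SnapshotWriterSketch {
    // Hypothetical per-element sink used while writing a snapshot.
    interface ElementWriter<T> {
        void write(T element, List<T> out);
    }

    // Decide once, up front, which writer to use; the returned writers
    // never re-check whether a transformer is present.
    static <T> ElementWriter<T> createWriter(UnaryOperator<T> transformerOrNull) {
        if (transformerOrNull == null) {
            return (element, out) -> out.add(element);
        }
        return (element, out) -> {
            T transformed = transformerOrNull.apply(element);
            if (transformed != null) { // null means the element is filtered out
                out.add(transformed);
            }
        };
    }
}
```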



[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-04 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r245362372
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
 ##
 @@ -145,13 +146,13 @@ private RegisteredKeyValueStateBackendMetaInfo(
return stateSerializerProvider.previousSchemaSerializer();
}
 
-   @Nullable
-   public StateSnapshotTransformer getSnapshotTransformer() {
-   return snapshotTransformer;
+   @Nonnull
+   public StateSnapshotTransformFactory 
getStateSnapshotTransformFactory() {
 
 Review comment:
   Should the factory be called `StateSnapshotTransformerFactory`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services