[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r251053803

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java ##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+class StateSnapshotTransformerTest {

Review comment:
Test classes should always extend `TestLogger`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250218492

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ##
@@ -0,0 +1,103 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+    final StateSnapshotTransformFactory snapshotTransformFactory;
+
+    RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory snapshotTransformFactory) {
+        this.snapshotTransformFactory = snapshotTransformFactory;
+    }
+
+    @Override
+    public Optional> createForDeserializedState() {
+        throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+    }
+
+    @SuppressWarnings("unchecked")
+    static StateSnapshotTransformFactory wrapStateSnapshotTransformFactory(
+            StateDescriptor stateDesc,
+            StateSnapshotTransformFactory snapshotTransformFactory) {
+        if (stateDesc instanceof ListStateDescriptor) {
+            TypeSerializer elementSerializer = ((ListStateDescriptor) stateDesc).getElementSerializer();

Review comment:
Also, if the `TypeSerializer` is stateful, we might run into problems because we are sharing the `elementSerializer` across all `StateSnapshotTransformers`.
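A minimal sketch of one way to avoid sharing a stateful serializer, assuming each transformer can be handed its own serializer copy when it is created. Flink's `TypeSerializer` offers `duplicate()` for this purpose; the `StatefulSerializer`, `BufferingStringSerializer`, `ElementTransformer` and `TransformerFactory` types below are hypothetical stand-ins, not Flink API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer with mutable per-use state
// (not Flink's TypeSerializer API).
interface StatefulSerializer<T> {
    byte[] serialize(T value);

    // Returns an independent copy that another thread may use safely.
    StatefulSerializer<T> duplicate();
}

// Reuses an internal buffer, so one instance must not be used concurrently.
final class BufferingStringSerializer implements StatefulSerializer<String> {
    private final StringBuilder buffer = new StringBuilder();

    @Override
    public byte[] serialize(String value) {
        buffer.setLength(0);
        buffer.append(value);
        return buffer.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public StatefulSerializer<String> duplicate() {
        return new BufferingStringSerializer();
    }
}

// Hypothetical transformer that owns its serializer copy.
final class ElementTransformer<T> {
    final StatefulSerializer<T> serializer;

    ElementTransformer(StatefulSerializer<T> serializer) {
        this.serializer = serializer;
    }
}

// Hands every new transformer its own duplicate instead of the shared template.
final class TransformerFactory<T> {
    private final StatefulSerializer<T> template;

    TransformerFactory(StatefulSerializer<T> template) {
        this.template = template;
    }

    ElementTransformer<T> create() {
        return new ElementTransformer<>(template.duplicate());
    }
}
```

The trade-off is one copy per created transformer instead of synchronizing on a shared instance; a serializer without mutable state can simply return itself from `duplicate()`.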
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250212255

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ##
@@ -0,0 +1,103 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+    final StateSnapshotTransformFactory snapshotTransformFactory;
+
+    RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory snapshotTransformFactory) {
+        this.snapshotTransformFactory = snapshotTransformFactory;
+    }
+
+    @Override
+    public Optional> createForDeserializedState() {
+        throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+    }
+
+    @SuppressWarnings("unchecked")
+    static StateSnapshotTransformFactory wrapStateSnapshotTransformFactory(
+            StateDescriptor stateDesc,
+            StateSnapshotTransformFactory snapshotTransformFactory) {
+        if (stateDesc instanceof ListStateDescriptor) {
+            TypeSerializer elementSerializer = ((ListStateDescriptor) stateDesc).getElementSerializer();

Review comment:
I would like to hear @tzulitai's opinion on how we obtain the element serializer here. If I understand it correctly, we should obtain it from the state meta information because the serializer might be reconfigured.
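A hedged sketch of the alternative the comment raises: look up the serializer from the backend's registered meta information when a transformer is created, instead of capturing the descriptor's serializer once. `StateMetaInfo` and `ListTransformFactory` are hypothetical names, and a `Function<T, String>` stands in for a real serializer; whether the meta information is the right source is exactly the question put to @tzulitai.

```java
import java.util.function.Function;

// Hypothetical holder for a registered state's meta information. The backend
// may replace the element serializer, e.g. when it is reconfigured on restore.
final class StateMetaInfo<T> {
    private volatile Function<T, String> elementSerializer;

    StateMetaInfo(Function<T, String> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    Function<T, String> getElementSerializer() {
        return elementSerializer;
    }

    void setElementSerializer(Function<T, String> reconfigured) {
        this.elementSerializer = reconfigured;
    }
}

// Reads the serializer from the meta info each time a transformer is created,
// rather than capturing the descriptor's serializer once at wrap time.
final class ListTransformFactory<T> {
    private final StateMetaInfo<T> metaInfo;

    ListTransformFactory(StateMetaInfo<T> metaInfo) {
        this.metaInfo = metaInfo;
    }

    Function<T, String> serializerForNextSnapshot() {
        return metaInfo.getElementSerializer();
    }
}
```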
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250210160

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java ##
@@ -0,0 +1,197 @@
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+class StateSnapshotTransformerTest {
+    private final AbstractKeyedStateBackend backend;
+    private final BlockerCheckpointStreamFactory streamFactory;
+    private final StateSnapshotTransformFactory snapshotTransformFactory;
+
+    StateSnapshotTransformerTest(
+            AbstractKeyedStateBackend backend,
+            BlockerCheckpointStreamFactory streamFactory) {
+
+        this.backend = backend;
+        this.streamFactory = streamFactory;
+        this.snapshotTransformFactory = SingleThreadAccessCheckingsnapshotTransformFactory.create();
+    }
+
+    void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+        List testStates = Arrays.asList(
+            new TestValueState(),
+            new TestListState(),
+            new TestMapState()
+        );
+
+        for (TestState state : testStates) {
+            for (int i = 0; i < 100; i++) {
+                backend.setCurrentKey(i);
+                state.setToRandomValue();
+            }
+
+            CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
+
+            RunnableFuture> snapshot1 =
+                backend.snapshot(1L, 0L, streamFactory, checkpointOptions);
+
+            RunnableFuture> snapshot2 =
+                backend.snapshot(2L, 0L, streamFactory, checkpointOptions);
+
+            Thread runner1 = new Thread(snapshot1, "snapshot1");
+            runner1.start();
+            Thread runner2 = new Thread(snapshot2, "snapshot2");
+            runner2.start();
+
+            runner1.join();
+            runner2.join();
+
+            snapshot1.get();
+            snapshot2.get();
+        }
+    }
+
+    private abstract class TestState {
+        final Random rnd;
+
+        private TestState() {
+            this.rnd = new Random();
+        }
+
+        abstract void setToRandomValue() throws Exception;
+
+        String getRandomString() {
+            return StringUtils.getRandomString(rnd, 5, 10);
+        }
+    }
+
+    private class TestValueState extends TestState {
+        private final InternalValueState state;
+
+        private TestValueState() throws Exception {
+            this.state = backend.createInternalState(
+                VoidNamespaceSerializer.INSTANCE,
+                new ValueStateDescriptor<>("TestValueState", StringSerializer.INSTANCE),
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250206990

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java ##
@@ -0,0 +1,152 @@
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED;
+
+/** Collection of common state snapshot transformers and their factories. */
+public class StateSnapshotTransformers {

Review comment:
Alright, makes sense. Thanks for the clarification.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247968937

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ##
@@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception {
         }
     }
+    @Test
+    public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+        Random rnd = new Random();
+        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+        AbstractKeyedStateBackend backend = null;
+        try {
+            backend = createKeyedBackend(IntSerializer.INSTANCE);
+            InternalValueState valueState = backend.createInternalState(
+                VoidNamespaceSerializer.INSTANCE,
+                new ValueStateDescriptor<>("test", StringSerializer.INSTANCE),
+                createSingleThreadAccessCheckingStateSnapshotTransformFactory());
+
+            valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+            for (int i = 0; i < 100; i++) {
+                backend.setCurrentKey(i);
+                valueState.update(StringUtils.getRandomString(rnd,5, 10));
+            }
+
+            CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
+
+            RunnableFuture> snapshot1 =
+                backend.snapshot(1L, 0L, streamFactory, checkpointOptions);
+
+            RunnableFuture> snapshot2 =
+                backend.snapshot(2L, 0L, streamFactory, checkpointOptions);
+
+            Thread runner1 = new Thread(snapshot1, "snapshot1");
+            runner1.start();
+            Thread runner2 = new Thread(snapshot2, "snapshot2");
+            runner2.start();
+
+            runner1.join();
+            runner2.join();
+
+            snapshot1.get();
+            snapshot2.get();
+        } finally {
+            if (backend != null) {
+                IOUtils.closeQuietly(backend);
+                backend.dispose();
+            }
+        }
+    }
+
+    private static StateSnapshotTransformFactory
+    createSingleThreadAccessCheckingStateSnapshotTransformFactory() {
+        return new StateSnapshotTransformFactory() {
+            @Override
+            public Optional> createForDeserializedState() {
+                return createStateSnapshotTransformer();
+            }
+
+            @Override
+            public Optional> createForSerializedState() {
+                return createStateSnapshotTransformer();
+            }
+
+            private Optional> createStateSnapshotTransformer() {
+                return Optional.of(new StateSnapshotTransformer() {
+                    private Thread currentThread = null;
+
+                    @Nullable
+                    @Override
+                    public T1 filterOrTransform(@Nullable T1 value) {
+                        if (currentThread == null) {
+                            currentThread = Thread.currentThread();
+                        } else {
+                            assertEquals("Concurrent access from another thread",
+                                currentThread, Thread.currentThread());
+                        }

Review comment:
The current implementation assumes that there is only a single element to transform, right? Otherwise, it should fail with the second element.
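To make the question concrete, here is a self-contained sketch of the same kind of single-thread check, run from two threads the way the test runs `snapshot1` and `snapshot2`. It assumes the check's intent is "one transformer instance per thread". `SingleThreadChecker` and `ConcurrentAccessDemo` are hypothetical names, `FutureTask` stands in for the snapshot `RunnableFuture`s, and, unlike the quoted test, an `AtomicReference` makes the first-caller registration itself race-free. A check of this shape accepts any number of elements from the owning thread and only fails once a second thread touches the same instance.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

// Records the first thread that calls it and rejects calls from any other
// thread. Repeated calls from the owning thread are accepted.
final class SingleThreadChecker<T> {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    T filterOrTransform(T value) {
        Thread current = Thread.currentThread();
        // compareAndSet makes the "first caller becomes owner" step race-free.
        if (!owner.compareAndSet(null, current) && owner.get() != current) {
            throw new IllegalStateException("Concurrent access from " + current.getName());
        }
        return value;
    }
}

final class ConcurrentAccessDemo {
    // Feeds `elements` values through each checker on its own thread, like the
    // snapshot1/snapshot2 runners, and reports whether a violation occurred.
    static boolean detectsSharing(SingleThreadChecker<String> first,
                                  SingleThreadChecker<String> second,
                                  int elements) {
        FutureTask<Void> task1 = task(first, elements);
        FutureTask<Void> task2 = task(second, elements);
        Thread runner1 = new Thread(task1, "snapshot1");
        Thread runner2 = new Thread(task2, "snapshot2");
        runner1.start();
        runner2.start();
        try {
            runner1.join();
            runner2.join();
            return failedWithViolation(task1) || failedWithViolation(task2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private static FutureTask<Void> task(SingleThreadChecker<String> checker, int elements) {
        return new FutureTask<>(() -> {
            for (int i = 0; i < elements; i++) {
                checker.filterOrTransform("value-" + i);
            }
            return null;
        });
    }

    private static boolean failedWithViolation(FutureTask<Void> task) throws InterruptedException {
        try {
            task.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }
}
```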
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247980006

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ##
@@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception {
         }
     }
+    @Test
+    public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+        Random rnd = new Random();
+        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+        AbstractKeyedStateBackend backend = null;
+        try {
+            backend = createKeyedBackend(IntSerializer.INSTANCE);
+            InternalValueState valueState = backend.createInternalState(
+                VoidNamespaceSerializer.INSTANCE,
+                new ValueStateDescriptor<>("test", StringSerializer.INSTANCE),

Review comment:
It would be good to test not only with the `ValueStateDescriptor`. I think there might be a bug in the RocksDB list and map transformers.
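List and map state are not transformed as one opaque value: in the RocksDB adaptor quoted in this thread, list state is wrapped in `RocksDBListState.StateSnapshotTransformerWrapper` together with an element transformer and the element serializer, so the element transformer is presumably invoked once per list element. A test that only registers a `ValueStateDescriptor` never exercises that path. The wrapper below is a hypothetical simplification (it works on a `List<T>` rather than serialized bytes) that only illustrates the per-element flow, with `null` meaning "drop", following the `@Nullable` contract of `filterOrTransform`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified analogue of a list-state snapshot wrapper: the
// element transformer runs once per element; null means "drop this element".
final class ListSnapshotWrapper<T> {
    private final UnaryOperator<T> elementTransformer;

    ListSnapshotWrapper(UnaryOperator<T> elementTransformer) {
        this.elementTransformer = elementTransformer;
    }

    // Returns the surviving elements, or null if none survive, so the whole
    // list entry can be left out of the snapshot.
    List<T> filterOrTransform(List<T> list) {
        List<T> result = new ArrayList<>();
        for (T element : list) {
            T transformed = elementTransformer.apply(element);
            if (transformed != null) {
                result.add(transformed);
            }
        }
        return result.isEmpty() ? null : result;
    }
}
```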
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247967726

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java ##
@@ -0,0 +1,152 @@
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import javax.annotation.Nullable;

Review comment:
This import should be separated according to Flink's import guidelines: https://flink.apache.org/contribute-code.html#code-style
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247977733

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ##
@@ -0,0 +1,72 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+    @Override
+    public Optional> createForDeserializedState() {
+        throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+    }
+
+    @SuppressWarnings("unchecked")
+    static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory(
+            StateDescriptor stateDesc,
+            StateSnapshotTransformFactory snapshotTransformFactory) {
+        if (stateDesc instanceof ListStateDescriptor) {
+            Optional> original = snapshotTransformFactory.createForDeserializedState();
+            return new RocksDBSnapshotTransformFactoryAdaptor() {
+                @Override
+                public Optional> createForSerializedState() {
+                    return original.map(est -> (StateSnapshotTransformer) createRocksDBListStateTransformer(stateDesc, est));
+                }
+            };
+        } else if (stateDesc instanceof MapStateDescriptor) {
+            Optional> original = snapshotTransformFactory.createForSerializedState();
+            return new RocksDBSnapshotTransformFactoryAdaptor() {
+                @Override
+                public Optional> createForSerializedState() {
+                    return original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new);
+                }
+            };
+        } else {
+            return new RocksDBSnapshotTransformFactoryAdaptor() {
+                @Override
+                public Optional> createForSerializedState() {
+                    return snapshotTransformFactory.createForSerializedState();
+                }
+            };
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static StateSnapshotTransformer createRocksDBListStateTransformer(
+            StateDescriptor stateDesc,
+            StateSnapshotTransformer elementTransformer) {
+        return (StateSnapshotTransformer) new RocksDBListState.StateSnapshotTransformerWrapper<>(
+            elementTransformer, ((ListStateDescriptor) stateDesc).getElementSerializer());
+    }
+}

Review comment:
I think there is no need for all this type casting:

```
return new RocksDBSnapshotTransformFactoryAdaptor() {
    @Override
    public Optional> createForSerializedState() {
        return original.map(est -> createRocksDBListStateTransformer(((ListStateDescriptor) stateDesc), est));
    }

    private StateSnapshotTransformer createRocksDBListStateTransformer(
            ListStateDescriptor stateDesc,
            StateSnapshotTransformer elementTransformer) {
        return new RocksDBListState.StateSnapshotTransformerWrapper<>(
            elementTransformer, stateDesc.getElementSerializer());
    }
};
```

This also includes changes to the anonymous class.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r247979598

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ##
@@ -0,0 +1,72 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory {
+    @Override
+    public Optional> createForDeserializedState() {
+        throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend");
+    }
+
+    @SuppressWarnings("unchecked")
+    static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory(
+            StateDescriptor stateDesc,
+            StateSnapshotTransformFactory snapshotTransformFactory) {
+        if (stateDesc instanceof ListStateDescriptor) {
+            Optional> original = snapshotTransformFactory.createForDeserializedState();

Review comment:
Is it safe to share this instance of the `StateSnapshotTransformer` in the `RocksDBSnapshotTransformFactoryAdaptor`? I think this can lead to the same problem we are trying to fix here. If I'm not wrong, it would be good to add a test for this.
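The concern can be illustrated with a hypothetical, simplified factory: if the `Optional` is obtained once when the adaptor is built (as in the quoted `ListStateDescriptor` branch), every snapshot receives the same transformer instance, whereas delegating inside `createForSerializedState()` yields a fresh one per call, assuming the wrapped factory itself creates new instances. `CountingTransformer`, `SerializedTransformFactory` and `Adaptors` are illustrative names only, not Flink classes.

```java
import java.util.Optional;

// Hypothetical transformer with mutable state, which is why sharing it matters.
final class CountingTransformer {
    private int seen;

    byte[] filterOrTransform(byte[] value) {
        seen++;
        return value;
    }

    int seen() {
        return seen;
    }
}

// Hypothetical factory shape, loosely modelled on createForSerializedState().
interface SerializedTransformFactory {
    Optional<CountingTransformer> createForSerializedState();
}

final class Adaptors {
    // Risky shape: the Optional is resolved once, when the adaptor is built,
    // so every caller (i.e. every snapshot) gets the same instance.
    static SerializedTransformFactory capturingOnce(SerializedTransformFactory original) {
        Optional<CountingTransformer> shared = original.createForSerializedState();
        return () -> shared;
    }

    // Safer shape: delegate on each call; any wrapping would also happen here,
    // so each snapshot gets its own transformer (if `original` creates new ones).
    static SerializedTransformFactory delegatingPerCall(SerializedTransformFactory original) {
        return () -> original.createForSerializedState();
    }
}
```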
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247968390 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED; + +/** Collection of common state snapshot transformers and their factories. */ +public class StateSnapshotTransformers {
Review comment: Why did you group the implementations under `StateSnapshotTransformers`? They could also simply live in their own package, right?
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247969937 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), + createSingleThreadAccessCheckingStateSnapshotTransformFactory()); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + valueState.update(StringUtils.getRandomString(rnd,5, 10));
Review comment: Whitespace is missing after the comma: `StringUtils.getRandomString(rnd, 5, 10)`.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247966443 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), + createSingleThreadAccessCheckingStateSnapshotTransformFactory()); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + valueState.update(StringUtils.getRandomString(rnd,5, 10)); + } + + CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); + + RunnableFuture> snapshot1 = + backend.snapshot(1L, 0L, streamFactory, checkpointOptions); + + RunnableFuture> snapshot2 = + backend.snapshot(2L, 0L, streamFactory, checkpointOptions); + + Thread runner1 = new Thread(snapshot1, "snapshot1"); + runner1.start(); + Thread runner2 = new Thread(snapshot2, "snapshot2"); + runner2.start(); + + runner1.join(); + runner2.join(); + + snapshot1.get(); + snapshot2.get(); + } finally { + if (backend != null) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + + private static StateSnapshotTransformFactory + createSingleThreadAccessCheckingStateSnapshotTransformFactory() {
Review comment: This is only my personal taste, but I prefer having the return type and the method name on the same line.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247960807 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java ## @@ -145,13 +146,13 @@ private RegisteredKeyValueStateBackendMetaInfo( return stateSerializerProvider.previousSchemaSerializer(); } - @Nullable - public StateSnapshotTransformer getSnapshotTransformer() { - return snapshotTransformer; + @Nonnull + public StateSnapshotTransformFactory getStateSnapshotTransformFactory() { + return stateSnapshotTransformFactory; } - public void updateSnapshotTransformer(StateSnapshotTransformer snapshotTransformer) { - this.snapshotTransformer = snapshotTransformer; + public void updateSnapshotTransformerFactory(StateSnapshotTransformFactory stateSnapshotTransformFactory) {
Review comment: The `er` is not needed here: `updateSnapshotTransformFactory` would be consistent with `StateSnapshotTransformFactory`.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247968937 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), + createSingleThreadAccessCheckingStateSnapshotTransformFactory()); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + valueState.update(StringUtils.getRandomString(rnd,5, 10)); + } + + CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); + + RunnableFuture> snapshot1 = + backend.snapshot(1L, 0L, streamFactory, checkpointOptions); + + RunnableFuture> snapshot2 = + backend.snapshot(2L, 0L, streamFactory, checkpointOptions); + + Thread runner1 = new Thread(snapshot1, "snapshot1"); + runner1.start(); + Thread runner2 = new Thread(snapshot2, "snapshot2"); + runner2.start(); + + runner1.join(); + runner2.join(); + + snapshot1.get(); + snapshot2.get(); + } finally { + if (backend != null) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + + private static StateSnapshotTransformFactory + createSingleThreadAccessCheckingStateSnapshotTransformFactory() { + return new StateSnapshotTransformFactory() { + @Override + public Optional> createForDeserializedState() { + return 
createStateSnapshotTransformer(); + } + + @Override + public Optional> createForSerializedState() { + return createStateSnapshotTransformer(); + } + + private Optional> createStateSnapshotTransformer() { + return Optional.of(new StateSnapshotTransformer() { + private Thread currentThread = null; + + @Nullable + @Override + public T1 filterOrTransform(@Nullable T1 value) { + if (currentThread == null) { + currentThread = Thread.currentThread(); + } else { + assertEquals("Concurrent access from another thread", + currentThread, Thread.currentThread()); + }
Review comment: The current implementation assumes that there is only a single element to transform, right? Otherwise it should fail with the second element.
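For reference, a sketch of a thread-checking transformer that tolerates any number of elements from the same thread and only fails when a second thread touches the instance. `ValueTransformer` and `SingleThreadCheckingTransformer` are hypothetical names, and the sketch throws `IllegalStateException` instead of using JUnit's `assertEquals` so that it stays self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a snapshot transformer (not Flink's interface).
interface ValueTransformer<T> {
    T filterOrTransform(T value);
}

// Remembers the first thread that uses it and rejects calls from any other thread.
// Any number of elements may be transformed, as long as they all come from that thread.
final class SingleThreadCheckingTransformer<T> implements ValueTransformer<T> {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    @Override
    public T filterOrTransform(T value) {
        Thread current = Thread.currentThread();
        // compareAndSet claims ownership atomically on the very first call;
        // afterwards the owner never changes, so a plain comparison suffices.
        if (!owner.compareAndSet(null, current) && owner.get() != current) {
            throw new IllegalStateException(
                    "Concurrent access from " + current.getName()
                            + ", transformer is owned by " + owner.get().getName());
        }
        return value; // pass-through: this sketch only checks thread confinement
    }
}
```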
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247973376 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + Optional> original = snapshotTransformFactory.createForDeserializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return original.map(est -> (StateSnapshotTransformer) createRocksDBListStateTransformer(stateDesc, est)); + } + }; + } else if (stateDesc instanceof MapStateDescriptor) { + Optional> original = snapshotTransformFactory.createForSerializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new); + } + }; + } else { + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return snapshotTransformFactory.createForSerializedState(); + } + }; + } + } + + @SuppressWarnings("unchecked") + private static StateSnapshotTransformer createRocksDBListStateTransformer(
Review comment: This method could be moved into the inner class which actually uses it.
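A sketch of the suggested move, with hypothetical, simplified types (`ElementFilter`, `SnapshotFilters.ListFilter`) standing in for the RocksDB adaptor classes: the helper becomes a private member of the only nested class that calls it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for a snapshot transformer (not Flink's interface).
interface ElementFilter<T> {
    Optional<T> apply(T value);
}

final class SnapshotFilters {
    private SnapshotFilters() {
    }

    // Named nested class; the helper that only this class needs now lives inside it.
    static final class ListFilter<T> implements ElementFilter<List<T>> {
        private final Predicate<T> keep;

        ListFilter(Predicate<T> keep) {
            this.keep = keep;
        }

        @Override
        public Optional<List<T>> apply(List<T> list) {
            List<T> kept = filterElements(list);
            // An empty result drops the whole entry from the snapshot.
            return kept.isEmpty() ? Optional.empty() : Optional.of(kept);
        }

        // Would otherwise be a static helper on the outer class; now private to its only caller.
        private List<T> filterElements(List<T> list) {
            List<T> kept = new ArrayList<>();
            for (T element : list) {
                if (keep.test(element)) {
                    kept.add(element);
                }
            }
            return kept;
        }
    }
}
```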
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247973273 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + Optional> original = snapshotTransformFactory.createForDeserializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() {
Review comment: Why use anonymous classes instead of giving the classes proper names?
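One practical argument for named classes, shown with hypothetical types (`Adaptor`, `AdaptorNames.ListStateAdaptor`): `Class.getSimpleName()` returns an empty string for an anonymous class, and its binary name is generated by the compiler, so only the named variant is easy to recognize in stack traces and logs.

```java
// Hypothetical marker interface standing in for the adaptor type.
interface Adaptor {
}

final class AdaptorNames {
    private AdaptorNames() {
    }

    // A named nested class appears under a readable name in stack traces and logs.
    static final class ListStateAdaptor implements Adaptor {
    }

    static Adaptor anonymous() {
        // The compiler generates a binary name such as AdaptorNames$1 for this class.
        return new Adaptor() {
        };
    }

    static Adaptor named() {
        return new ListStateAdaptor();
    }
}
```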
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r246327200 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformer.java ## @@ -183,4 +207,17 @@ public MapStateSnapshotTransformer(StateSnapshotTransformer entryValueTransfo Optional> createForSerializedState(); } + + abstract class StateSnapshotTransformFactoryWrapAdaptor implements StateSnapshotTransformFactory {
Review comment: But maybe we could find a more meaningful name for this class.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r245370394 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformer.java ## @@ -183,4 +207,17 @@ public MapStateSnapshotTransformer(StateSnapshotTransformer entryValueTransfo Optional> createForSerializedState(); } + + abstract class StateSnapshotTransformFactoryWrapAdaptor implements StateSnapshotTransformFactory {
Review comment: Maybe `CollectionStateSnapshotTransformerFactory` would be a better name.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r245358031 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ## @@ -229,7 +227,7 @@ public HeapKeyedStateBackend( private StateTable tryRegisterStateTable( TypeSerializer namespaceSerializer, StateDescriptor stateDesc, - @Nullable StateSnapshotTransformer snapshotTransformer) throws StateMigrationException { + @Nonnull StateSnapshotTransformFactory snapshotTransformer) throws StateMigrationException {
Review comment: Please change the parameter name as well (e.g. to `snapshotTransformFactory`), since it now refers to a factory rather than a transformer.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r245370513 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformer.java ## @@ -120,6 +122,17 @@ public ListStateSnapshotTransformer(StateSnapshotTransformer entryValueTransf } } + class ListStateSnapshotTransformFactory extends StateSnapshotTransformFactoryWrapAdaptor> {
Review comment: Why do all these factory classes live within the `StateSnapshotTransformer`?
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r245368875 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java ## @@ -147,7 +147,8 @@ public StateKeyGroupWriter getKeyGroupWriter() { localKeySerializer.serialize(element.key, dov); localStateSerializer.serialize(element.state, dov); }; - StateSnapshotTransformer stateSnapshotTransformer = owningStateTable.metaInfo.getSnapshotTransformer(); + StateSnapshotTransformer stateSnapshotTransformer = owningStateTable.metaInfo. + getStateSnapshotTransformFactory().createForDeserializedState().orElse(null);
Review comment: The code below this line could be refactored so that it does not check `stateSnapshotTransformer != null` again.
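One possible shape for that refactoring, sketched with hypothetical types (`StateWriter`, `WriterSelection`) and a `UnaryOperator<String>` standing in for the transformer; none of this is Flink's API. The idea is to decide once whether a transformer is present and build the matching writer, so the per-element path no longer checks the transformer for `null`.

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical writer for a single state element (not Flink's interface).
interface StateWriter {
    void write(String state, StringBuilder out);
}

final class WriterSelection {
    private WriterSelection() {
    }

    static StateWriter plainWriter() {
        return (state, out) -> out.append(state).append(';');
    }

    // Decides once, when the writer is built, whether a transformer applies,
    // so the per-element path never has to check the transformer for null.
    static StateWriter writerFor(Optional<UnaryOperator<String>> transformer) {
        StateWriter plain = plainWriter();
        return transformer
                .<StateWriter>map(t -> (state, out) -> {
                    String transformed = t.apply(state);
                    // A null result means the element is filtered out of the snapshot.
                    if (transformed != null) {
                        plain.write(transformed, out);
                    }
                })
                .orElse(plain);
    }
}
```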
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r245362372 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java ## @@ -145,13 +146,13 @@ private RegisteredKeyValueStateBackendMetaInfo( return stateSerializerProvider.previousSchemaSerializer(); } - @Nullable - public StateSnapshotTransformer getSnapshotTransformer() { - return snapshotTransformer; + @Nonnull + public StateSnapshotTransformFactory getStateSnapshotTransformFactory() {
Review comment: Should the factory be called `StateSnapshotTransformerFactory`?