Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r840197342



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** rescaling on one subtask, this is the benchmark entrance. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** close test harness for subtask. */
+    public void closeHarnessForSubtask() throws Exception {

Review comment:
       I think this method could be renamed to `closeOperator`, which reads better,
   as the name would no longer be tied to the implementation (the test harness).
   Remember to update the related comments.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Supplier;
+
+/** Builder for rescalingBenchmark. */
+public class RescalingBenchmarkBuilder<KEY> {
+    private int maxParallelism = 128;
+    private int parallelismBefore = 2;
+    private int parallelismAfter = 1;
+    private int managedMemorySize = 512 * 1024 * 1024;
+    private StateBackend stateBackend = new EmbeddedRocksDBStateBackend();
+    private RescalingBenchmark.StreamRecordGenerator<KEY> streamRecordGenerator;
+    private Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+    private CheckpointStorageAccess checkpointStorageAccess;
+
+    public RescalingBenchmarkBuilder setMaxParallelism(int maxParallelism) {

Review comment:
       It would be better to return `RescalingBenchmarkBuilder<KEY>` here; the same
   applies to the methods below.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);

Review comment:
       Actually, we can replace this `random` with `ThreadLocalRandom.current()`,
   as we do not need deterministic results here.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeHarnessForSubtask();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingIn() throws Exception {
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(2)
+                        .setParallelismAfter(1)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeHarnessForSubtask();
+        benchmark.tearDown();
+    }
+
+    @NotThreadSafe

Review comment:
       Since we only generate and process records one by one, why do we need to
   add this annotation here?

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */

Review comment:
       ```suggestion
   /** The benchmark of rescaling from checkpoint. */
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** rescaling on one subtask, this is the benchmark entrance. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** close test harness for subtask. */
+    public void closeHarnessForSubtask() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {

Review comment:
       Similarly, I think the method could be renamed to
   `prepareStateForOperator`.
   Remember to update the related comments.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();

Review comment:
       ```suggestion
           RescalingBenchmark<Integer> benchmark =
                   new RescalingBenchmarkBuilder<Integer>()
                           .setMaxParallelism(128)
                           .setParallelismBefore(1)
                           .setParallelismAfter(2)
                           .setManagedMemorySize(512 * 1024 * 1024)
                           .setCheckpointStorageAccess(
                                   new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
                                           .createCheckpointStorage(new JobID()))
                           .setStateBackend(new EmbeddedRocksDBStateBackend(true))
                           .setStreamRecordGenerator(new IntegerRecordGenerator())
                           .setStateProcessFunctionSupplier(TestKeyedFunction::new)
                           .build();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to