This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b65644161767e90f0834db76ba116d4d92449d5
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Sun Jan 29 14:20:51 2023 +0000

    [FLINK-30328][tests] Use RocksDBMemory factory interface in TaskManagerWideRocksDbMemorySharingITCase
---
 .../TaskManagerWideRocksDbMemorySharingITCase.java | 191 +++++++--------------
 1 file changed, 63 insertions(+), 128 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java
index e633bc005ad..bbc4da21655 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java
@@ -21,76 +21,57 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory;
 import org.apache.flink.contrib.streaming.state.RocksDBOptions;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import 
org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics;
-import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
 
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.rocksdb.Cache;
+import org.rocksdb.WriteBufferManager;
 
-import java.math.BigInteger;
-import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
-import static 
org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.calculateActualCacheCapacity;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests that {@link RocksDBOptions#FIX_PER_TM_MEMORY_SIZE} works as expected, i.e. make RocksDB use
  * the same BlockCache and WriteBufferManager objects. It does so using RocksDB metrics.
  */
-public class TaskManagerWideRocksDbMemorySharingITCase {
+public class TaskManagerWideRocksDbMemorySharingITCase extends TestLogger {
     private static final int PARALLELISM = 4;
     private static final int NUMBER_OF_JOBS = 5;
     private static final int NUMBER_OF_TASKS = NUMBER_OF_JOBS * PARALLELISM;
-
     private static final MemorySize SHARED_MEMORY = 
MemorySize.ofMebiBytes(NUMBER_OF_TASKS * 25);
-    private static final double WRITE_BUFFER_RATIO = 0.5;
-    private static final double EXPECTED_BLOCK_CACHE_SIZE =
-            calculateActualCacheCapacity(SHARED_MEMORY.getBytes(), 
WRITE_BUFFER_RATIO);
-    // try to check that the memory usage is limited
-    // however, there is no hard limit actually
-    // because of https://issues.apache.org/jira/browse/FLINK-15532
-    private static final double EFFECTIVE_LIMIT = EXPECTED_BLOCK_CACHE_SIZE * 
1.5;
-
-    private static final int NUM_MEASUREMENTS = 100;
-
-    private InMemoryReporter metricsReporter;
     private MiniClusterWithClientResource cluster;
 
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
     @Before
     public void init() throws Exception {
-        metricsReporter = InMemoryReporter.create();
         cluster =
                 new MiniClusterWithClientResource(
                         new MiniClusterResourceConfiguration.Builder()
-                                
.setConfiguration(getConfiguration(metricsReporter))
+                                .setConfiguration(getConfiguration())
                                 .setNumberTaskManagers(1)
                                 .setNumberSlotsPerTaskManager(NUMBER_OF_TASKS)
                                 .build());
@@ -100,116 +81,48 @@ public class TaskManagerWideRocksDbMemorySharingITCase {
     @After
     public void destroy() {
         cluster.after();
-        metricsReporter.close();
     }
 
-    @Ignore
     @Test
     public void testBlockCache() throws Exception {
+        List<Cache> createdCaches = new CopyOnWriteArrayList<>();
+        List<WriteBufferManager> createdWriteBufferManagers = new CopyOnWriteArrayList<>();
+        TestingRocksDBMemoryFactory memoryFactory =
+                new TestingRocksDBMemoryFactory(
+                        sharedObjects.add(createdCaches),
+                        sharedObjects.add(createdWriteBufferManagers));
         List<JobID> jobIDs = new ArrayList<>(NUMBER_OF_JOBS);
         try {
-            // launch jobs
             for (int i = 0; i < NUMBER_OF_JOBS; i++) {
-                jobIDs.add(cluster.getRestClusterClient().submitJob(dag()).get());
+                jobIDs.add(cluster.getRestClusterClient().submitJob(dag(memoryFactory)).get());
             }
-
-            // wait for init
-            Deadline initDeadline = Deadline.fromNow(Duration.ofMinutes(1));
             for (JobID jid : jobIDs) {
                 waitForAllTaskRunning(cluster.getMiniCluster(), jid, false);
-                waitForAllMetricsReported(jid, initDeadline);
             }
+            Assert.assertEquals(1, createdCaches.size());
+            Assert.assertEquals(1, createdWriteBufferManagers.size());
 
-            // check declared capacity
-            collectGaugeValues(jobIDs, "rocksdb.block-cache-capacity")
-                    .forEach(
-                            size ->
-                                    assertEquals(
-                                            "Unexpected rocksdb block cache 
capacity",
-                                            EXPECTED_BLOCK_CACHE_SIZE,
-                                            size,
-                                            0));
-
-            // do some work and check the actual usage of memory
-            double[] deviations = new double[NUM_MEASUREMENTS];
-            for (int i = 0; i < NUM_MEASUREMENTS; i++) {
-                Thread.sleep(50L);
-                double[] blockCacheUsages =
-                        collectGaugeValues(jobIDs, "rocksdb.block-cache-usage")
-                                .mapToDouble(value -> value)
-                                .toArray();
-                assertTrue(
-                        String.format(
-                                "total block cache usage is too high: %s 
(limit: %s, effective limit: %s)",
-                                Arrays.toString(blockCacheUsages),
-                                EXPECTED_BLOCK_CACHE_SIZE,
-                                EFFECTIVE_LIMIT),
-                        Arrays.stream(blockCacheUsages).max().getAsDouble() <= 
EFFECTIVE_LIMIT);
-                deviations[i] = new 
StandardDeviation().evaluate(blockCacheUsages);
-            }
-            validateDeviations(deviations);
         } finally {
             for (JobID jobID : jobIDs) {
-                cluster.getRestClusterClient().cancel(jobID).get();
+                try {
+                    cluster.getRestClusterClient().cancel(jobID).get();
+                } catch (Exception e) {
+                    log.warn("Can not cancel job {}", jobID, e);
+                }
             }
         }
     }
 
-    private static void validateDeviations(double[] deviations) {
-        DescriptiveStatisticsHistogramStatistics percentile =
-                new DescriptiveStatisticsHistogramStatistics(deviations);
-        assertTrue(
-                String.format(
-                        "Block cache usage reported by different tasks varies 
too much: %s\n"
-                                + "That likely mean that they use different 
cache objects",
-                        Arrays.toString(deviations)),
-                // some deviation is possible because:
-                // 1. records are being processed in parallel with requesting 
metrics
-                // 2. reporting metrics is not synchronized
-                percentile.getQuantile(.50d) <= 10_000d
-                        && percentile.getQuantile(.75d) <= 500_000d);
-    }
-
-    private void waitForAllMetricsReported(JobID jid, Deadline deadline)
-            throws InterruptedException {
-        List<Double> gaugeValues = collectGaugeValues(jid, 
"rocksdb.block-cache-capacity");
-        while (deadline.hasTimeLeft() && isEmptyOrHasZeroes(gaugeValues)) {
-            Thread.sleep(100);
-            gaugeValues = collectGaugeValues(jid, 
"rocksdb.block-cache-capacity");
-        }
-        if (isEmptyOrHasZeroes(gaugeValues)) {
-            Assert.fail(
-                    String.format(
-                            "some tasks are still reporting zero cache 
capacity: %s", gaugeValues));
-        }
-    }
-
-    private boolean isEmptyOrHasZeroes(List<Double> gaugeValues) {
-        return gaugeValues.isEmpty() || gaugeValues.stream().anyMatch(x -> x 
== 0);
-    }
-
-    // collect at least one metric according to the given pattern
-    // then convert it to double and return
-    private List<Double> collectGaugeValues(JobID jobID, String metricPattern) 
{
-        //noinspection unchecked
-        List<Double> list =
-                metricsReporter.findJobMetricGroups(jobID, 
metricPattern).stream()
-                        .map(triple -> ((Gauge<BigInteger>) 
triple.f2).getValue().doubleValue())
-                        .collect(Collectors.toList());
-        checkState(!list.isEmpty());
-        return list;
-    }
-
-    private Stream<Double> collectGaugeValues(List<JobID> jobIDs, String 
metricPattern) {
-        return jobIDs.stream().flatMap(jobID -> collectGaugeValues(jobID, 
metricPattern).stream());
-    }
-
-    private JobGraph dag() {
+    private JobGraph dag(RocksDBMemoryFactory memoryFactory) {
         Configuration configuration = new Configuration();
         StreamExecutionEnvironment env =
                 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(PARALLELISM);
 
+        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
+        backend.setRocksDBMemoryFactory(memoryFactory);
+        env.setStateBackend(backend);
+
         // don't flush memtables by checkpoints
         env.enableCheckpointing(24 * 60 * 60 * 1000, 
CheckpointingMode.EXACTLY_ONCE);
         env.setRestartStrategy(noRestart());
@@ -246,19 +159,41 @@ public class TaskManagerWideRocksDbMemorySharingITCase {
         return env.getStreamGraph().getJobGraph();
     }
 
-    private static Configuration getConfiguration(InMemoryReporter 
metricsReporter) {
+    private static Configuration getConfiguration() {
         Configuration configuration = new Configuration();
-
         configuration.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, 
SHARED_MEMORY);
-
-        configuration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
         configuration.set(RocksDBOptions.USE_MANAGED_MEMORY, false);
-        configuration.setDouble(RocksDBOptions.WRITE_BUFFER_RATIO, 
WRITE_BUFFER_RATIO);
-
-        metricsReporter.addToConfiguration(configuration);
-        configuration.set(RocksDBNativeMetricOptions.BLOCK_CACHE_CAPACITY, 
true);
-        configuration.set(RocksDBNativeMetricOptions.BLOCK_CACHE_USAGE, true);
 
         return configuration;
     }
+
+    private static class TestingRocksDBMemoryFactory implements RocksDBMemoryFactory {
+        private final SharedReference<List<Cache>> createdCaches;
+        private final SharedReference<List<WriteBufferManager>> createdWriteBufferManagers;
+
+        private TestingRocksDBMemoryFactory(
+                SharedReference<List<Cache>> createdCaches,
+                SharedReference<List<WriteBufferManager>> createdWriteBufferManagers) {
+            this.createdCaches = createdCaches;
+            this.createdWriteBufferManagers = createdWriteBufferManagers;
+        }
+
+        @Override
+        public Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
+            Cache cache =
+                    RocksDBMemoryFactory.DEFAULT.createCache(cacheCapacity, highPriorityPoolRatio);
+            createdCaches.get().add(cache);
+            return cache;
+        }
+
+        @Override
+        public WriteBufferManager createWriteBufferManager(
+                long writeBufferManagerCapacity, Cache cache) {
+            WriteBufferManager writeBufferManager =
+                    RocksDBMemoryFactory.DEFAULT.createWriteBufferManager(
+                            writeBufferManagerCapacity, cache);
+            createdWriteBufferManagers.get().add(writeBufferManager);
+            return writeBufferManager;
+        }
+    }
 }

Reply via email to