This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f56021cc2cf1b78bfbdb2ba24b4b639dedca4247
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jan 13 17:24:57 2021 +0100

    [FLINK-20978] Implement test for migrating from Rocks savepoint
    
    This closes #14648
---
 .../flink/test/state/BackendSwitchSpecs.java       | 153 +++++++++++
 .../state/HeapSavepointStateBackendSwitchTest.java |  40 +++
 .../RocksSavepointStateBackendSwitchTest.java      |  40 +++
 .../state/SavepointStateBackendSwitchTestBase.java | 295 +++++++++++++++++++++
 4 files changed, 528 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
new file mode 100644
index 0000000..2cccafd
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
+import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.util.Collection;
+
+/**
+ * Specifications for creating state backends to be used in {@link
+ * SavepointStateBackendSwitchTestBase}.
+ *
+ * <p>The class only hosts the spec constants and their nested types; it is not instantiable.
+ */
+public final class BackendSwitchSpecs {
+
+    /** Prevents instantiation of this constants holder. */
+    private BackendSwitchSpecs() {}
+
+    /**
+     * A factory interface for creating a state backend that should be able to restore its state
+     * from the given state handles.
+     *
+     * <p>{@link #close()} releases what the spec itself allocated while creating backends. The
+     * implementations in this file do not close the backends they handed out.
+     */
+    public interface BackendSwitchSpec extends AutoCloseable {
+
+        /**
+         * Builds a keyed state backend for {@code String} keys.
+         *
+         * @param keyGroupRange key group range handed to the backend builder
+         * @param numKeyGroups number of key groups handed to the backend builder
+         * @param stateHandles state handles handed to the backend builder for restoring
+         * @return the backend produced by the builder
+         * @throws Exception if building the backend fails
+         */
+        CheckpointableKeyedStateBackend<String> createBackend(
+                KeyGroupRange keyGroupRange,
+                int numKeyGroups,
+                Collection<KeyedStateHandle> stateHandles)
+                throws Exception;
+    }
+
+    /** Specification for a {@link RocksDBKeyedStateBackend}. */
+    static final BackendSwitchSpec ROCKS = new RocksSpec();
+
+    /** Specification for a {@link HeapKeyedStateBackend}. */
+    static final BackendSwitchSpec HEAP = new HeapSpec();
+
+    /**
+     * Builds {@link RocksDBKeyedStateBackend}s. Every backend gets its own fresh sub-directory of
+     * a single temporary root directory, and {@link #close()} deletes that root.
+     */
+    private static final class RocksSpec implements BackendSwitchSpec {
+
+        private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+        /**
+         * Whether the root of {@link #temporaryFolder} has been created and not yet deleted.
+         * Not synchronized; NOTE(review): assumes the spec is used from a single test thread.
+         */
+        private boolean rootCreated;
+
+        @Override
+        public CheckpointableKeyedStateBackend<String> createBackend(
+                KeyGroupRange keyGroupRange,
+                int numKeyGroups,
+                Collection<KeyedStateHandle> stateHandles)
+                throws Exception {
+            // TemporaryFolder#create() allocates a NEW root on every call and forgets the old
+            // one. This spec is a shared constant whose createBackend runs several times, so
+            // the root is created only once; otherwise close() would delete just the last root.
+            if (!rootCreated) {
+                temporaryFolder.create();
+                rootCreated = true;
+            }
+
+            final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
+            return new RocksDBKeyedStateBackendBuilder<>(
+                            "no-op",
+                            ClassLoader.getSystemClassLoader(),
+                            temporaryFolder.newFolder(),
+                            optionsContainer,
+                            stateName -> optionsContainer.getColumnOptions(),
+                            new KvStateRegistry()
+                                    .createTaskRegistry(new JobID(), new JobVertexID()),
+                            StringSerializer.INSTANCE,
+                            numKeyGroups,
+                            keyGroupRange,
+                            new ExecutionConfig(),
+                            TestLocalRecoveryConfig.disabled(),
+                            RocksDBStateBackend.PriorityQueueStateType.ROCKSDB,
+                            TtlTimeProvider.DEFAULT,
+                            new UnregisteredMetricsGroup(),
+                            stateHandles,
+                            UncompressedStreamCompressionDecorator.INSTANCE,
+                            new CloseableRegistry())
+                    .build();
+        }
+
+        /** Deletes the temporary root; a later {@code createBackend} call creates a new one. */
+        @Override
+        public void close() throws Exception {
+            temporaryFolder.delete();
+            rootCreated = false;
+        }
+
+        @Override
+        public String toString() {
+            return "ROCKS";
+        }
+    }
+
+    /** Builds {@link HeapKeyedStateBackend}s; the spec itself holds no resources. */
+    private static final class HeapSpec implements BackendSwitchSpec {
+        @Override
+        public CheckpointableKeyedStateBackend<String> createBackend(
+                KeyGroupRange keyGroupRange,
+                int numKeyGroups,
+                Collection<KeyedStateHandle> stateHandles)
+                throws Exception {
+            // One config instance feeds both the builder and the compression decorator lookup.
+            ExecutionConfig executionConfig = new ExecutionConfig();
+            return new HeapKeyedStateBackendBuilder<>(
+                            Mockito.mock(TaskKvStateRegistry.class),
+                            StringSerializer.INSTANCE,
+                            this.getClass().getClassLoader(),
+                            numKeyGroups,
+                            keyGroupRange,
+                            executionConfig,
+                            TtlTimeProvider.DEFAULT,
+                            stateHandles,
+                            AbstractStateBackend.getCompressionDecorator(executionConfig),
+                            TestLocalRecoveryConfig.disabled(),
+                            new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128),
+                            true,
+                            new CloseableRegistry())
+                    .build();
+        }
+
+        /** Nothing to release: this spec allocates nothing outside the backends it builds. */
+        @Override
+        public void close() throws Exception {}
+
+        @Override
+        public String toString() {
+            return "HEAP";
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
new file mode 100644
index 0000000..d5708a6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for switching a HEAP state backend to a different one.
+ *
+ * <p>The savepoint is always written by the heap backend; each parameter names a backend the
+ * savepoint is restored into.
+ */
+@RunWith(Parameterized.class)
+public class HeapSavepointStateBackendSwitchTest extends SavepointStateBackendSwitchTestBase {
+
+    /** @param toBackend spec of the backend that restores the heap savepoint */
+    public HeapSavepointStateBackendSwitchTest(BackendSwitchSpec toBackend) {
+        super(BackendSwitchSpecs.HEAP, toBackend);
+    }
+
+    /**
+     * Lists the restore targets, currently only the heap backend itself.
+     *
+     * <p>The {@code name} pattern puts the target spec's {@code toString()} into the test name,
+     * so a run is reported by backend rather than by parameter index.
+     */
+    @Parameterized.Parameters(name = "to: {0}")
+    public static Collection<BackendSwitchSpec> targetBackends() {
+        return Collections.singletonList(BackendSwitchSpecs.HEAP);
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java
new file mode 100644
index 0000000..d7f7e2d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests for switching a RocksDB state backend to a different one.
+ *
+ * <p>The savepoint is always written by RocksDB; each parameter names a backend the savepoint
+ * is restored into.
+ */
+@RunWith(Parameterized.class)
+public class RocksSavepointStateBackendSwitchTest extends SavepointStateBackendSwitchTestBase {
+
+    /** Lists the restore targets: the heap backend and RocksDB itself. */
+    @Parameterized.Parameters(name = "to: {0}")
+    public static Collection<BackendSwitchSpec> targetBackends() {
+        final BackendSwitchSpec[] targets = {BackendSwitchSpecs.HEAP, BackendSwitchSpecs.ROCKS};
+        return Arrays.asList(targets);
+    }
+
+    /** @param toBackend spec of the backend that restores the RocksDB savepoint */
+    public RocksSavepointStateBackendSwitchTest(BackendSwitchSpec toBackend) {
+        super(BackendSwitchSpecs.ROCKS, toBackend);
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTestBase.java
new file mode 100644
index 0000000..933b8df
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTestBase.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+import org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the unified savepoint format. They verify you can switch a state backend through a
+ * savepoint.
+ *
+ * <p>A run fills a backend created by the {@code fromBackend} spec with map, value and list
+ * state plus a timer queue, takes a savepoint of it, and then checks that a backend created by
+ * the {@code toBackend} spec from that savepoint exposes the same contents.
+ */
+public abstract class SavepointStateBackendSwitchTestBase {
+
+    private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1);
+    private static final int NUM_KEY_GROUPS = KEY_GROUP_RANGE.getNumberOfKeyGroups();
+
+    /** Holds the file the serialized snapshot result is written to and read back from. */
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private final BackendSwitchSpec fromBackend;
+
+    private final BackendSwitchSpec toBackend;
+
+    /**
+     * @param fromBackend spec of the backend that writes the savepoint
+     * @param toBackend spec of the backend that restores from the savepoint
+     */
+    protected SavepointStateBackendSwitchTestBase(
+            BackendSwitchSpec fromBackend, BackendSwitchSpec toBackend) {
+        this.fromBackend = fromBackend;
+        this.toBackend = toBackend;
+    }
+
+    /**
+     * Writes state with the source backend, takes a savepoint, restores it into the target
+     * backend and verifies the restored contents.
+     *
+     * @throws Exception if creating a backend, snapshotting, (de)serializing or closing fails
+     */
+    @Test
+    public void switchStateBackend() throws Exception {
+        // The specs are AutoCloseable and may own resources of their own (for example a
+        // temporary directory), so they are closed here even when the test fails. If both
+        // fields refer to the same spec it is closed twice, which the specs in
+        // BackendSwitchSpecs tolerate.
+        try (BackendSwitchSpec sourceSpec = fromBackend;
+                BackendSwitchSpec targetSpec = toBackend) {
+
+            final File pathToWrite = tempFolder.newFile();
+
+            final MapStateDescriptor<Long, Long> mapStateDescriptor =
+                    new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
+            mapStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+            final ValueStateDescriptor<Long> valueStateDescriptor =
+                    new ValueStateDescriptor<>("my-value-state", Long.class);
+            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+            final ListStateDescriptor<Long> listStateDescriptor =
+                    new ListStateDescriptor<>("my-list-state", Long.class);
+            listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+            final Integer namespace1 = 1;
+            final Integer namespace2 = 2;
+            final Integer namespace3 = 3;
+
+            // Phase 1: populate a fresh source backend and persist its savepoint to the file.
+            try (final CheckpointableKeyedStateBackend<String> keyedBackend =
+                    sourceSpec.createBackend(
+                            KEY_GROUP_RANGE, NUM_KEY_GROUPS, Collections.emptyList())) {
+                takeSavepoint(
+                        keyedBackend,
+                        pathToWrite,
+                        mapStateDescriptor,
+                        valueStateDescriptor,
+                        listStateDescriptor,
+                        namespace1,
+                        namespace2,
+                        namespace3);
+            }
+
+            // Phase 2: read the snapshot result back and restore it into the target backend.
+            final SnapshotResult<KeyedStateHandle> stateHandles;
+            try (BufferedInputStream bis =
+                    new BufferedInputStream(new FileInputStream(pathToWrite))) {
+                stateHandles =
+                        InstantiationUtil.deserializeObject(
+                                bis, Thread.currentThread().getContextClassLoader());
+            }
+            final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot();
+            try (final CheckpointableKeyedStateBackend<String> keyedBackend =
+                    targetSpec.createBackend(
+                            KEY_GROUP_RANGE,
+                            NUM_KEY_GROUPS,
+                            StateObjectCollection.singleton(stateHandle))) {
+                verifyRestoredState(
+                        mapStateDescriptor,
+                        valueStateDescriptor,
+                        listStateDescriptor,
+                        namespace1,
+                        namespace2,
+                        namespace3,
+                        keyedBackend);
+            }
+        }
+    }
+
+    /**
+     * Counts the entries the map state yields for the currently set key and namespace by
+     * walking its iterator.
+     */
+    private static <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState)
+            throws Exception {
+        int i = 0;
+        Iterator<Map.Entry<UK, UV>> itt = mapState.iterator();
+        while (itt.hasNext()) {
+            i++;
+            itt.next();
+        }
+        return i;
+    }
+
+    /**
+     * Fills the backend with the fixture state, snapshots it as a savepoint and writes the
+     * serialized snapshot result to {@code pathToWrite}.
+     *
+     * <p>The values written here are the ones {@code verifyRestoredState} asserts on; keep the
+     * two methods in sync.
+     */
+    private void takeSavepoint(
+            CheckpointableKeyedStateBackend<String> keyedBackend,
+            File pathToWrite,
+            MapStateDescriptor<Long, Long> stateDescr,
+            ValueStateDescriptor<Long> valueStateDescriptor,
+            ListStateDescriptor<Long> listStateDescriptor,
+            Integer namespace1,
+            Integer namespace2,
+            Integer namespace3)
+            throws Exception {
+
+        InternalMapState<String, Integer, Long, Long> mapState =
+                keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
+
+        InternalValueState<String, Integer, Long> valueState =
+                keyedBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor);
+
+        InternalListState<String, Integer, Long> listState =
+                keyedBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor);
+
+        // Key "abc": map entries in all three namespaces, list entries in namespace 2 only.
+        keyedBackend.setCurrentKey("abc");
+        mapState.setCurrentNamespace(namespace1);
+        mapState.put(33L, 33L);
+        mapState.put(55L, 55L);
+
+        mapState.setCurrentNamespace(namespace2);
+        mapState.put(22L, 22L);
+        mapState.put(11L, 11L);
+        listState.setCurrentNamespace(namespace2);
+        listState.add(4L);
+        listState.add(5L);
+        listState.add(6L);
+
+        mapState.setCurrentNamespace(namespace3);
+        mapState.put(44L, 44L);
+
+        // Key "mno": map, value and list state, all in namespace 3.
+        keyedBackend.setCurrentKey("mno");
+        mapState.setCurrentNamespace(namespace3);
+        mapState.put(11L, 11L);
+        mapState.put(22L, 22L);
+        mapState.put(33L, 33L);
+        mapState.put(44L, 44L);
+        mapState.put(55L, 55L);
+        valueState.setCurrentNamespace(namespace3);
+        valueState.update(1239L);
+        listState.setCurrentNamespace(namespace3);
+        listState.add(1L);
+        listState.add(2L);
+        listState.add(3L);
+
+        // Three timers for key "mno" in a priority queue named "event-time".
+        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, Integer>> priorityQueue =
+                keyedBackend.create(
+                        "event-time",
+                        new TimerSerializer<>(
+                                keyedBackend.getKeySerializer(), IntSerializer.INSTANCE));
+        priorityQueue.add(new TimerHeapInternalTimer<>(1234L, "mno", namespace3));
+        priorityQueue.add(new TimerHeapInternalTimer<>(2345L, "mno", namespace2));
+        priorityQueue.add(new TimerHeapInternalTimer<>(3456L, "mno", namespace3));
+
+        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
+                keyedBackend.snapshot(
+                        0L,
+                        0L,
+                        new MemCheckpointStreamFactory(4 * 1024 * 1024),
+                        new CheckpointOptions(
+                                CheckpointType.SAVEPOINT,
+                                CheckpointStorageLocationReference.getDefault()));
+
+        // The future is run on the calling thread so that get() below has a result to return.
+        snapshot.run();
+
+        try (BufferedOutputStream out =
+                new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
+            InstantiationUtil.serializeObject(out, snapshot.get());
+        }
+    }
+
+    /**
+     * Asserts that the restored backend holds the state and timers that {@code takeSavepoint}
+     * wrote, including the number of map entries per key and namespace.
+     */
+    private void verifyRestoredState(
+            MapStateDescriptor<Long, Long> mapStateDescriptor,
+            ValueStateDescriptor<Long> valueStateDescriptor,
+            ListStateDescriptor<Long> listStateDescriptor,
+            Integer namespace1,
+            Integer namespace2,
+            Integer namespace3,
+            CheckpointableKeyedStateBackend<String> keyedBackend)
+            throws Exception {
+        InternalMapState<String, Integer, Long, Long> state =
+                keyedBackend.createInternalState(IntSerializer.INSTANCE, mapStateDescriptor);
+
+        InternalValueState<String, Integer, Long> valueState =
+                keyedBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor);
+
+        InternalListState<String, Integer, Long> listState =
+                keyedBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor);
+
+        keyedBackend.setCurrentKey("abc");
+        state.setCurrentNamespace(namespace1);
+        assertEquals(33L, (long) state.get(33L));
+        assertEquals(55L, (long) state.get(55L));
+        assertEquals(2, getStateSize(state));
+
+        state.setCurrentNamespace(namespace2);
+        assertEquals(22L, (long) state.get(22L));
+        assertEquals(11L, (long) state.get(11L));
+        assertEquals(2, getStateSize(state));
+        listState.setCurrentNamespace(namespace2);
+        assertThat(listState.get(), contains(4L, 5L, 6L));
+
+        state.setCurrentNamespace(namespace3);
+        assertEquals(44L, (long) state.get(44L));
+        assertEquals(1, getStateSize(state));
+
+        keyedBackend.setCurrentKey("mno");
+        state.setCurrentNamespace(namespace3);
+        assertEquals(11L, (long) state.get(11L));
+        assertEquals(22L, (long) state.get(22L));
+        assertEquals(33L, (long) state.get(33L));
+        assertEquals(44L, (long) state.get(44L));
+        assertEquals(55L, (long) state.get(55L));
+        assertEquals(5, getStateSize(state));
+        valueState.setCurrentNamespace(namespace3);
+        assertEquals(1239L, (long) valueState.value());
+        listState.setCurrentNamespace(namespace3);
+        assertThat(listState.get(), contains(1L, 2L, 3L));
+
+        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, Integer>> priorityQueue =
+                keyedBackend.create(
+                        "event-time",
+                        new TimerSerializer<>(
+                                keyedBackend.getKeySerializer(), IntSerializer.INSTANCE));
+
+        // The timers are expected to be polled in ascending timestamp order.
+        assertThat(priorityQueue.size(), equalTo(3));
+        assertThat(
+                priorityQueue.poll(),
+                equalTo(new TimerHeapInternalTimer<>(1234L, "mno", namespace3)));
+        assertThat(
+                priorityQueue.poll(),
+                equalTo(new TimerHeapInternalTimer<>(2345L, "mno", namespace2)));
+        assertThat(
+                priorityQueue.poll(),
+                equalTo(new TimerHeapInternalTimer<>(3456L, "mno", namespace3)));
+    }
+}

Reply via email to