Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174385129
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +   @Test
    +   public void testKvStateEntry() throws InterruptedException {
    +           final int threads = 10;
    +
    +           final CountDownLatch latch1 = new CountDownLatch(threads);
    +           final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +           final List<KvStateInfo<?, ?, ?>> infos = 
Collections.synchronizedList(new ArrayList<>());
    +
    +           final JobID jobID = new JobID();
    +
    +           final JobVertexID jobVertexId = new JobVertexID();
    +           final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +           final String registrationName = "foobar";
    +
    +           final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +           final KvStateID stateID = kvStateRegistry.registerKvState(
    +                           jobID,
    +                           jobVertexId,
    +                           keyGroupRange,
    +                           registrationName,
    +                           new DummyKvState()
    +           );
    +
    +           for (int i = 0; i < threads; i++) {
    +                   new Thread(() -> {
    +                           final KvStateEntry<?, ?, ?> kvState = 
kvStateRegistry.getKvState(stateID);
    +                           final KvStateInfo<?, ?, ?> stateInfo = 
kvState.getInfoForCurrentThread();
    +                           infos.add(stateInfo);
    +
    +                           latch1.countDown();
    +                           try {
    +                                   latch2.await();
    +                           } catch (InterruptedException e) {
    +                                   Assert.fail(e.getMessage());
    +                           }
    +
    +                   }).start();
    +           }
    +
    +           latch1.await();
    +
    +           final KvStateEntry<?, ?, ?> kvState = 
kvStateRegistry.getKvState(stateID);
    +
    +           // verify that all the threads are done correctly.
    +           Assert.assertEquals(threads, infos.size());
    +           Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +           latch2.countDown();
    +
    +           for (KvStateInfo<?, ?, ?> infoA: infos) {
    +                   boolean found = false;
    +                   for (KvStateInfo<?, ?, ?> infoB: infos) {
    +                           if (infoA == infoB) {
    +                                   if (found) {
    +                                           Assert.fail("Already found");
    +                                   }
    +                                   found = true;
    +                           } else {
    +                                   Assert.assertTrue(infoA != infoB && 
infoA.equals(infoB));
    +                           }
    +                   }
    +           }
    +
    +           kvStateRegistry.unregisterKvState(
    +                           jobID,
    +                           jobVertexId,
    +                           keyGroupRange,
    +                           registrationName,
    +                           stateID);
    +
    +           // we have to call for garbage collection to be sure that everything is cleared up.
    --- End diff --
    
    ?


---

Reply via email to