This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 3421d527b [STORM-1316] port storm.trident.state-test to java (#3240)
3421d527b is described below

commit 3421d527b6ed59075b3ba0812ab7a8e1974a5873
Author: nd368 <[email protected]>
AuthorDate: Mon Oct 23 19:21:11 2023 +0100

    [STORM-1316] port storm.trident.state-test to java (#3240)
    
    * [STORM-1316] port storm.trident.state-test to java
    
    * Adress review comments + fix Mockito upgrade related things
    
    ---------
    
    Co-authored-by: Richard Zowalla <[email protected]>
---
 .../storm/trident/testing/MemoryBackingMap.java    |   6 +-
 .../clj/org/apache/storm/trident/state_test.clj    | 148 -----------------
 .../jvm/org/apache/storm/trident/StateTest.java    | 175 +++++++++++++++++++++
 3 files changed, 178 insertions(+), 151 deletions(-)

diff --git 
a/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java 
b/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
index 5dc43f51d..f9ebce0d4 100644
--- 
a/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
+++ 
b/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
@@ -18,11 +18,11 @@ import java.util.List;
 import java.util.Map;
 import org.apache.storm.trident.state.map.IBackingMap;
 
-public class MemoryBackingMap implements IBackingMap<Object> {
+public class MemoryBackingMap<T> implements IBackingMap<T> {
     Map vals = new HashMap();
 
     @Override
-    public List<Object> multiGet(List<List<Object>> keys) {
+    public List<T> multiGet(List<List<Object>> keys) {
         List ret = new ArrayList();
         for (List key : keys) {
             ret.add(vals.get(key));
@@ -31,7 +31,7 @@ public class MemoryBackingMap implements IBackingMap<Object> {
     }
 
     @Override
-    public void multiPut(List<List<Object>> keys, List<Object> vals) {
+    public void multiPut(List<List<Object>> keys, List<T> vals) {
         for (int i = 0; i < keys.size(); i++) {
             List key = keys.get(i);
             Object val = vals.get(i);
diff --git a/storm-core/test/clj/org/apache/storm/trident/state_test.clj 
b/storm-core/test/clj/org/apache/storm/trident/state_test.clj
deleted file mode 100644
index d42034e13..000000000
--- a/storm-core/test/clj/org/apache/storm/trident/state_test.clj
+++ /dev/null
@@ -1,148 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.trident.state-test
-  (:use [clojure test])
-  (:import [org.apache.storm.trident.operation.builtin Count])
-  (:import [org.apache.storm.trident.state OpaqueValue])
-  (:import [org.apache.storm.trident.state CombinerValueUpdater])
-  (:import [org.apache.storm.trident.topology.state TransactionalState 
TestTransactionalState])
-  (:import [org.apache.storm.trident.state.map TransactionalMap OpaqueMap])
-  (:import [org.apache.storm.trident.testing MemoryBackingMap MemoryMapState])
-  (:import [org.apache.storm.utils ZookeeperAuthInfo Utils])
-  (:import [org.apache.storm.shade.org.apache.curator.framework 
CuratorFramework])
-  (:import [org.apache.storm.shade.org.apache.curator.framework.api 
CreateBuilder ProtectACLCreateModeStatPathAndBytesable])
-  (:import [org.apache.zookeeper CreateMode ZooDefs ZooDefs$Ids])
-  (:import [org.mockito ArgumentMatchers Mockito])
-  (:import [org.mockito.exceptions.base MockitoAssertionError])
-  (:use [org.apache.storm config]))
-
-(defn single-remove [map key]
-  (-> map (.multiRemove [[key]])))
-
-(defn single-put [map key val]
-  (-> map (.multiPut [[key]] [val])))
-
-(defn single-get [map key]
-  (-> map (.multiGet [[key]]) first))
-
-(defn single-update [map key amt]
-  (-> map (.multiUpdate [[key]] [(CombinerValueUpdater. (Count.) amt)]) first))
-
-(deftest test-opaque-value
-  (let [opqval (OpaqueValue. 8 "v1" "v0")
-        upval0 (.update opqval 8 "v2")
-        upval1 (.update opqval 9 "v2")
-        ]
-    (is (= "v1" (.get opqval nil)))
-    (is (= "v1" (.get opqval 100)))
-    (is (= "v1" (.get opqval 9)))
-    (is (= "v0" (.get opqval 8)))
-    (let [has-exception (try
-                          (.get opqval 7) false
-                          (catch Exception e true))]
-      (is (= true has-exception)))
-    (is (= "v0" (.getPrev opqval)))
-    (is (= "v1" (.getCurr opqval)))
-    ;; update with current
-    (is (= "v0" (.getPrev upval0)))
-    (is (= "v2" (.getCurr upval0)))
-    (not (identical? opqval upval0))
-    ;; update
-    (is (= "v1" (.getPrev upval1)))
-    (is (= "v2" (.getCurr upval1)))
-    (not (identical? opqval upval1))
-    ))
-
-(deftest test-opaque-map
-  (let [map (OpaqueMap/build (MemoryBackingMap.))]
-    (.beginCommit map 1)
-    (is (= nil (single-get map "a")))
-    ;; tests that intra-batch caching works
-    (is (= 1 (single-update map "a" 1)))
-    (is (= 1 (single-get map "a")))
-    (is (= 3 (single-update map "a" 2)))
-    (is (= 3 (single-get map "a")))
-    (.commit map 1)
-    (.beginCommit map 1)
-    (is (= nil (single-get map "a")))
-    (is (= 2 (single-update map "a" 2)))
-    (.commit map 1)
-    (.beginCommit map 2)
-    (is (= 2 (single-get map "a")))
-    (is (= 5 (single-update map "a" 3)))
-    (is (= 6 (single-update map "a" 1)))
-    (.commit map 2)
-    ))
-
-(deftest test-transactional-map
-  (let [map (TransactionalMap/build (MemoryBackingMap.))]
-    (.beginCommit map 1)
-    (is (= nil (single-get map "a")))
-    ;; tests that intra-batch caching works
-    (is (= 1 (single-update map "a" 1)))
-    (is (= 3 (single-update map "a" 2)))
-    (.commit map 1)
-    (.beginCommit map 1)
-    (is (= 3 (single-get map "a")))
-    ;; tests that intra-batch caching has no effect if it's the same commit as 
previous commit
-    (is (= 3 (single-update map "a" 1)))
-    (is (= 3 (single-update map "a" 2)))
-    (.commit map 1)
-    (.beginCommit map 2)
-    (is (= 3 (single-get map "a")))
-    (is (= 6 (single-update map "a" 3)))
-    (is (= 7 (single-update map "a" 1)))
-    (.commit map 2)
-    ))
-
-(deftest test-create-node-acl
-  (testing "Creates ZooKeeper nodes with the correct ACLs"
-    (let [curator (Mockito/mock CuratorFramework)
-          builder0 (Mockito/mock CreateBuilder)
-          builder1 (Mockito/mock ProtectACLCreateModeStatPathAndBytesable)
-          expectedAcls ZooDefs$Ids/CREATOR_ALL_ACL]
-      (. (Mockito/when (.create curator)) (thenReturn builder0))
-      (. (Mockito/when (.creatingParentsIfNeeded builder0)) (thenReturn 
builder1))
-      (. (Mockito/when (.withMode builder1 (ArgumentMatchers/isA CreateMode))) 
(thenReturn builder1))
-      (. (Mockito/when (.withACL builder1 (Mockito/anyList))) (thenReturn 
builder1))
-      (TestTransactionalState/createNode curator "" (byte-array 0) 
expectedAcls nil)
-      (is (nil?
-        (try
-          (. (Mockito/verify builder1) (withACL expectedAcls))
-        (catch MockitoAssertionError e
-          e)))))))
-
-(deftest test-memory-map-state-remove
-  (let [map (MemoryMapState. (Utils/uuid))]
-    (.beginCommit map 1)
-    (single-put map "a" 1)
-    (single-put map "b" 2)
-    (.commit map 1)
-    (.beginCommit map 2)
-    (single-remove map "a")
-    (is (nil? (single-get map "a")))
-    (is (= 2 (single-get map "b")))
-    (.commit map 2)
-    (.beginCommit map 2)
-    (is (= 1 (single-get map "a")))
-    (is (= 2 (single-get map "b")))
-    (single-remove map "a")
-    (.commit map 2)
-    (.beginCommit map 3)
-    (is (nil? (single-get map "a")))
-    (is (= 2 (single-get map "b")))    
-    (.commit map 3)
-    ))
diff --git a/storm-core/test/jvm/org/apache/storm/trident/StateTest.java 
b/storm-core/test/jvm/org/apache/storm/trident/StateTest.java
new file mode 100644
index 000000000..68f615d2a
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/trident/StateTest.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.trident;
+
+import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.shade.org.apache.curator.framework.api.CreateBuilder;
+import 
org.apache.storm.shade.org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
+import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.state.CombinerValueUpdater;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.ValueUpdater;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
+import org.apache.storm.trident.testing.MemoryBackingMap;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.topology.state.TestTransactionalState;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class StateTest {
+
+    private void singleRemove(MemoryMapState<Object> map, Object key){
+        List<List<Object>> keys = 
Collections.singletonList(Collections.singletonList(key));
+        map.multiRemove(keys);
+    }
+
+    private void singlePut(MemoryMapState<Object> map, Object key, Object val){
+        List<List<Object>> keys = 
Collections.singletonList(Collections.singletonList(key));
+        List<Object> vals = Collections.singletonList(val);
+        map.multiPut(keys, vals);
+    }
+
+    private Object singleGet(MapState<Object> map, Object key){
+        List<List<Object>> keys = 
Collections.singletonList(Collections.singletonList(key));
+        return map.multiGet(keys).get(0);
+    }
+
+    private Object singleUpdate(MapState<Object> map, Object key, Long amt){
+        List<List<Object>> keys = 
Collections.singletonList(Collections.singletonList(key));
+        CombinerValueUpdater valueUpdater = new CombinerValueUpdater(new 
Count(), amt);
+        List<ValueUpdater> updaters = Collections.singletonList(valueUpdater);
+        return map.multiUpdate(keys, updaters).get(0);
+    }
+
+    @Test
+    public void testOpaqueValue() {
+        OpaqueValue<String> opqval = new OpaqueValue<>(8L, "v1", "v0");
+        OpaqueValue<String> upval0 = opqval.update(8L, "v2");
+        OpaqueValue<String> upval1 = opqval.update(9L, "v2");
+        assertEquals(opqval.get(null), "v1");
+        assertEquals(opqval.get(100L), "v1");
+        assertEquals(opqval.get(9L), "v1");
+        assertEquals(opqval.get(8L), "v0");
+        Assertions.assertThrows(Exception.class, () -> opqval.get(7L));
+        assertEquals(opqval.getPrev(), "v0");
+        assertEquals(opqval.getCurr(), "v1");
+        // update with current
+        assertEquals(upval0.getPrev(), "v0");
+        assertEquals(upval0.getCurr(), "v2");
+        assertNotEquals(opqval, upval0);
+        // update
+        assertEquals(upval1.getPrev(), "v1");
+        assertEquals(upval1.getCurr(), "v2");
+        assertNotEquals(opqval, upval1);
+    }
+
+    @Test
+    public void testOpaqueMap() {
+        MapState<Object> map = OpaqueMap.build(new MemoryBackingMap<>());
+        map.beginCommit(1L);
+        assertEquals(singleGet(map, "a"), null);
+        // tests that intra-batch caching works
+        assertEquals(singleUpdate(map, "a", 1L), 1L);
+        assertEquals(singleGet(map, "a"), 1L);
+        assertEquals(singleUpdate(map, "a", 2L), 3L);
+        assertEquals(singleGet(map, "a"), 3L);
+        map.commit(1L);
+        map.beginCommit(1L);
+        assertEquals(singleGet(map, "a"), null);
+        assertEquals(singleUpdate(map, "a", 2L), 2L);
+        map.commit(1L);
+        map.beginCommit(2L);
+        assertEquals(singleGet(map, "a"), 2L);
+        assertEquals(singleUpdate(map, "a", 3L), 5L);
+        assertEquals(singleUpdate(map, "a", 1L), 6L);
+        map.commit(2L);
+    }
+
+    @Test
+    public void testTransactionalMap() {
+        MapState<Object> map = TransactionalMap.build(new 
MemoryBackingMap<>());
+        map.beginCommit(1L);
+        assertEquals(singleGet(map, "a"), null);
+        // tests that intra-batch caching works
+        assertEquals(singleUpdate(map, "a", 1L), 1L);
+        assertEquals(singleUpdate(map, "a", 2L), 3L);
+        map.commit(1L);
+        map.beginCommit(1L);
+        assertEquals(singleGet(map, "a"), 3L);
+        // tests that intra-batch caching has no effect if it's the same 
commit as previous commit
+        assertEquals(singleUpdate(map, "a", 1L), 3L);
+        assertEquals(singleUpdate(map, "a", 2L), 3L);
+        map.commit(1L);
+        map.beginCommit(2L);
+        assertEquals(singleGet(map, "a"), 3L);
+        assertEquals(singleUpdate(map, "a", 3L), 6L);
+        assertEquals(singleUpdate(map, "a", 1L), 7L);
+        map.commit(2L);
+    }
+
+    @Test
+    public void testCreateNodeAcl() throws Exception {
+        // Creates ZooKeeper nodes with the correct ACLs
+        CuratorFramework curator = Mockito.mock(CuratorFramework.class);
+        CreateBuilder builder0 = Mockito.mock(CreateBuilder.class);
+        ProtectACLCreateModeStatPathAndBytesable builder1 = 
Mockito.mock(ProtectACLCreateModeStatPathAndBytesable.class);
+        List<ACL> expectedAcls = ZooDefs.Ids.CREATOR_ALL_ACL;
+        Mockito.when(curator.create()).thenReturn(builder0);
+        Mockito.when(builder0.creatingParentsIfNeeded()).thenReturn(builder1);
+        
Mockito.when(builder1.withMode(ArgumentMatchers.isA(CreateMode.class))).thenReturn(builder1);
+        Mockito.when(builder1.withACL(Mockito.anyList())).thenReturn(builder1);
+        TestTransactionalState.createNode(curator, "", new byte[0], 
expectedAcls, null);
+        Mockito.verify(builder1).withACL(expectedAcls);
+    }
+
+    @Test
+    public void testMemoryMapStateRemove() {
+        MemoryMapState<Object> map = new MemoryMapState<>(Utils.uuid());
+        map.beginCommit(1L);
+        singlePut(map, "a", 1);
+        singlePut(map, "b", 2);
+        map.commit(1L);
+        map.beginCommit(2L);
+        singleRemove(map, "a");
+        assertEquals(singleGet(map, "a"), null);
+        assertEquals(singleGet(map, "b"), 2);
+        map.commit(2L);
+        map.beginCommit(2L);
+        assertEquals(singleGet(map, "a"), 1);
+        assertEquals(singleGet(map, "b"), 2);
+        singleRemove(map, "a");
+        map.commit(2L);
+        map.beginCommit(3L);
+        assertEquals(singleGet(map, "a"), null);
+        assertEquals(singleGet(map, "b"), 2);
+        map.commit(3L);
+    }
+}

Reply via email to