This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 884c4cda1 StormExecutor: adding a unit test for changes introduced in 
STORM-3693  (#3834)
884c4cda1 is described below

commit 884c4cda1818d47730d99fa0e840028174cde422
Author: reiabreu <rui.ab...@gmail.com>
AuthorDate: Sun Jan 19 12:34:37 2025 +0000

    StormExecutor: adding a unit test for changes introduced in STORM-3693  
(#3834)
    
    * Porting changes from STORM-3693
    
    * Refactoring changes proposed on STORM-3693 and adding a unit test
    
    * Update Executor.java
    
    * Update SpoutExecutor.java
    
    * Update SpoutExecutor.java
    
    * Update SpoutExecutor.java
    
    * Adding missing license
    
    * Adding missing license
---
 .../jvm/org/apache/storm/executor/Executor.java    | 14 ++--
 .../apache/storm/executor/spout/SpoutExecutor.java | 33 ++++++--
 .../apache/storm/executor/SpoutExecutorTest.java   | 87 ++++++++++++++++++++++
 3 files changed, 121 insertions(+), 13 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java 
b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 2d287c777..edd77743f 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -2,9 +2,9 @@
  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- * <p>
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
  * and limitations under the License.
@@ -285,19 +285,17 @@ public abstract class Executor implements Callable, 
JCQueue.Consumer {
         int taskId = addressedTuple.getDest();
 
         TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-        String streamId = tuple.getSourceStreamId();
-        boolean isSpout = this instanceof SpoutExecutor;
         if (isDebug) {
             LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, 
taskId);
         }
 
+        acceptTupleAction(taskId, tuple);
+    }
+
+    protected void acceptTupleAction(int taskId, TupleImpl tuple) {
         try {
             if (taskId != AddressedTuple.BROADCAST_DEST) {
                 tupleActionFn(taskId, tuple);
-            } else if (isSpout && 
streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
-                //taskId is irrelevant here. Ensures pending.rotate() is 
called once per tick.
-                tupleActionFn(taskIds.get(0), tuple);
-
             } else {
                 for (Integer t : taskIds) {
                     tupleActionFn(t, tuple);
diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java 
b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 25c84474a..734fca2a2 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -80,7 +80,7 @@ public class SpoutExecutor extends Executor {
         this.emittedCount = new MutableLong(0);
         this.emptyEmitStreak = new MutableLong(0);
         this.stats = new SpoutExecutorStats(
-            ConfigUtils.samplingRate(this.getTopoConf()), 
ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+                ConfigUtils.samplingRate(this.getTopoConf()), 
ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
         this.skippedMaxSpoutMs = 
workerData.getMetricRegistry().rateCounter("__skipped-max-spout-ms", 
componentId,
                 taskIds.get(0));
         this.skippedInactiveMs = 
workerData.getMetricRegistry().rateCounter("__skipped-inactive-ms", componentId,
@@ -131,8 +131,8 @@ public class SpoutExecutor extends Executor {
             }
             ISpout spoutObject = (ISpout) taskData.getTaskObject();
             spoutOutputCollector = new SpoutOutputCollectorImpl(
-                spoutObject, this, taskData, emittedCount,
-                hasAckers, rand, hasEventLoggers, isDebug, pending);
+                    spoutObject, this, taskData, emittedCount,
+                    hasAckers, rand, hasEventLoggers, isDebug, pending);
             SpoutOutputCollector outputCollector = new 
SpoutOutputCollector(spoutOutputCollector);
             this.outputCollectors.add(outputCollector);
 
@@ -283,6 +283,29 @@ public class SpoutExecutor extends Executor {
         skippedInactiveMs.inc(Time.currentTimeMillis() - start);
     }
 
+    @Override
+    protected void acceptTupleAction(int taskId, TupleImpl tuple) {
+
+        String streamId = tuple.getSourceStreamId();
+
+        try {
+            if (taskId != AddressedTuple.BROADCAST_DEST) {
+                tupleActionFn(taskId, tuple);
+            } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+                //taskId is irrelevant here. Ensures pending.rotate() is 
called once per tick.
+                tupleActionFn(taskIds.get(0), tuple);
+
+            } else {
+                for (Integer t : taskIds) {
+                    tupleActionFn(t, tuple);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
     @Override
     public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
         String streamId = tuple.getSourceStreamId();
@@ -370,6 +393,6 @@ public class SpoutExecutor extends Executor {
 
     public long getThreadId() {
         return threadId;
-    }   
-    
+    }
+
 }
diff --git 
a/storm-client/test/jvm/org/apache/storm/executor/SpoutExecutorTest.java 
b/storm-client/test/jvm/org/apache/storm/executor/SpoutExecutorTest.java
new file mode 100644
index 000000000..a44597cde
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/executor/SpoutExecutorTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.executor;
+
+import org.apache.storm.Constants;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.metrics2.RateCounter;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.commons.util.ReflectionUtils;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+public class SpoutExecutorTest {
+
+
+    @Test
+    public void testPendingTuplesRotateShouldBeCalledOnlyOnce() throws 
Exception {
+
+        RateCounter rateCounter = Mockito.mock(RateCounter.class);
+
+        StormMetricRegistry stormMetricRegistry = 
Mockito.mock(StormMetricRegistry.class);
+        
Mockito.when(stormMetricRegistry.rateCounter(anyString(),anyString(),anyInt())).thenReturn(rateCounter);
+
+        Map<String,Object> hashmap = Utils.readDefaultConfig();
+
+        IStateStorage stateStorage = Mockito.mock(IStateStorage.class);
+
+        ComponentCommon componentCommon = Mockito.mock(ComponentCommon.class);
+        Mockito.when(componentCommon.get_json_conf()).thenReturn(null);
+
+        WorkerTopologyContext workerTopologyContext = 
Mockito.mock(WorkerTopologyContext.class);
+        
Mockito.when(workerTopologyContext.getComponentId(anyInt())).thenReturn("1");
+        
Mockito.when(workerTopologyContext.getComponentCommon(anyString())).thenReturn(componentCommon);
+
+        WorkerState workerState = Mockito.mock(WorkerState.class);
+        
Mockito.when(workerState.getWorkerTopologyContext()).thenReturn(workerTopologyContext);
+        Mockito.when(workerState.getStateStorage()).thenReturn(stateStorage);
+        Mockito.when(workerState.getTopologyConf()).thenReturn(hashmap);
+        
Mockito.when(workerState.getMetricRegistry()).thenReturn(stormMetricRegistry);
+
+        SpoutExecutor spoutExecutor = new 
SpoutExecutor(workerState,List.of(1L,5L),new HashMap<>());
+
+        TupleImpl tuple =   Mockito.mock(TupleImpl.class);
+        
Mockito.when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+        AddressedTuple addressedTuple =   Mockito.mock(AddressedTuple.class);
+        
Mockito.when(addressedTuple.getDest()).thenReturn(AddressedTuple.BROADCAST_DEST);
+        Mockito.when(addressedTuple.getTuple()).thenReturn(tuple);
+
+        RotatingMap rotatingMap = Mockito.mock(RotatingMap.class);
+        Field fieldRotatingMap = ReflectionUtils
+                .findFields(SpoutExecutor.class, f -> 
f.getName().equals("pending"),
+                        ReflectionUtils.HierarchyTraversalMode.TOP_DOWN)
+                .get(0);
+        fieldRotatingMap.setAccessible(true);
+        fieldRotatingMap.set(spoutExecutor, rotatingMap);
+
+        spoutExecutor.accept(addressedTuple);
+
+        Mockito.verify(rotatingMap,Mockito.times(1)).rotate();
+    }
+}

Reply via email to