Hello All,

I am trying to write JUnit tests for my Storm topologies, but I am getting the error below.

java.lang.AssertionError: Test timed out (10000ms) (not (every? exhausted? (spout-objects spouts)))
    at org.apache.storm.testing$complete_topology.doInvoke(testing.clj:529)
    at clojure.lang.RestFn.invoke(RestFn.java:1124)
    at org.apache.storm.testing4j$_completeTopology.invoke(testing4j.clj:63)
    at org.apache.storm.Testing.completeTopology(Unknown Source)
    at XXXXXXXX.LocationSummaryTopologyUnitTest.lambda$0(LocationSummaryTopologyUnitTest.java:124)
    at org.apache.storm.testing4j$_withTrackedCluster$fn__10274.invoke(testing4j.clj:120)
    at org.apache.storm.testing4j$_withTrackedCluster.invoke(testing4j.clj:120)
    at org.apache.storm.Testing.withTrackedCluster(Unknown Source)
    at XXXXXXXX.LocationSummaryTopologyUnitTest.testLocationSummaryTopology(LocationSummaryTopologyUnitTest.java:103)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

Here is my code:

@Test
public void testLocationSummaryTopology()
{
    TagMessage tagMessage;
    List<TagSummary> actualTagSummaryList;
    List<TagSummary> expectedTagSummaryList;
    List<ZoneStatus> expectedZoneStatusList;
    List<DeptRecountStatus> expectedDeptRecountStatusList;

    try
    {
        if((tagMessageList != null) && !tagMessageList.isEmpty())
        {
            MkClusterParam clusterParam = new MkClusterParam();
            clusterParam.setSupervisors(1);
            stormConfig.put(Config.STORM_LOCAL_MODE_ZMQ, false);
            clusterParam.setDaemonConf(stormConfig);

            Testing.withTrackedCluster(clusterParam, cluster ->
            {
                TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("TagSummaryFakeSpout", new FeederSpout());
                topologyBuilder.setBolt("TagSummaryBolt", new TagSummaryBolt())
                    .shuffleGrouping("TagSummaryFakeSpout");
                StormTopology topology = topologyBuilder.createTopology();

                MockedSources mockedSources = new MockedSources();

                for(TagMessage tagMessageUncompressed : tagMessageList)
                {
                    mockedSources.addMockData("TagSummaryFakeSpout",
                        new Values(CompressionUtils.compressAndBase64Encode(tagMessageUncompressed)));
                }

                Config conf = new Config();
                conf.setNumWorkers(2);

                CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
                completeTopologyParam.setMockedSources(mockedSources);
                completeTopologyParam.setStormConf(conf);

                Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);

                //assertEquals(tagMessageList.size(), Testing.readTuples(result, "TagSummaryFakeSpout").size());
            });

            tagMessage = tagMessageList.get(Constants.ZERO);
            actualTagSummaryList = tagMessage.getTagSummaries();
            expectedTagSummaryList = repo.getTagSummary(auditTask.getId(),
                actualTagSummaryList.stream().map(TagSummary::getTagId).collect(Collectors.toList()));
            expectedZoneStatusList = repo.getZoneStatus(auditTask.getId(),
                actualTagSummaryList.stream().map(TagSummary::getZoneId).collect(Collectors.toList()));
            expectedDeptRecountStatusList = repo.getDeptRecountStatus(auditTask.getId());

            assertEquals(expectedTagSummaryList.size(), actualTagSummaryList.size());
            assertEquals(expectedTagSummaryList.get(Constants.ZERO).getEventId().trim(),
                actualTagSummaryList.get(Constants.ZERO).getEventId().trim());
            assertTrue(expectedZoneStatusList.size() > Constants.ZERO);
            assertTrue(expectedDeptRecountStatusList.size() > Constants.ZERO);
        }
    }

I am acking the messages in the bolts, but I still get the timeout error.

Could someone explain what the error means and how to resolve it?

Thanks,
Harish.
