I found TestStormRunner::UnionForAlert fail with type incompatibility, I think that is because Hao has refactored processing layer, but we should fix those unit test cases.
object UnionForAlert extends App{ val config : Config = ConfigFactory.load; val env = StormExecutionEnvironment(config) val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a)) tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false) env.execute() } 2015-12-28 18:19:17,940 ERROR [Thread-8-MapperProducer_9] storm.AbstractStreamBolt[98]: Got exception when processing source: WordPrependForAlertExecutor(test)_2:6, stream: default, id: {}, [key1, {word=test_abc}] java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.eagle.datastream.Tuple2 at org.apache.eagle.datastream.UnionForAlert$$anonfun$1.apply(TestStormRunner.scala:34) at org.apache.eagle.datastream.core.StreamAlertExpansion$$anonfun$1.apply(StreamAlertExpansion.scala:159) at org.apache.eagle.datastream.storm.MapBoltWrapper.onValues(MapBoltWrapper.scala:56) at org.apache.eagle.datastream.storm.AbstractStreamBolt.execute(AbstractStreamBolt.scala:93) at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) thanks Edward