I found TestStormRunner::UnionForAlert fail with type incompatibility, I think 
that is because Hao has refactored processing layer, but we should fix those 
unit test cases.


object UnionForAlert extends App{
  val config : Config = ConfigFactory.load;
  val env = StormExecutionEnvironment(config)
  val tail1 = 
env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a 
=> ("key1",a))
  val tail2 = 
env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a 
=> ("key2",a))
  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = 
false)
  env.execute()
}

2015-12-28 18:19:17,940 ERROR [Thread-8-MapperProducer_9] 
storm.AbstractStreamBolt[98]: Got exception when processing source: 
WordPrependForAlertExecutor(test)_2:6, stream: default, id: {}, [key1, 
{word=test_abc}]
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.eagle.datastream.Tuple2
at 
org.apache.eagle.datastream.UnionForAlert$$anonfun$1.apply(TestStormRunner.scala:34)
at 
org.apache.eagle.datastream.core.StreamAlertExpansion$$anonfun$1.apply(StreamAlertExpansion.scala:159)
at 
org.apache.eagle.datastream.storm.MapBoltWrapper.onValues(MapBoltWrapper.scala:56)
at 
org.apache.eagle.datastream.storm.AbstractStreamBolt.execute(AbstractStreamBolt.scala:93)
at 
backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
at 
backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
at 
backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at 
backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

thanks
Edward

Reply via email to