Hao, is this caused by your new DSL code? Why do we keep eagle.Tuple2
if we already use scala.Tuple2?
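
For reference, here is a minimal sketch of why the cast in the trace below fails. EagleTuple2 is a hypothetical stand-in for org.apache.eagle.datastream.Tuple2 (the real class may look different), and toEagle is only an illustrative helper, not something in the codebase. The two tuple types share a simple name but are unrelated classes, so a value would have to be converted rather than cast:

```scala
import scala.util.Try

// Hypothetical stand-in for org.apache.eagle.datastream.Tuple2 (assumption:
// the real class is not a subtype of scala.Tuple2 and may have other fields).
final case class EagleTuple2[A, B](f0: A, f1: B)

object Tuple2CastDemo {
  // Mirrors the failing path: treat whatever arrives as an EagleTuple2.
  // For a scala.Tuple2 the JVM cast throws ClassCastException, captured in the Try.
  def directCast(value: Any): Try[Any] =
    Try(value.asInstanceOf[EagleTuple2[Any, Any]].f0)

  // Illustrative alternative: convert explicitly instead of casting.
  def toEagle(value: Any): Option[EagleTuple2[Any, Any]] = value match {
    case e: EagleTuple2[_, _] => Some(EagleTuple2[Any, Any](e.f0, e.f1))
    case (k, v)               => Some(EagleTuple2[Any, Any](k, v))
    case _                    => None
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the (key, value) pair built inside map2 in the test.
    val fromMap2: Any = ("key1", "test_abc")
    println(directCast(fromMap2).isFailure) // prints true
    println(toEagle(fromMap2))
  }
}
```

If we keep both types, something like toEagle would be needed wherever the two meet. Dropping one of them would avoid the conversion entirely.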
Thanks
Edward
On 12/28/15, 18:26, "Zhang, Edward (GDI Hadoop)" wrote:
>I found that TestStormRunner::UnionForAlert fails with a type
>incompatibility. I think that is because Hao has refactored the processing
>layer, but we should fix those unit test cases.
>
>
>object UnionForAlert extends App {
>  val config: Config = ConfigFactory.load
>  val env = StormExecutionEnvironment(config)
>  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1", a))
>  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2", a))
>  tail1.streamUnion(List(tail2)).alert(Seq("s1", "s2"), "alert1", consume = false)
>  env.execute()
>}
>
>2015-12-28 18:19:17,940 ERROR [Thread-8-MapperProducer_9] storm.AbstractStreamBolt[98]: Got exception when processing source: WordPrependForAlertExecutor(test)_2:6, stream: default, id: {}, [key1, {word=test_abc}]
>java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.eagle.datastream.Tuple2
>    at org.apache.eagle.datastream.UnionForAlert$$anonfun$1.apply(TestStormRunner.scala:34)
>    at org.apache.eagle.datastream.core.StreamAlertExpansion$$anonfun$1.apply(StreamAlertExpansion.scala:159)
>    at org.apache.eagle.datastream.storm.MapBoltWrapper.onValues(MapBoltWrapper.scala:56)
>    at org.apache.eagle.datastream.storm.AbstractStreamBolt.execute(AbstractStreamBolt.scala:93)
>    at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>    at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>    at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>    at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>    at clojure.lang.AFn.run(AFn.java:24)
>    at java.lang.Thread.run(Thread.java:745)
>
>Thanks
>Edward