Re: [BUG]UnionForAlert test case fail

2016-01-14 Thread Zhang, Edward (GDI Hadoop)
Hao, is this because of your new code on dsl? why do we keep eagle.Tuple2
if we already use scala.Tuple2?

Thanks
Edward

On 12/28/15, 18:26, "Zhang, Edward (GDI Hadoop)"  wrote:

>I found TestStormRunner::UnionForAlert fail with type incompatibility, I
>think that is because Hao has refactored processing layer, but we should
>fix those unit test cases.
>
>
>object UnionForAlert extends App{
>  val config : Config = ConfigFactory.load;
>  val env = StormExecutionEnvironment(config)
>  val tail1 = 
>env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).ma
>p2(a => ("key1",a))
>  val tail2 = 
>env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map
>2(a => ("key2",a))
>  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume
>= false)
>  env.execute()
>}
>
>2015-12-28 18:19:17,940 ERROR [Thread-8-MapperProducer_9]
>storm.AbstractStreamBolt[98]: Got exception when processing source:
>WordPrependForAlertExecutor(test)_2:6, stream: default, id: {}, [key1,
>{word=test_abc}]
>java.lang.ClassCastException: scala.Tuple2 cannot be cast to
>org.apache.eagle.datastream.Tuple2
>at 
>org.apache.eagle.datastream.UnionForAlert$$anonfun$1.apply(TestStormRunner
>.scala:34)
>at 
>org.apache.eagle.datastream.core.StreamAlertExpansion$$anonfun$1.apply(Str
>eamAlertExpansion.scala:159)
>at 
>org.apache.eagle.datastream.storm.MapBoltWrapper.onValues(MapBoltWrapper.s
>cala:56)
>at 
>org.apache.eagle.datastream.storm.AbstractStreamBolt.execute(AbstractStrea
>mBolt.scala:93)
>at 
>backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(execu
>tor.clj:633)
>at 
>backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.c
>lj:401)
>at 
>backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj
>:58)
>at 
>backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.ja
>va:125)
>at 
>backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQue
>ue.java:99)
>at 
>backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj
>:80)
>at 
>backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.
>clj:748)
>at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>at clojure.lang.AFn.run(AFn.java:24)
>at java.lang.Thread.run(Thread.java:745)
>
>thanks
>Edward



[BUG]UnionForAlert test case fail

2015-12-28 Thread Zhang, Edward (GDI Hadoop)
I found TestStormRunner::UnionForAlert fail with type incompatibility, I think 
that is because Hao has refactored processing layer, but we should fix those 
unit test cases.


object UnionForAlert extends App{
  val config : Config = ConfigFactory.load;
  val env = StormExecutionEnvironment(config)
  val tail1 = 
env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a 
=> ("key1",a))
  val tail2 = 
env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a 
=> ("key2",a))
  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = 
false)
  env.execute()
}

2015-12-28 18:19:17,940 ERROR [Thread-8-MapperProducer_9] 
storm.AbstractStreamBolt[98]: Got exception when processing source: 
WordPrependForAlertExecutor(test)_2:6, stream: default, id: {}, [key1, 
{word=test_abc}]
java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
org.apache.eagle.datastream.Tuple2
at 
org.apache.eagle.datastream.UnionForAlert$$anonfun$1.apply(TestStormRunner.scala:34)
at 
org.apache.eagle.datastream.core.StreamAlertExpansion$$anonfun$1.apply(StreamAlertExpansion.scala:159)
at 
org.apache.eagle.datastream.storm.MapBoltWrapper.onValues(MapBoltWrapper.scala:56)
at 
org.apache.eagle.datastream.storm.AbstractStreamBolt.execute(AbstractStreamBolt.scala:93)
at 
backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
at 
backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
at 
backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at 
backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

thanks
Edward