Hi,

I have the following code:
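import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Stream 1: lines such as "a,1" from a socket, parsed into (key, value).
// hostName and port are defined elsewhere.
val text = env.socketTextStream(hostName, port)
val words1 = text.map { x =>
  val res = x.split(",")
  (res(0), res(1))
}

// Stream 2: a fixed set of (key, value) pairs
val words2 = env.fromElements(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))

// Co-group both streams by key in one global window that fires on every element
val joinedStream = words1
  .coGroup(words2)
  .where(_._1)
  .equalTo(_._1)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(1))

joinedStream.apply(new InnerJoinFunction).print()
env.execute()

class InnerJoinFunction
    extends CoGroupFunction[(String, String), (String, String), (String, String)] {
  override def coGroup(T1: java.lang.Iterable[(String, String)],
                       T2: java.lang.Iterable[(String, String)],
                       out: Collector[(String, String)]): Unit = {
    println("****************************")
    println("T1=" + T1 + "T2=" + T2)
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList
    if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
      // Only the most recent element of T1 is joined with every element of T2
      val transaction = scalaT1.last
      println("T2 last=" + transaction) // note: this prints the last element of T1
      for (snapshot <- scalaT2) {
        out.collect((transaction._1, transaction._2 + snapshot._2))
      }
    }
  }
}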
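The join logic inside coGroup does not depend on Flink, so it can be checked on plain Scala collections. The sketch below uses a hypothetical helper, joinLatest (not part of the original code), that mirrors the body of InnerJoinFunction: take the last element of the first group and concatenate its value with each value of the second group.

```scala
// Hypothetical helper: mirrors InnerJoinFunction.coGroup on plain Scala lists.
object JoinLogic {
  // Emits nothing unless both groups are non-empty, like the original if-condition.
  def joinLatest(t1: List[(String, String)],
                 t2: List[(String, String)]): List[(String, String)] =
    if (t1.nonEmpty && t2.nonEmpty) {
      val transaction = t1.last // most recent element of the first group
      t2.map(snapshot => (transaction._1, transaction._2 + snapshot._2))
    } else Nil
}
```

For example, joinLatest(List(("a","1"), ("a","2")), List(("a","w2"), ("a","w1"))) yields the same pairs as the second firing shown below.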
--------------------------------
The code has no problem and runs. This is the output after entering
"a,1" and then "a,2":
****************************
T1=[(a,1)]T2=[(a,w2), (a,w1)]
T2 last=(a,1)
2> (a,1w2)
2> (a,1w1)
****************************
T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
T2 last=(a,2)
2> (a,2w2)
2> (a,2w1)
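The growth of T1 seems to follow from the window setup: GlobalWindows puts every element for a key into one window that never ends, and CountTrigger.of(1) fires on every element without purging, so each firing sees everything buffered so far. The plain-Scala model below is not Flink code (GlobalWindowModel and its methods are hypothetical names); it only mimics which elements each firing would see, assuming the two T2 elements for key "a" arrived before the socket input. Element order within T2 may differ from what Flink prints.

```scala
import scala.collection.mutable

// Plain-Scala model (not Flink) of a keyed GlobalWindow with a non-purging CountTrigger.of(1).
class GlobalWindowModel {
  type Rec = (String, String)

  // Per key: every element from input 1 and input 2, in arrival order.
  private val left  = mutable.Map.empty[String, Vector[Rec]]
  private val right = mutable.Map.empty[String, Vector[Rec]]

  // Adds an element from input 1 and returns what the coGroup function sees as (T1, T2).
  def addLeft(e: Rec): (Vector[Rec], Vector[Rec]) = {
    left(e._1) = left.getOrElse(e._1, Vector.empty[Rec]) :+ e // nothing is ever removed
    fire(e._1)
  }

  // Adds an element from input 2 and returns what the coGroup function sees as (T1, T2).
  def addRight(e: Rec): (Vector[Rec], Vector[Rec]) = {
    right(e._1) = right.getOrElse(e._1, Vector.empty[Rec]) :+ e
    fire(e._1)
  }

  // Fires on every element; the window keeps its full contents afterwards.
  private def fire(key: String): (Vector[Rec], Vector[Rec]) =
    (left.getOrElse(key, Vector.empty[Rec]), right.getOrElse(key, Vector.empty[Rec]))
}
```

Feeding ("a","w1"), ("a","w2"), then ("a","1") and ("a","2") reproduces the pattern above: T1 grows to [(a,1), (a,2)] while T2 stays the same.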
--------------------------------------------------
T1 keeps growing while T2 stays the same. I am worried that, with a lot of
input, T1 will eventually run out of storage.
So I want to write my own version of "GlobalWindows.create()" so that T1
stores only the most recent element from the input (or from Kafka) and
its history is cleared: after entering "a,1", T1 is [(a,1)]; after
entering "a,2", T1 should be [(a,2)], not [(a,1), (a,2)]. T2 should stay
unchanged.
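The behaviour described above (T1 keeps only its latest element per key, T2 keeps everything) can be expressed as keyed state rather than as a window. The plain-Scala sketch below is an assumption about the intended semantics, not Flink code; LatestJoinModel is a hypothetical name. In Flink, a similar shape could presumably be built with connect(...).keyBy(...) and a CoProcessFunction holding a ValueState for the latest T1 element and a ListState for T2. That swaps the custom window for keyed state, and whether it fits depends on the rest of the job.

```scala
import scala.collection.mutable

// Plain-Scala model (not Flink) of the desired semantics:
// per key, input 1 keeps only its latest element, input 2 keeps every element.
class LatestJoinModel {
  type Rec = (String, String)

  private val latestLeft = mutable.Map.empty[String, Rec]         // plays the role of a ValueState
  private val allRight   = mutable.Map.empty[String, Vector[Rec]] // plays the role of a ListState

  // New element from input 1: overwrite the previous one, then join with all of input 2.
  def addLeft(e: Rec): Vector[Rec] = {
    latestLeft(e._1) = e
    join(e._1)
  }

  // New element from input 2: append it, then join with the latest input-1 element, if any.
  def addRight(e: Rec): Vector[Rec] = {
    allRight(e._1) = allRight.getOrElse(e._1, Vector.empty[Rec]) :+ e
    join(e._1)
  }

  // What is currently stored for input 1 under this key.
  def leftState(key: String): Option[Rec] = latestLeft.get(key)

  private def join(key: String): Vector[Rec] =
    latestLeft.get(key) match {
      case Some((k, v)) => allRight.getOrElse(key, Vector.empty[Rec]).map(s => (k, v + s._2))
      case None         => Vector.empty[Rec]
    }
}
```

With this model, entering "a,1" and then "a,2" leaves only (a,2) stored for input 1, while both w1 and w2 remain for input 2.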
I rewrote "GlobalWindows", but it did not work. Reading the source, I
found that I also have to rewrite "GlobalWindow" and modify its nested
"class Serializer extends TypeSerializer<MyGlobalWindow>". But when I run
the job, execution never reaches that serializer. Why? Can someone tell me?