Hi

I am trying to port my spark application in flink.

In spark i have used below command to join multiple stream :
  
   val stream=stream1.join(stream2).join(stream3).join(stream4)


As per my understanding flink required window operation because  flink don't
works on RDD like spark.

so i tried below code to port my spark code in flink . but i don't know its
a right approach or right way to implement  join between multiple stream .

 val stream_join_1 =
stream1.join(stream2).where(_.getField(0).toString()).equalTo(_.getField(0).toString()).window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).apply{
(l, r)  =>
              
(l.getField(0).toString(),(l.getField(1).toString(),r.getField(1).toString()))
    }

   val stream_join_2 =
stream3.join(stream4).where(_.getField(0).toString()).equalTo(_.getField(0).toString()).window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).apply{
(l, r)  =>
               
(l.getField(0).toString(),(l.getField(1).toString(),r.getField(1).toString()))
    }

   val stream =
stream_join_1.join(stream_join_2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).apply{
(l, r)  =>
                (l._1,(((l._2._1,l._2._2),r._2._1),r._2._2))
   }


please help me to find out right approach .


Regards
Prateek 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-perform-multiple-stream-join-functionality-tp7184.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to