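Hey Иван,

Use *TumblingProcessingTimeWindows* instead of TumblingEventTimeWindows. An event-time window fires only once a watermark passes the end of the window. Both of your sources are created with WatermarkStrategy.noWatermarks(), so no watermark is ever emitted, the window never fires, and the join produces no output. If you need event-time semantics, keep TumblingEventTimeWindows and give each source a watermark strategy with a timestamp assigner instead, for example WatermarkStrategy.forBoundedOutOfOrderness(...) combined with withTimestampAssigner(...).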
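To illustrate why the window stays silent, here is a toy model in plain Java. It is not Flink code: the class EventTimeWindowSketch and its methods are made up for this sketch and only mimic the rule that an event-time window fires once the watermark reaches the last timestamp belonging to the window.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single tumbling event-time window. This is NOT Flink code;
// the class and its methods are hypothetical and only mimic the firing rule.
class EventTimeWindowSketch {
    private final long start;   // inclusive window start (event time, ms)
    private final long end;     // exclusive window end (event time, ms)
    private final List<Double> buffer = new ArrayList<>();
    private long watermark = Long.MIN_VALUE; // no watermark received yet
    private boolean fired = false;

    EventTimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Buffer a value if its event timestamp falls inside [start, end).
    void add(long timestamp, double value) {
        if (timestamp >= start && timestamp < end) {
            buffer.add(value);
        }
    }

    // Advance the watermark. The window fires once, when the watermark
    // reaches the last timestamp that belongs to the window (end - 1).
    // Returns the buffered values on firing, otherwise an empty list.
    List<Double> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        if (!fired && watermark >= end - 1) {
            fired = true;
            return new ArrayList<>(buffer);
        }
        return new ArrayList<>();
    }

    boolean hasFired() {
        return fired;
    }

    public static void main(String[] args) {
        // Case 1: elements arrive, but no watermark is ever emitted
        // (the situation with WatermarkStrategy.noWatermarks()).
        EventTimeWindowSketch silent = new EventTimeWindowSketch(0L, 60_000L);
        silent.add(1_000L, 25.2);
        silent.add(2_000L, 28.0);
        System.out.println("fired without watermarks: " + silent.hasFired());

        // Case 2: same elements, then a watermark past the window end.
        EventTimeWindowSketch firing = new EventTimeWindowSketch(0L, 60_000L);
        firing.add(1_000L, 25.2);
        firing.add(2_000L, 28.0);
        List<Double> emitted = firing.advanceWatermark(60_000L);
        System.out.println("emitted after watermark: " + emitted);
    }
}
```

In the first case the elements just sit in the buffer, which is what happens to your join. A processing-time window does not depend on watermarks at all, because it fires off the machine's wall clock.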
*Ref*
https://stackoverflow.com/questions/72291659/flink-tumbling-window-is-not-triggered-no-watermark-strategy

*Regards*
Shrihari

On Fri, Jun 30, 2023 at 9:16 AM Иван Борисов <ivan.s.bori...@gmail.com> wrote:

> Hello,
> plz help me, I can't join two streams. In the joined stream I've got
> zero messages and can't understand why?
>
> Kafka Topics:
> 1st stream
> topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
> 123123131}, 'compare_with': 'T2'}
> 2nd stream
> topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
> 53543543}, 'compare_with': 'T1'}
>
>
> DataStream<Sensor>T1_Stream = env.fromSource(
>         T1_Source,
>         WatermarkStrategy.noWatermarks(),
>         "T1 Stream");
>
> DataStream<Sensor> T2_Stream = env.fromSource(
>         T2_Source,
>         WatermarkStrategy.noWatermarks(),
>         "T2 Stream");
>
> DataStream<Double> comparisonStream = T1_Stream
>         .join(T2_Stream)
>         .where(T1 -> T1.getCompare_with())
>         .equalTo(T2 -> T2.getSensor_Name())
>         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>         .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
>             double firstValue = T1.getTemp();
>             double secondValue = T2.getTemp();
>             double m = firstValue-secondValue;
>             return m;
>         });
> comparisonStream.writeAsText("/tmp/output_k.txt",
>         org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
>
> And my file is empty!
> What am I do wrong?
>
> --
> Yours truly, Ivan Borisov | С уважением, Иван Борисов
> mob./WhatsApp: 7 913 088 8882
> Telegram: @Ivan_S_Borisov
> Skype: ivan.s.borisov
> e-mail: ivan.s.bori...@gmail.com