Hello, please help me: I can't join two streams. The joined stream contains zero messages, and I can't understand why.
Kafka topics:

1st stream, topic1:

    {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}

2nd stream, topic2:

    {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}

The job:

    DataStream<Sensor> T1_Stream = env.fromSource(
            T1_Source,
            WatermarkStrategy.noWatermarks(),
            "T1 Stream");

    DataStream<Sensor> T2_Stream = env.fromSource(
            T2_Source,
            WatermarkStrategy.noWatermarks(),
            "T2 Stream");

    DataStream<Double> comparisonStream = T1_Stream
            .join(T2_Stream)
            .where(T1 -> T1.getCompare_with())
            .equalTo(T2 -> T2.getSensor_Name())
            .window(TumblingEventTimeWindows.of(Time.seconds(60)))
            .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
                double firstValue = T1.getTemp();
                double secondValue = T2.getTemp();
                double m = firstValue - secondValue;
                return m;
            });

    comparisonStream.writeAsText("/tmp/output_k.txt",
            org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty! What am I doing wrong?

--
Yours truly, Ivan Borisov
mob./WhatsApp: 7 913 088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com
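
One possible cause, offered as an assumption rather than a confirmed diagnosis: the job uses TumblingEventTimeWindows together with WatermarkStrategy.noWatermarks(). An event-time window emits its result only once the watermark passes the end of the window, so if no watermarks are ever generated on an unbounded Kafka source, the windows keep buffering and the join never produces output. The plain-Java sketch below does not use Flink at all. It only imitates that firing rule so the effect is visible. The class EventTimeWindowSketch and its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of a tumbling event-time window.
// It is NOT Flink code; it only mimics the rule "fire when the watermark
// reaches the last timestamp that belongs to the window".
class EventTimeWindowSketch {
    private final long windowSizeMs;
    // window start timestamp -> values buffered for that window
    private final TreeMap<Long, List<Double>> buffers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    EventTimeWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Buffer a value in the window that contains its event timestamp.
    void add(long timestampMs, double temp) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(temp);
    }

    // Advance the watermark and return the contents of every window
    // whose last timestamp (start + size - 1) is now covered by it.
    List<List<Double>> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<List<Double>> fired = new ArrayList<>();
        while (!buffers.isEmpty()
                && buffers.firstKey() + windowSizeMs - 1 <= watermark) {
            fired.add(buffers.pollFirstEntry().getValue());
        }
        return fired;
    }

    // Number of windows that hold data but have not fired yet.
    int pendingWindows() {
        return buffers.size();
    }

    public static void main(String[] args) {
        EventTimeWindowSketch window = new EventTimeWindowSketch(60_000L);
        window.add(1_000L, 25.2);
        window.add(2_000L, 28.0);

        // No watermark has been sent: the data is buffered, nothing fires.
        System.out.println("pending without watermark: " + window.pendingWindows());

        // Once the watermark reaches the end of the window, it fires.
        System.out.println("fired: " + window.advanceWatermark(59_999L));
    }
}
```

If this is indeed the cause, the usual remedy in Flink is to replace noWatermarks() with a strategy such as WatermarkStrategy.forBoundedOutOfOrderness(...) plus a timestamp assigner that reads the event time from the record. Whether the 'timestamp' field of the messages is in milliseconds is also an assumption worth checking.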