Hello,
plz help me, I can't join two streams. In the joined stream I've got
zero messages and can't understand why?

Kafka Topics:
1st stream
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
123123131}, 'compare_with': 'T2'}
2nd stream
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
53543543}, 'compare_with': 'T1'}


DataStream<Sensor>T1_Stream = env.fromSource(
T1_Source,
WatermarkStrategy.noWatermarks(),
"T1 Stream");

DataStream<Sensor> T2_Stream = env.fromSource(
T2_Source,
WatermarkStrategy.noWatermarks(),
"T2 Stream");

DataStream<Double> comparisonStream = T1_Stream
.join(T2_Stream)
.where(T1 -> T1.getCompare_with())
.equalTo(T2 -> T2.getSensor_Name())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
double firstValue = T1.getTemp();
double secondValue = T2.getTemp();
double m = firstValue-secondValue;
return m;
});
comparisonStream.writeAsText("/tmp/output_k.txt",
org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty!
What am I do wrong?

-- 
Yours truly, Ivan Borisov  |  С уважением, Иван Борисов
mob./WhatsApp: 7 913  088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com

Reply via email to