Thank you for your answer!

It works now. One more question:

I have several streams from several Kafka topics (if there is an easier
way, I could merge them into one topic or make any other changes)
carrying sensor measurements as JSON messages:

topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
123123131}, 'compare_with': 'T2'}
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
53543543}, 'compare_with': 'T1'}
topic3: {'data': {'temp':32, 'sensore_name': 'T3', 'timestamp':
6757575}, 'compare_with': 'T2'}
topic4: {'data': {'temp':12, 'sensore_name': 'T3', 'timestamp':
67856222}, 'compare_with': 'T1'}

I need to compute T1.data.temp - T2.data.temp. Each measurement must be
compared with EXACTLY the latest measurement of the other sensor (named
in compare_with), because the sensors report at different rates: T1
sends 1 message per second, T2 1 message per 5 seconds, T3 3 messages
per second. Then I need to calculate the average (AVG) of this
difference over a 1-hour window and, if the current difference is
greater than AVG, raise an alarm somewhere. I don't understand how to
do this.
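
To make the requirement concrete, here is a minimal plain-Java sketch of
the logic, with no Flink API involved. The class and method names
(LatestValueComparator, onMeasurement) are hypothetical. It assumes that
"more than AVG" means the current difference exceeds the average of the
differences already seen in the same one-hour tumbling window, and that
the window is derived from the message timestamp in milliseconds. In a
Flink job this state would presumably live in keyed state; that part is
not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

// Hypothetical helper, not a Flink class: remembers the latest temperature
// per sensor and a running average of differences per one-hour window.
class LatestValueComparator {
    static final long WINDOW_MS = 60 * 60 * 1000L;

    // Running sum and count of differences for one tumbling window.
    private static final class WindowStats {
        long windowStart;
        double sum;
        long count;
    }

    private final Map<String, Double> latestTemp = new HashMap<>();
    private final Map<String, WindowStats> stats = new HashMap<>();

    /**
     * Processes one measurement. Returns the difference when it exceeds the
     * average of earlier differences in the current window, else empty.
     */
    OptionalDouble onMeasurement(String sensor, String compareWith,
                                 double temp, long timestampMs) {
        latestTemp.put(sensor, temp);
        Double other = latestTemp.get(compareWith);
        if (other == null) {
            return OptionalDouble.empty(); // nothing to compare with yet
        }
        double diff = temp - other;

        // Assumption: statistics are tracked per (sensor, compare_with) pair.
        String key = sensor + "->" + compareWith;
        long windowStart = timestampMs - (timestampMs % WINDOW_MS);
        WindowStats s = stats.get(key);
        if (s == null || s.windowStart != windowStart) {
            s = new WindowStats(); // first message, or a new hour started
            s.windowStart = windowStart;
            stats.put(key, s);
        }

        // Compare against the average of the earlier differences only.
        boolean alarm = s.count > 0 && diff > s.sum / s.count;
        s.sum += diff;
        s.count++;
        return alarm ? OptionalDouble.of(diff) : OptionalDouble.empty();
    }
}
```

Calling onMeasurement("T1", "T2", temp, timestamp) for every incoming
message, whatever topic it came from, always compares against the most
recent T2 value, so the different message rates do not matter.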

This is what I have so far:

DataStream<MergedSensors> comparisonStream = T1_Stream
    .join(T2_Stream)
    .where(T1 -> T1.getArbitraged_with())
    .equalTo(T2 -> T2.getTicker_symbol())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply((JoinFunction<Sensor, Sensor, MergedSensors>) (T1, T2) -> {
        Tuple2<Double, Double> spread;
        if (T1.getData().getTemp().isEmpty() || T2.getData().getTemp().isEmpty()) {
            spread = new Tuple2<>(1.23, 4.56);
        } else {
            double a = T1.getData().getTemp();
            double b = T2.getData().getTemp();
            //return 33.22;
            spread = new Tuple2<>(a, b);
        }
        return new MergedSensors(T1.getTimestamp(), T1.getMs_timestamp(), spread);
    });

Fri, 30 Jun 2023 at 12:40, Schwalbe Matthias <matthias.schwa...@viseca.ch>:
>
> Hi Ivan,
>
> The source of your problem is quite simple:
> - If you window by event time, all the sources need to emit watermarks.
> - Watermarks are the logical clock used for event-time processing.
> - You could either use processing-time windows or adjust the watermark
>   strategy of your sources accordingly.
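
As I understand this point, an event-time window only emits its result
once a watermark passes the end of the window, so with
WatermarkStrategy.noWatermarks() nothing ever fires. The toy model below
is plain Java, not Flink code, and WatermarkWindowDemo is a hypothetical
name. It only illustrates the firing rule; the "window end minus 1 ms"
condition mirrors my understanding of Flink's event-time trigger and
should be treated as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (not Flink code): tumbling event-time windows that emit their
// contents only when a watermark reaches the end of the window.
class WatermarkWindowDemo {
    private final long windowSizeMs;
    // window start -> buffered values, ordered by window start
    private final TreeMap<Long, List<Double>> windows = new TreeMap<>();

    WatermarkWindowDemo(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Buffers the value in the window that contains its timestamp.
    void onEvent(long timestampMs, double value) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // Emits and removes every window whose last millisecond is at or
    // before the watermark. Without watermarks this is never called.
    List<List<Double>> onWatermark(long watermarkMs) {
        List<List<Double>> fired = new ArrayList<>();
        while (!windows.isEmpty()
                && windows.firstKey() + windowSizeMs - 1 <= watermarkMs) {
            fired.add(windows.pollFirstEntry().getValue());
        }
        return fired;
    }

    // Number of windows still waiting for a watermark.
    int bufferedWindows() {
        return windows.size();
    }
}
```

In the real job the alternative to noWatermarks() would be something
like WatermarkStrategy.forBoundedOutOfOrderness(...) with a timestamp
assigner that reads the timestamp field of the message.
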
>
> ... I didn't check your code for other potential sources of trouble
>
> Hope this helps
>
> Thias
>
>
> -----Original Message-----
> From: Иван Борисов <ivan.s.bori...@gmail.com>
> Sent: Freitag, 30. Juni 2023 05:45
> To: user@flink.apache.org
> Subject: Join two streams
>
> Hello,
> please help me, I can't join two streams. The joined stream contains zero
> messages and I can't understand why.
>
> Kafka Topics:
> 1st stream
> topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
> 123123131}, 'compare_with': 'T2'}
> 2nd stream
> topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
> 53543543}, 'compare_with': 'T1'}
>
>
> DataStream<Sensor> T1_Stream = env.fromSource(T1_Source,
>     WatermarkStrategy.noWatermarks(),
>     "T1 Stream");
>
> DataStream<Sensor> T2_Stream = env.fromSource(T2_Source,
>     WatermarkStrategy.noWatermarks(),
>     "T2 Stream");
>
> DataStream<Double> comparisonStream = T1_Stream
>     .join(T2_Stream)
>     .where(T1 -> T1.getCompare_with())
>     .equalTo(T2 -> T2.getSensor_Name())
>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>     .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
>         double firstValue = T1.getTemp();
>         double secondValue = T2.getTemp();
>         double m = firstValue - secondValue;
>         return m;
>     });
>
> comparisonStream.writeAsText("/tmp/output_k.txt",
>     org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
>
> And my file is empty!
> What am I doing wrong?
>
> --
> Yours truly, Ivan Borisov  |  С уважением, Иван Борисов
> mob./WhatsApp: 7 913  088 8882
> Telegram: @Ivan_S_Borisov
> Skype: ivan.s.borisov
> e-mail: ivan.s.bori...@gmail.com



