Hey Иван,

Use *TumblingProcessingTimeWindows* instead of TumblingEventTimeWindows.
TumblingEventTimeWindows requires a watermark strategy.


*Ref*
https://stackoverflow.com/questions/72291659/flink-tumbling-window-is-not-triggered-no-watermark-strategy

*Regards*
Shrihari

On Fri, Jun 30, 2023 at 9:16 AM Иван Борисов <ivan.s.bori...@gmail.com>
wrote:

> Hello,
> plz help me, I can't join two streams. In the joined stream I've got
> zero messages and can't understand why?
>
> Kafka Topics:
> 1st stream
> topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
> 123123131}, 'compare_with': 'T2'}
> 2nd stream
> topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
> 53543543}, 'compare_with': 'T1'}
>
>
> DataStream<Sensor>T1_Stream = env.fromSource(
> T1_Source,
> WatermarkStrategy.noWatermarks(),
> "T1 Stream");
>
> DataStream<Sensor> T2_Stream = env.fromSource(
> T2_Source,
> WatermarkStrategy.noWatermarks(),
> "T2 Stream");
>
> DataStream<Double> comparisonStream = T1_Stream
> .join(T2_Stream)
> .where(T1 -> T1.getCompare_with())
> .equalTo(T2 -> T2.getSensor_Name())
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .apply((JoinFunction<Sensor, Sensor, Double>) (T1, T2) -> {
> double firstValue = T1.getTemp();
> double secondValue = T2.getTemp();
> double m = firstValue-secondValue;
> return m;
> });
> comparisonStream.writeAsText("/tmp/output_k.txt",
> org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
>
> And my file is empty!
> What am I do wrong?
>
> --
> Yours truly, Ivan Borisov  |  С уважением, Иван Борисов
> mob./WhatsApp: 7 913  088 8882
> Telegram: @Ivan_S_Borisov
> Skype: ivan.s.borisov
> e-mail: ivan.s.bori...@gmail.com
>

-- 


THIS EMAIL COMMUNICATION IS PRIVILEGED AND MAY CONTAIN CONFIDENTIAL 
INFORMATION OF RAPIDO. IF YOU ARE NOT THE INTENDED RECIPIENT, YOU ARE 
HEREBY NOTIFIED THAT YOU HAVE RECEIVED THIS MESSAGE IN ERROR AND ANY 
REVIEW, DISSEMINATION, DISTRIBUTION OR COPYING OF THIS MESSAGE IS STRICTLY 
PROHIBITED. PLEASE NOTIFY US IMMEDIATELY BY EMAIL AND DELETE THE MESSAGE 
FROM YOUR SYSTEM.****

NOTHING CONTAINED IN THIS DISCLAIMER SHALL BE 
CONSTRUED IN ANY WAY TO GRANT PERMISSION TO TRANSMIT CONFIDENTIAL 
INFORMATION OR AS A WAIVER OF ANY CONFIDENTIALITY OR PRIVILEGE.****

RAPIDO 
DOES NOT ACCEPT ANY RESPONSIBILITY OR LIABILITY ARISING FROM THE USE OF 
THIS COMMUNICATION. NO REPRESENTATION IS BEING MADE THAT THE INFORMATION 
PRESENTED IS ACCURATE, CURRENT OR COMPLETE AND SUCH INFORMATION IS AT ALL 
TIMES SUBJECT TO CHANGE WITHOUT NOTICE

Reply via email to