Re: Need help with finding inner workings of watermark stream idleness
Thanks, Seth.

Yeah, this looks perfect. I had a feeling I'd need to get deep into things, and there's no time like the present, haha. I may ask for more guidance on those inner workings to get a bit of a road map, but that gets into the feature idea and is beyond the scope of this thread's original question, so I'll do that in a Jira ticket in a bit. I just wanted this so I could structure the ticket and plan of attack better.

Thanks!!

On Tue, Feb 1, 2022, 2:03 PM Seth Wiesman wrote:

> Hi Jeff,
>
> I think the class you're looking for is StatusWatermarkValve. Note that
> this is fairly deep into the runtime stack.
>
> Seth
Re: Need help with finding inner workings of watermark stream idleness
Hi Jeff,

I think the class you're looking for is StatusWatermarkValve. Note that this is fairly deep into the runtime stack.

Seth

On Tue, Feb 1, 2022 at 2:34 PM Jeff Carter wrote:

> I'm still not seeing where there is some idle variable that the
> output.markIdle is setting to true (or whatever it sets). Like the ideal
> thing would be if there is just some "output.isIdle()" that could be
> called to know if the stream is or isn't idle. Since that doesn't exist,
> what is the variable in "output" that dictates if it is idle or not that
> I'd just have to make an isIdle() method to make its state visible to
> other code.
Re: Need help with finding inner workings of watermark stream idleness
Thanks, Till.

That definitely helps a bit. I'm still not seeing where there is some idle variable that output.markIdle is setting to true (or whatever it sets). The ideal thing would be if there were just some "output.isIdle()" that could be called to know whether the stream is or isn't idle. Since that doesn't exist, what is the variable in "output" that dictates whether it is idle or not, so that I'd just have to make an isIdle() method to make its state visible to other code?

I see the checkIfIdle() method in the code you pointed out (in at least the testing piece), but that seems like it's just a way to set a timer and check if the idle state should be set or not. I don't know if that's setting some isIdle variable, or if it's just checked and calculated every time and that method is basically the variable I'm looking for. But that might just be my confusion.

On Tue, Jan 11, 2022, 11:05 AM Till Rohrmann wrote:

> Hi Jeff,
>
> I think this happens in the WatermarksWithIdleness [1].
>
> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java#L73
>
> Cheers,
> Till
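To make the "isIdle()" idea above concrete, here is a minimal sketch of one way it could look: a wrapper that remembers the last idle/active signal it forwarded. Everything in it is an assumption for illustration. OutputSketch is a hypothetical stand-in interface, not Flink's WatermarkOutput, and IdleTrackingOutput and isIdle() do not exist in Flink. The one behavior borrowed from Flink is that emitting a watermark implicitly marks the stream active again.

```java
/** Hypothetical stand-in for a watermark output; NOT Flink's WatermarkOutput. */
interface OutputSketch {
    void emitWatermark(long timestamp);

    void markIdle();
}

/** Hypothetical wrapper that records the last idle/active signal it passed on. */
final class IdleTrackingOutput implements OutputSketch {
    private final OutputSketch delegate;
    private boolean idle; // the "variable" the question asks about, kept here by the wrapper

    IdleTrackingOutput(OutputSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public void emitWatermark(long timestamp) {
        idle = false; // assumption: emitting a watermark ends idleness
        delegate.emitWatermark(timestamp);
    }

    @Override
    public void markIdle() {
        idle = true;
        delegate.markIdle();
    }

    /** Hypothetical accessor; exposes the state recorded above. */
    boolean isIdle() {
        return idle;
    }
}
```

The wrapper only knows about signals that pass through it, so it would have to be installed wherever the output is handed to the watermark generator.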
Re: Need help with finding inner workings of watermark stream idleness
Hi Jeff,

I think this happens in the WatermarksWithIdleness [1].

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java#L73

Cheers,
Till

On Tue, Jan 11, 2022 at 6:05 PM Jeff Carter wrote:

> If I set the idleness to 500 milliseconds, where in the code does it
> actually go "I haven't seen a message in 500 milliseconds. I'm setting
> this stream to idle."?
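As a rough illustration of the kind of check the linked class performs, here is a simplified, self-contained sketch of an idleness timer. It is a model written for this thread, not Flink's code: the class name IdlenessTimerSketch, the millisecond clock passed in as a LongSupplier, and the field names are all assumptions. The idea it shows is that nothing flips an "idle" flag the moment the timeout passes. Each event bumps a counter, and a periodic check compares the counter with its previous value and, if nothing changed, measures how long that has been the case.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified model of an idleness timer; NOT Flink's actual class. */
final class IdlenessTimerSketch {
    private final LongSupplier clockMillis; // injectable clock (an assumption, for testability)
    private final long maxIdleMillis;
    private long counter;                   // bumped on every event
    private long lastCounter;               // counter value seen at the previous check
    private long inactivityStart = -1L;     // -1 means no inactivity period is being timed

    IdlenessTimerSketch(long maxIdleMillis, LongSupplier clockMillis) {
        this.maxIdleMillis = maxIdleMillis;
        this.clockMillis = clockMillis;
    }

    /** Called for every event that arrives. */
    void activity() {
        counter++;
    }

    /** Called periodically; the idle decision is recomputed on each call. */
    boolean checkIfIdle() {
        if (counter != lastCounter) {
            // Events arrived since the last check: not idle, stop timing.
            lastCounter = counter;
            inactivityStart = -1L;
            return false;
        }
        long now = clockMillis.getAsLong();
        if (inactivityStart < 0) {
            // First check without activity: start timing from here.
            inactivityStart = now;
            return false;
        }
        return now - inactivityStart > maxIdleMillis;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        IdlenessTimerSketch timer = new IdlenessTimerSketch(500, () -> now[0]);
        timer.activity();
        System.out.println(timer.checkIfIdle()); // an event was seen since the last check
        now[0] = 100;
        System.out.println(timer.checkIfIdle()); // no new event: timing starts at 100
        now[0] = 700;
        System.out.println(timer.checkIfIdle()); // 600 ms without events, over the 500 ms limit
    }
}
```

In this model the caller of checkIfIdle() decides what to do with the result, for example calling markIdle() on its output when it returns true.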
Need help with finding inner workings of watermark stream idleness
I'm looking into making a feature for Flink related to watermarks and am digging into the inner watermark mechanisms, specifically idleness. I'm familiar with idleness, but digging into the root code I can only get to where idlenessTimeout gets set in WatermarkStrategyWithIdleness.java.

What I'm looking for is the pieces beyond that. If I set the idleness to 500 milliseconds, where in the code does it actually go "I haven't seen a message in 500 milliseconds. I'm setting this stream to idle."?

The reason is that what I'm thinking of would need to be able to see if any streams are marked idle and, if so, react accordingly.

Thanks for any help in advance.