StephanEwen edited a comment on pull request #13920:
URL: https://github.com/apache/flink/pull/13920#issuecomment-723639969


   ## On Volatile Reads and Performance Considerations
   
   I think the `volatile` read assumption is not completely correct. There is a 
common interpretation that this just means "load variable from memory if 
modified by another thread", meaning it's for free if no other thread touches 
it. But it is a bit different, actually: volatile reads in Java insert a "load 
barrier" for the processor. It affects all variables and memory after that 
barrier, to guarantee the happens-before relationship between everything 
before/after that volatile read. In hardware (x86), a load barrier means 
draining the processor's load buffer completely before continuing, which may 
(and frequently will) stall the processor pipeline.
   
   As a concrete example in Flink: If you look at the current mailbox runtime 
in the StreamTask, it in fact does a lot more work before processing a record 
than the previous "run loop", but is still faster, purely because it has fewer 
volatile variable accesses.
   
   Here are two good references:
     - 
https://mechanical-sympathy.blogspot.com/2011/07/memory-barriersfences.html
     - https://www.infoq.com/articles/memory_barriers_jvm_concurrency/
   
   The conclusion here is to use volatile only where you want to establish 
load/store barriers, typically at the end of a block of work.
   It is not a good idea to have any volatile variables accessed/modified in 
the middle of a block of code whose effects should become visible together, 
because each volatile access inserts another barrier and potentially causes 
another stall in the processor pipeline.
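   
   To illustrate the "one barrier at the end of a block of work" idea, here is 
a minimal sketch. The class and field names are made up for illustration and 
are not Flink code, and it assumes exactly one writer thread.

```java
// Hypothetical illustration; these names are not taken from Flink.
final class BlockPublisher {
    // Plain fields: updated by the single worker thread without any barrier.
    private long recordsProcessed;
    private long bytesProcessed;

    // The only volatile field, written once at the end of each block of work.
    private volatile long publishedBlocks;

    /** Called by the single worker thread only. */
    void processBlock(int records, long bytesPerRecord) {
        for (int i = 0; i < records; i++) {
            recordsProcessed++;
            bytesProcessed += bytesPerRecord;
        }
        // One volatile write: all plain writes above happen-before any
        // read that observes this new value. The read-modify-write is safe
        // only because there is a single writer thread.
        publishedBlocks = publishedBlocks + 1;
    }

    /** Called by other threads; returns {blocks, records, bytes}. */
    long[] snapshot() {
        long blocks = publishedBlocks; // volatile read comes first
        // The plain reads below are at least as recent as the block count
        // read above; they may already include part of a later block.
        return new long[] {blocks, recordsProcessed, bytesProcessed};
    }
}
```

   The worker pays for one volatile write per block instead of one per 
counter update, which is the trade-off argued for above.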
   
   In Flink this means the following:
     - We have the load/store barriers already once after every record is 
processed, by the mailbox. That is enough to make sure changes by the 
processing thread during one record's processing are made visible to other 
threads before the next record is processed.
     - We don't need a specific additional happens-before relationship around 
values for metrics, so the metrics do not need an additional volatile access.
   
   There is now also an additional static thread to work with that variable, 
which is one more utility thread. There have been quite a few of those recently 
added to Flink, and I think we should start to be careful about adding those. 
One thread alone doesn't hurt, but it adds up over time. Every thread adds 
memory footprint overhead (more than 1MB, which again does not sound like much, 
but doing this 10s of times makes a difference) and increases overhead through 
context switches (having 10s of additional threads that jump in every second 
will eventually be measurable). So threads should be added if necessary, but 
not if there is another equally good solution.
   
   I think we really should go with the "keep it simple" approach here. No 
volatile on the metric path, no extra threads for "black magic". Simplest 
possible solution, as predictable as possible. I believe it will actually work 
well: the remaining metric code follows that principle and I am not aware of 
any problem.
   If we actually discover problems, we can still look at adding a complex 
magic solution. But I am very hesitant to add that complexity now, without 
really knowing there is a problem that warrants such a solution. Complexity 
should not come before the problem.
   
   ## Event-time Lag Gauge
   
   We can grab the watermark "for free" this way:
     - We don't have to eagerly compute the lag. The best point to compute it 
is directly when the metric is probed, in the `Gauge.getValue()` method. Then 
we compute it exactly as often as it is needed.
     - The computation then happens in the metric reporter or the single metric 
thread that periodically gets all metric values to report them.
     - All watermarks leave the source through the `WatermarkOutput` defined 
here: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java#L101
     - The field `WatermarkToDataOutput.maxWatermarkSoFar` already has the 
value we need. So the `getValue()` method could simply do `Math.max(0L, 
System.currentTimeMillis() - output.getMaxWatermarkSoFar())`.
   
   Not using a volatile field here would still guarantee visibility (because 
there are barriers happening in the mailbox loop). Visibility might just be a 
few microseconds late, because we don't flush and wait for the store buffer in 
the processor immediately on writing the current watermark value.
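   
   A rough sketch of what such a lazily computed gauge could look like. To keep 
it compilable on its own, `Gauge` and `WatermarkTracker` are local stand-ins 
(for Flink's `Gauge` interface and for `WatermarkToDataOutput`), the 
`getMaxWatermarkSoFar()` accessor is the one proposed above rather than an 
existing method, and the injectable clock and the "no watermark yet" guard are 
my own assumptions.

```java
import java.util.function.LongSupplier;

// Stand-in for Flink's Gauge interface so the sketch compiles alone.
interface Gauge<T> {
    T getValue();
}

// Hypothetical stand-in for WatermarkToDataOutput.
final class WatermarkTracker {
    // Plain (non-volatile) field, written only by the task thread.
    private long maxWatermarkSoFar = Long.MIN_VALUE;

    void emitWatermark(long timestamp) {
        if (timestamp > maxWatermarkSoFar) {
            maxWatermarkSoFar = timestamp;
        }
    }

    long getMaxWatermarkSoFar() {
        return maxWatermarkSoFar;
    }
}

// Computes the lag only when the metric is probed, never on the record path.
final class EventTimeLagGauge implements Gauge<Long> {
    private final WatermarkTracker output;
    private final LongSupplier clock; // injectable for tests (assumption)

    EventTimeLagGauge(WatermarkTracker output, LongSupplier clock) {
        this.output = output;
        this.clock = clock;
    }

    @Override
    public Long getValue() {
        long watermark = output.getMaxWatermarkSoFar();
        if (watermark == Long.MIN_VALUE) {
            return 0L; // no watermark emitted yet; reporting 0 is an assumption
        }
        return Math.max(0L, clock.getAsLong() - watermark);
    }
}
```

   In production the clock would be `System::currentTimeMillis`; the metric 
reporter thread calls `getValue()` and the task thread never does any lag 
computation.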

