leventov commented on a change in pull request #7038: Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose URL: https://github.com/apache/incubator-druid/pull/7038#discussion_r258198153
########## File path: server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java ########## @@ -165,59 +168,141 @@ public int getBufferSize() @JsonProperty public long getMaxIdleTime() { - return maxIdleTime; + return maxIdleTimeMillis; } + /** + * Apart from adhering to {@link Firehose} contract regarding concurrency, this class has two methods that might be + * called concurrently with any other methods and each other, from arbitrary number of threads: {@link #addAll} and + * {@link #shutdown}. + * + * This class creates and manages one thread for calling {@link #close()} asynchronously in response to a {@link + * #shutdown} request, or after this Firehose has been idle (no calls to {@link #addAll}) for {@link + * #maxIdleTimeMillis}. + */ + @VisibleForTesting public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric { - private final ScheduledExecutorService exec; - private final ExecutorService idleDetector; - private final BlockingQueue<InputRow> buffer; - private final InputRowParser<Map<String, Object>> parser; + /** + * This field needs to be volatile because it's intialized via double-checked locking in {@link #shutdown}. See + * https://github.com/apache/incubator-druid/pull/6662#discussion_r254161160. + */ + private volatile @Nullable Thread delayedCloseExecutor; - private final Object readLock = new Object(); + /** Contains {@link InputRow} objects, the last one is {@link #FIREHOSE_CLOSED} which is a "poison pill". */ + private final BlockingQueue<Object> buffer; + private final InputRowParser<Map<String, Object>> parser; - private volatile InputRow nextRow = null; + /** This field needs to be volatile to ensure progress in {@link #addRows} method where it is read in a loop. */ private volatile boolean closed = false; + + /** + * This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and + * {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec. + */ + private InputRow nextRow = null; + private boolean rowsRunOut = false; + private final AtomicLong bytesReceived = new AtomicLong(0); - private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0); + private final AtomicLong lastBufferAddFailLoggingTimeNs = new AtomicLong(System.nanoTime()); private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>(); - private final Stopwatch idleWatch = Stopwatch.createUnstarted(); - public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) + /** + * This field and {@link #requestedShutdownTimeNsHolder} use nanoseconds instead of milliseconds not to deal with + * the fact that {@link System#currentTimeMillis()} can "go backward", e. g. due to time correction on the server. + */ + private final AtomicReference<Long> idleCloseTimeNsHolder = new AtomicReference<>(); + private final AtomicReference<Long> requestedShutdownTimeNsHolder = new AtomicReference<>(); + + EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) { this.buffer = new ArrayBlockingQueue<>(bufferSize); this.parser = parser; - exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d"); - idleDetector = Execs.singleThreaded("event-receiver-firehose-idle-detector-%d"); - idleDetector.submit(() -> { - long idled; - try { - while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < maxIdleTime) { - Thread.sleep(maxIdleTime - idled); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + + if (maxIdleTimeMillis != Long.MAX_VALUE) { + idleCloseTimeNsHolder.set(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis)); + synchronized (this) { + createDelayedCloseExecutor(); } - log.info("Firehose has been idle for %d ms, closing.", idled); - close(); - }); - idleWatch.start(); + } } + @GuardedBy("this") + private Thread createDelayedCloseExecutor() + { + Thread delayedCloseExecutor = new Thread( + () -> { + // The closed = true is visible after close() because there is a happens-before edge between + // delayedCloseExecutor.interrupt() call in close() and catching InterruptedException below in this loop. + while (!closed) { + Long closeTimeNs = null; + Boolean dueToShutdownRequest = null; + Long idleCloseTimeNs = idleCloseTimeNsHolder.get(); + if (idleCloseTimeNs != null) { + closeTimeNs = idleCloseTimeNs; + dueToShutdownRequest = false; + } + Long requestedShutdownTimeNs = requestedShutdownTimeNsHolder.get(); + if (requestedShutdownTimeNs != null) { + if (closeTimeNs == null || requestedShutdownTimeNs - closeTimeNs <= 0) { // overflow-aware comparison + closeTimeNs = requestedShutdownTimeNs; + dueToShutdownRequest = true; + } + } + if (closeTimeNs == null) { + log.error( Review comment: It won't allow discovering the bug in the code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org