leventov commented on a change in pull request #7038: Fix and document 
concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine 
concurrency specification of Firehose
URL: https://github.com/apache/incubator-druid/pull/7038#discussion_r258198153
 
 

 ##########
 File path: 
server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
 ##########
 @@ -165,59 +168,141 @@ public int getBufferSize()
   @JsonProperty
   public long getMaxIdleTime()
   {
-    return maxIdleTime;
+    return maxIdleTimeMillis;
   }
 
+  /**
+   * Apart from adhering to {@link Firehose} contract regarding concurrency, 
this class has two methods that might be
+   * called concurrently with any other methods and each other, from arbitrary 
number of threads: {@link #addAll} and
+   * {@link #shutdown}.
+   *
+   * This class creates and manages one thread for calling {@link #close()} 
asynchronously in response to a {@link
+   * #shutdown} request, or after this Firehose has been idle (no calls to 
{@link #addAll}) for {@link
+   * #maxIdleTimeMillis}.
+   */
+  @VisibleForTesting
   public class EventReceiverFirehose implements ChatHandler, Firehose, 
EventReceiverFirehoseMetric
   {
-    private final ScheduledExecutorService exec;
-    private final ExecutorService idleDetector;
-    private final BlockingQueue<InputRow> buffer;
-    private final InputRowParser<Map<String, Object>> parser;
+    /**
+     * This field needs to be volatile because it's intialized via 
double-checked locking in {@link #shutdown}. See
+     * 
https://github.com/apache/incubator-druid/pull/6662#discussion_r254161160.
+     */
+    private volatile @Nullable Thread delayedCloseExecutor;
 
-    private final Object readLock = new Object();
+    /** Contains {@link InputRow} objects, the last one is {@link 
#FIREHOSE_CLOSED} which is a "poison pill". */
+    private final BlockingQueue<Object> buffer;
+    private final InputRowParser<Map<String, Object>> parser;
 
-    private volatile InputRow nextRow = null;
+    /** This field needs to be volatile to ensure progress in {@link #addRows} 
method where it is read in a loop. */
     private volatile boolean closed = false;
+
+    /**
+     * This field and {@link #rowsRunOut} are not volatile because they are 
accessed only from {@link #hasMore()} and
+     * {@link #nextRow()} methods that are called from a single thread 
according to {@link Firehose} spec.
+     */
+    private InputRow nextRow = null;
+    private boolean rowsRunOut = false;
+
     private final AtomicLong bytesReceived = new AtomicLong(0);
-    private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
+    private final AtomicLong lastBufferAddFailLoggingTimeNs = new 
AtomicLong(System.nanoTime());
     private final ConcurrentHashMap<String, Long> producerSequences = new 
ConcurrentHashMap<>();
-    private final Stopwatch idleWatch = Stopwatch.createUnstarted();
 
-    public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
+    /**
+     * This field and {@link #requestedShutdownTimeNsHolder} use nanoseconds 
instead of milliseconds not to deal with
+     * the fact that {@link System#currentTimeMillis()} can "go backward", e. 
g. due to time correction on the server.
+     */
+    private final AtomicReference<Long> idleCloseTimeNsHolder = new 
AtomicReference<>();
+    private final AtomicReference<Long> requestedShutdownTimeNsHolder = new 
AtomicReference<>();
+
+    EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
     {
       this.buffer = new ArrayBlockingQueue<>(bufferSize);
       this.parser = parser;
-      exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
-      idleDetector = 
Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
-      idleDetector.submit(() -> {
-        long idled;
-        try {
-          while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < 
maxIdleTime) {
-            Thread.sleep(maxIdleTime - idled);
-          }
-        }
-        catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
+
+      if (maxIdleTimeMillis != Long.MAX_VALUE) {
+        idleCloseTimeNsHolder.set(System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis));
+        synchronized (this) {
+          createDelayedCloseExecutor();
         }
-        log.info("Firehose has been idle for %d ms, closing.", idled);
-        close();
-      });
-      idleWatch.start();
+      }
     }
 
+    @GuardedBy("this")
+    private Thread createDelayedCloseExecutor()
+    {
+      Thread delayedCloseExecutor = new Thread(
+          () -> {
+            // The closed = true is visible after close() because there is a 
happens-before edge between
+            // delayedCloseExecutor.interrupt() call in close() and catching 
InterruptedException below in this loop.
+            while (!closed) {
+              Long closeTimeNs = null;
+              Boolean dueToShutdownRequest = null;
+              Long idleCloseTimeNs = idleCloseTimeNsHolder.get();
+              if (idleCloseTimeNs != null) {
+                closeTimeNs = idleCloseTimeNs;
+                dueToShutdownRequest = false;
+              }
+              Long requestedShutdownTimeNs = 
requestedShutdownTimeNsHolder.get();
+              if (requestedShutdownTimeNs != null) {
+                if (closeTimeNs == null || requestedShutdownTimeNs - 
closeTimeNs <= 0) { // overflow-aware comparison
+                  closeTimeNs = requestedShutdownTimeNs;
+                  dueToShutdownRequest = true;
+                }
+              }
+              if (closeTimeNs == null) {
+                log.error(
 
 Review comment:
   It won't allow discovering the bug in the code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to