Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/53#discussion_r138732890
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---
    @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
         }
       }
     
    -  /*
    -   * Appends new writes to the pendingWrites. It is better to keep it in
    -   * our own queue rather than writing it to the HDFS output stream because
    -   * HDFSOutputStream.writeChunk is not lightweight at all.
    +  /**
    +   * Return all pending writes at the time the method is called, or null if no writes are pending.
    +   *
    +   * Note that after this method returns, there can be additional pending writes,
    +   * added concurrently while the existing pending writes are removed.
    +   */
    +  @Nullable
    +  private Entry[] getPendingWrites() {
    +    synchronized (this) {
    +      if (pendingWrites.isEmpty()) {
    +        return null;
    +      }
    +      Entry[] entriesToSync = new Entry[pendingWrites.size()];
    +      for (int i = 0; i < entriesToSync.length; i++) {
    +        entriesToSync[i] = pendingWrites.remove();
    +      }
    +      return entriesToSync;
    +    }
    +  }
    +
    +  /**
    +   * When multiple threads try to log edits at the same time, they all will call {@link #append}
    +   * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
    +   * are followed by a single {@code sync}, or vice versa.
    +   *
    +   * We want to record the time and position of the first {@code append()} after a {@code sync()},
    +   * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
    +   * Therefore this is called every time before we write the pending list out to the log writer.
    +   *
    +   * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
    +   *
    +   * @throws IOException if the position of the writer cannot be determined
        */
    -  private void append(Entry e) throws IOException {
    -    pendingWrites.add(e);
    +  private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
    +    // no sync needed because this is only called within a sync block
    +    if (positionBeforeWrite == -1L) {
    +      positionBeforeWrite = writer.getPosition();
    +      countSinceLastSync = 0;
    +      stopWatch.reset().start();
    +    }
    +    countSinceLastSync += entryCount;
       }
     
    -  // Returns all currently pending writes. New writes
    -  // will accumulate in a new list.
    -  private List<Entry> getPendingWrites() {
    -    synchronized (this) {
    -      List<Entry> save = this.pendingWrites;
    -      this.pendingWrites = new LinkedList<>();
    -      return save;
    +  /**
    +   * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
    +   * together exceed a threshold.
    +   *
    +   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
    +   *
    +   * @throws IOException if the position of the writer cannot be determined
    +   */
    +  private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
    +    // this method is only called by a thread if it actually called sync(), inside a sync block
    +    if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
    +      stopWatch.stop();
    +      long elapsed = stopWatch.elapsedMillis();
    +      if (elapsed >= slowAppendThreshold) {
    +        long currentPosition = writer.getPosition();
    +        long bytesWritten = currentPosition - positionBeforeWrite;
    +        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
    +                 getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
    +      }
         }
    +    positionBeforeWrite = -1L;
    +    countSinceLastSync = 0;
       }
     
       private void sync() throws IOException {
         // writes out pending entries to the HLog
    -    TransactionLogWriter tmpWriter = null;
         long latestSeq = 0;
         int entryCount = 0;
         synchronized (this) {
           if (closed) {
             return;
    --- End diff --
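The batching that the Javadoc above describes can be modeled in a few lines. This is a minimal sketch, not Tephra's implementation: BatchingLog, writePending() and flush() are hypothetical names, entries are plain Strings, a character counter stands in for TransactionLogWriter.getPosition(), and everything runs under one lock for simplicity. It shows why the timer starts only on the first write after a sync: several drained batches can share one flush, and the report then covers all of them together.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the append/write/flush batching; not Tephra's real classes.
class BatchingLog {
  private final Queue<String> pendingWrites = new ArrayDeque<>();
  private final long slowThresholdMillis;
  private long position = 0L;             // simulated writer position (one "byte" per char)
  private long positionBeforeWrite = -1L; // -1 means no timing window is open
  private int countSinceLastSync = 0;
  private long startNanos = 0L;
  String lastSlowReport = null;           // last slow-append message, kept for inspection

  BatchingLog(long slowThresholdMillis) {
    this.slowThresholdMillis = slowThresholdMillis;
  }

  // Many callers may append before anyone writes or flushes.
  synchronized void append(String entry) {
    pendingWrites.add(entry);
  }

  // Drains whatever is pending right now; entries appended later stay queued.
  private synchronized String[] getPendingWrites() {
    if (pendingWrites.isEmpty()) {
      return null;
    }
    String[] batch = pendingWrites.toArray(new String[0]);
    pendingWrites.clear();
    return batch;
  }

  // Writes one drained batch; opens the timing window only for the first batch since the last flush.
  synchronized void writePending() {
    String[] batch = getPendingWrites();
    if (batch == null) {
      return;
    }
    if (positionBeforeWrite == -1L) {
      positionBeforeWrite = position;
      countSinceLastSync = 0;
      startNanos = System.nanoTime();
    }
    countSinceLastSync += batch.length;
    for (String e : batch) {
      position += e.length();
    }
  }

  // Closes the timing window after a (simulated) flush and records a report if it was slow.
  synchronized void flush() {
    if (positionBeforeWrite != -1L) {
      long elapsed = (System.nanoTime() - startNanos) / 1_000_000L;
      if (elapsed >= slowThresholdMillis) {
        lastSlowReport = String.format("Slow append, took %d ms for %d entr%s and %d bytes.",
            elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies",
            position - positionBeforeWrite);
      }
    }
    positionBeforeWrite = -1L;
    countSinceLastSync = 0;
  }
}
```

With a threshold of 0, appending "ab" and "cd", calling writePending(), appending "e", calling writePending() again and then flush() produces a single report ending in "3 entries and 5 bytes."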
    
    It would not always be correct to fail, because the edits from the current thread may already have been synced by another thread's sync().
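One way to decide whether failing is correct is to track sequence numbers. In this sketch (SeqLog, sync(long) and syncedUpTo are illustrative names, not Tephra's API), each append gets a sequence number and each sync records the highest one it made durable; a thread that finds the log closed only has to fail if its own edit was never covered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: names and structure are illustrative, not Tephra's API.
class SeqLog {
  private final List<Long> pending = new ArrayList<>();
  private long nextSeq = 1L;
  private long syncedUpTo = 0L; // highest sequence number known to be durable
  private boolean closed = false;

  synchronized long append() {
    long seq = nextSeq++;
    pending.add(seq);
    return seq;
  }

  // Pretend to write and flush every pending edit.
  private void flushPending() {
    if (!pending.isEmpty()) {
      syncedUpTo = pending.get(pending.size() - 1);
      pending.clear();
    }
  }

  // Returns true if the edit with sequence number mySeq is durable after this call.
  synchronized boolean sync(long mySeq) {
    if (mySeq <= syncedUpTo) {
      return true; // an earlier sync (possibly by another thread) already covered this edit
    }
    if (closed) {
      return false; // the edit was never written and the log can no longer sync it
    }
    flushPending();
    return mySeq <= syncedUpTo;
  }

  synchronized void close() {
    flushPending(); // final sync before marking the log closed
    closed = true;
  }
}
```

For example, if edits 1 and 2 are synced and the log is then closed, sync(1) still returns true; an edit appended after close() gets sync(...) == false.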
    
    The logic in close() (which is also synchronized) is that the log is only marked closed after syncing, so a sync has just happened. However, append() may still be called concurrently and add new edits, so there is a possible race condition.
    
    I guess we need an extra flag named closing. close() would first set closing to true, so that all subsequent calls to append() fail. Then it would sync and set closed to true, so that any remaining threads do not attempt to sync.
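The proposed protocol can be sketched as follows, assuming (as the diff suggests) that sync() drains pending entries under the lock on this and performs the actual write outside it. ClosingLog, its list-backed storage and the exception message are hypothetical; the point is the ordering in close(): set closing, sync, then set closed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed close() ordering; not Tephra's real class.
class ClosingLog {
  private final Object writeLock = new Object();
  private final List<String> pending = new ArrayList<>();
  private final List<String> durable = new ArrayList<>(); // stands in for the file on disk
  private boolean closing = false; // guarded by this: once set, append() fails
  private boolean closed = false;  // guarded by this: once set, sync() is a no-op

  void append(String entry) throws IOException {
    synchronized (this) {
      if (closing) {
        throw new IOException("log is closing; append rejected");
      }
      pending.add(entry);
    }
  }

  void sync() {
    List<String> batch;
    synchronized (this) {
      if (closed) {
        return; // close() already performed the final sync
      }
      batch = new ArrayList<>(pending);
      pending.clear();
    }
    // The write happens outside the lock on this, so appends could otherwise race with it.
    synchronized (writeLock) {
      durable.addAll(batch);
    }
  }

  void close() {
    synchronized (this) {
      if (closing) {
        return;
      }
      closing = true; // 1. from here on, append() fails
    }
    sync();           // 2. flush everything appended before step 1
    synchronized (this) {
      closed = true;  // 3. remaining threads' sync() calls become no-ops
    }
  }

  List<String> durableEntries() {
    synchronized (writeLock) {
      return new ArrayList<>(durable);
    }
  }
}
```

Without the closing flag, an append() that lands after close() has drained the queue but before closed is set would stay in the queue, and its caller's later sync() would return silently once closed is true, so the edit would be lost without an error.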

