Github user anew commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/53#discussion_r138732890 --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java --- @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException { } } - /* - * Appends new writes to the pendingWrites. It is better to keep it in - * our own queue rather than writing it to the HDFS output stream because - * HDFSOutputStream.writeChunk is not lightweight at all. + /** + * Return all pending writes at the time the method is called, or null if no writes are pending. + * + * Note that after this method returns, there can be additional pending writes, + * added concurrently while the existing pending writes are removed. + */ + @Nullable + private Entry[] getPendingWrites() { + synchronized (this) { + if (pendingWrites.isEmpty()) { + return null; + } + Entry[] entriesToSync = new Entry[pendingWrites.size()]; + for (int i = 0; i < entriesToSync.length; i++) { + entriesToSync[i] = pendingWrites.remove(); + } + return entriesToSync; + } + } + + /** + * When multiple threads try to log edits at the same time, they all will call (@link #append} + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()} + * are followed by a single {@code sync}, or vice versa. + * + * We want to record the time and position of the first {@code append()} after a {@code sync()}, + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold. + * Therefore this is called every time before we write the pending list out to the log writer. + * + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}. 
+ * + * @throws IOException if the position of the writer cannot be determined */ - private void append(Entry e) throws IOException { - pendingWrites.add(e); + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException { + // no sync needed because this is only called within a sync block + if (positionBeforeWrite == -1L) { + positionBeforeWrite = writer.getPosition(); + countSinceLastSync = 0; + stopWatch.reset().start(); + } + countSinceLastSync += entryCount; } - // Returns all currently pending writes. New writes - // will accumulate in a new list. - private List<Entry> getPendingWrites() { - synchronized (this) { - List<Entry> save = this.pendingWrites; - this.pendingWrites = new LinkedList<>(); - return save; + /** + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync + * together exceed a threshold. + * + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}. + * + * @throws IOException if the position of the writer cannot be determined + */ + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException { + // this method is only called by a thread if it actually called sync(), inside a sync block + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case + stopWatch.stop(); + long elapsed = stopWatch.elapsedMillis(); + if (elapsed >= slowAppendThreshold) { + long currentPosition = writer.getPosition(); + long bytesWritten = currentPosition - positionBeforeWrite; + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.", + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? 
"y" : "ies", bytesWritten); + } } + positionBeforeWrite = -1L; + countSinceLastSync = 0; } private void sync() throws IOException { // writes out pending entries to the HLog - TransactionLogWriter tmpWriter = null; long latestSeq = 0; int entryCount = 0; synchronized (this) { if (closed) { return; --- End diff -- It would not always be correct to fail here, because the edits from the current thread may already have been synced. close() (which is also synchronized) only marks the log as closed after syncing, so if `closed` is true, a sync has just happened. However, append() can still be called concurrently and add new edits after that sync, so there is a possible race condition. I think we need an extra flag, `closing`. close() would first set `closing` to true, after which all calls to append() fail. It would then sync and set `closed` to true, so that any remaining threads do not attempt to sync.
---