Github user poornachandra commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/53#discussion_r138720628 --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java --- @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException { } } - /* - * Appends new writes to the pendingWrites. It is better to keep it in - * our own queue rather than writing it to the HDFS output stream because - * HDFSOutputStream.writeChunk is not lightweight at all. + /** + * Return all pending writes at the time the method is called, or null if no writes are pending. + * + * Note that after this method returns, there can be additional pending writes, + * added concurrently while the existing pending writes are removed. + */ + @Nullable + private Entry[] getPendingWrites() { + synchronized (this) { + if (pendingWrites.isEmpty()) { + return null; + } + Entry[] entriesToSync = new Entry[pendingWrites.size()]; + for (int i = 0; i < entriesToSync.length; i++) { + entriesToSync[i] = pendingWrites.remove(); + } + return entriesToSync; + } + } + + /** + * When multiple threads try to log edits at the same time, they all will call (@link #append} + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()} + * are followed by a single {@code sync}, or vice versa. + * + * We want to record the time and position of the first {@code append()} after a {@code sync()}, + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold. + * Therefore this is called every time before we write the pending list out to the log writer. + * + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}. 
+ * + * @throws IOException if the position of the writer cannot be determined */ - private void append(Entry e) throws IOException { - pendingWrites.add(e); + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException { + // no sync needed because this is only called within a sync block + if (positionBeforeWrite == -1L) { + positionBeforeWrite = writer.getPosition(); + countSinceLastSync = 0; + stopWatch.reset().start(); + } + countSinceLastSync += entryCount; } - // Returns all currently pending writes. New writes - // will accumulate in a new list. - private List<Entry> getPendingWrites() { - synchronized (this) { - List<Entry> save = this.pendingWrites; - this.pendingWrites = new LinkedList<>(); - return save; + /** + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync + * together exceed a threshold. + * + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}. + * + * @throws IOException if the position of the writer cannot be determined + */ + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException { + // this method is only called by a thread if it actually called sync(), inside a sync block + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case + stopWatch.stop(); + long elapsed = stopWatch.elapsedMillis(); + if (elapsed >= slowAppendThreshold) { + long currentPosition = writer.getPosition(); + long bytesWritten = currentPosition - positionBeforeWrite; + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.", + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? 
"y" : "ies", bytesWritten); + } } + positionBeforeWrite = -1L; + countSinceLastSync = 0; } private void sync() throws IOException { // writes out pending entries to the HLog - TransactionLogWriter tmpWriter = null; long latestSeq = 0; int entryCount = 0; synchronized (this) { if (closed) { return; } - // prevent writer being dereferenced - tmpWriter = writer; - - List<Entry> currentPending = getPendingWrites(); - if (!currentPending.isEmpty()) { - tmpWriter.commitMarker(currentPending.size()); - } - - // write out all accumulated entries to log. - for (Entry e : currentPending) { - tmpWriter.append(e); - entryCount++; - latestSeq = Math.max(latestSeq, e.getKey().get()); + Entry[] currentPending = getPendingWrites(); + if (currentPending != null) { + entryCount = currentPending.length; + startTimerIfNeeded(writer, entryCount); + writer.commitMarker(entryCount); + for (Entry e : currentPending) { + writer.append(e); + } + // sequence are guaranteed to be ascending, so the last one is the greatest + latestSeq = currentPending[currentPending.length - 1].getKey().get(); + writtenUpTo = latestSeq; } } - long lastSynced = syncedUpTo.get(); + // giving up the sync lock here allows other threads to write their edits before the sync happens. + // hence, we can have the edits from n threads in one sync. + // someone else might have already synced our edits, avoid double syncing - if (lastSynced < latestSeq) { - tmpWriter.sync(); - metricsCollector.histogram("wal.sync.size", entryCount); - syncedUpTo.compareAndSet(lastSynced, latestSeq); + if (syncedUpTo < latestSeq) { + synchronized (this) { + // someone else might have synced our edits while we were waiting + if (syncedUpTo < latestSeq) { + writer.sync(); + syncedUpTo = writtenUpTo; + stopTimerIfNeeded(writer); + } + } } + // in any case, emit metrics for the number entries we wrote. 
+ // because the thread that actually syncs does not know how many it synced (it not write all of them) + metricsCollector.histogram("wal.sync.size", entryCount); --- End diff -- Can we emit metrics for sync bytes then?
---