Github user anew commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/53#discussion_r138759678 --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java --- @@ -85,105 +100,145 @@ public long getTimestamp() { @Override public void append(TransactionEdit edit) throws IOException { - long startTime = System.nanoTime(); - synchronized (this) { - ensureAvailable(); - - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); - - // add to pending edits - append(entry); - } - - // wait for sync to complete - sync(); - long durationMillis = (System.nanoTime() - startTime) / 1000000L; - if (durationMillis > SLOW_APPEND_THRESHOLD) { - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); - } + append(Collections.singletonList(edit)); } @Override public void append(List<TransactionEdit> edits) throws IOException { - long startTime = System.nanoTime(); - synchronized (this) { - ensureAvailable(); - + if (closing) { // or closed, which implies closing + throw new IOException("Log " + getName() + " is closing or already closed, cannot append"); + } + if (!initialized) { + init(); + } + // synchronizing here ensures that elements in the queue are ordered by seq number + synchronized (logSequence) { for (TransactionEdit edit : edits) { - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); - - // add to pending edits - append(entry); + pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit)); } } - - // wait for sync to complete + // try to sync all pending edits (competing for this with other threads) sync(); - long durationMillis = (System.nanoTime() - startTime) / 1000000L; - if (durationMillis > SLOW_APPEND_THRESHOLD) { - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); - } } - private void ensureAvailable() throws IOException { - if (closed) { - throw new 
IOException("Log " + getName() + " is already closed, cannot append!"); - } - if (!initialized) { - init(); + /** + * Return all pending writes at the time the method is called, or null if no writes are pending. + * + * Note that after this method returns, there can be additional pending writes, + * added concurrently while the existing pending writes are removed. + */ + @Nullable + private Entry[] getPendingWrites() { + synchronized (this) { + if (pendingWrites.isEmpty()) { + return null; + } + Entry[] entriesToSync = new Entry[pendingWrites.size()]; + for (int i = 0; i < entriesToSync.length; i++) { + entriesToSync[i] = pendingWrites.remove(); + } + return entriesToSync; } } - /* - * Appends new writes to the pendingWrites. It is better to keep it in - * our own queue rather than writing it to the HDFS output stream because - * HDFSOutputStream.writeChunk is not lightweight at all. + /** + * When multiple threads try to log edits at the same time, they all will call {@link #append} + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()} + * are followed by a single {@code sync}, or vice versa. + * + * We want to record the time and position of the first {@code append()} after a {@code sync()}, + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold. + * Therefore this is called every time before we write the pending list out to the log writer. + * + * See {@link #stopTimer(TransactionLogWriter)}. 
+ * + * @throws IOException if the position of the writer cannot be determined */ - private void append(Entry e) throws IOException { - pendingWrites.add(e); + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException { + // no sync needed because this is only called within a sync block + if (positionBeforeWrite == -1L) { + positionBeforeWrite = writer.getPosition(); + countSinceLastSync = 0; + stopWatch.reset().start(); + } + countSinceLastSync += entryCount; } - // Returns all currently pending writes. New writes - // will accumulate in a new list. - private List<Entry> getPendingWrites() { - synchronized (this) { - List<Entry> save = this.pendingWrites; - this.pendingWrites = new LinkedList<>(); - return save; + /** + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync + * together exceed a threshold. + * + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}. + * + * @throws IOException if the position of the writer cannot be determined + */ + private void stopTimer(TransactionLogWriter writer) throws IOException { + // this method is only called by a thread if it actually called sync(), inside a sync block + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case + stopWatch.stop(); + long elapsed = stopWatch.elapsedMillis(); + long bytesWritten = writer.getPosition() - positionBeforeWrite; + if (elapsed >= slowAppendThreshold) { + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.", + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? 
"y" : "ies", bytesWritten); + } + metricsCollector.histogram("wal.sync.size", countSinceLastSync); + metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int } + positionBeforeWrite = -1L; + countSinceLastSync = 0; } private void sync() throws IOException { // writes out pending entries to the HLog - TransactionLogWriter tmpWriter = null; long latestSeq = 0; int entryCount = 0; synchronized (this) { if (closed) { - return; - } - // prevent writer being dereferenced - tmpWriter = writer; - - List<Entry> currentPending = getPendingWrites(); - if (!currentPending.isEmpty()) { - tmpWriter.commitMarker(currentPending.size()); + if (pendingWrites.isEmpty()) { + // this is expected: close() sets closed to true after syncing all pending writes (including ours) + return; + } + // this should never happen because close() only sets closed=true after syncing. + // but if it should happen, we must fail this call because we don't know whether the edit was persisted + throw new IOException( + "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted"); } - - // write out all accumulated entries to log. - for (Entry e : currentPending) { - tmpWriter.append(e); - entryCount++; - latestSeq = Math.max(latestSeq, e.getKey().get()); + Entry[] currentPending = getPendingWrites(); + if (currentPending != null) { + entryCount = currentPending.length; + startTimerIfNeeded(writer, entryCount); + writer.commitMarker(entryCount); + for (Entry e : currentPending) { + writer.append(e); + } + // sequence numbers are guaranteed to be ascending, so the last one is the greatest + latestSeq = currentPending[currentPending.length - 1].getKey().get(); + writtenUpTo = latestSeq; } } - long lastSynced = syncedUpTo.get(); + // giving up the sync lock here allows other threads to write their edits before the sync happens. + // hence, we can have the edits from n threads in one sync. 
+ // someone else might have already synced our edits, avoid double syncing - if (lastSynced < latestSeq) { - tmpWriter.sync(); - metricsCollector.histogram("wal.sync.size", entryCount); - syncedUpTo.compareAndSet(lastSynced, latestSeq); + if (syncedUpTo < latestSeq) { --- End diff -- Please see the previous discussion of this at https://github.com/apache/incubator-tephra/pull/53#pullrequestreview-62374698
---