[ https://issues.apache.org/jira/browse/HDFS-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821898#comment-16821898 ]
angerszhu commented on HDFS-14437:
----------------------------------

[~starphin] Just add some code to TestEditLog.Transactions.run() and run the test testMultiThreadedEditLog.

**Important:** increase the number of threads and change bufferCapacity.

{code:java}
@Override
public void run() {
  PermissionStatus p = namesystem.createFsOwnerPermissions(
      new FsPermission((short)0777));
  FSEditLog editLog = namesystem.getEditLog();

  for (int i = 0; i < numTransactions; i++) {
    INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
        p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize, (byte)0);
    inode.toUnderConstruction("", "");

    editLog.logOpenFile("/filename" + (startIndex + i), inode, false, false);
    editLog.logCloseFile("/filename" + (startIndex + i), inode);
    editLog.logSync();
    // Roll the edit log every 100 transactions.
    if (i % 100 == 0) {
      try {
        editLog.rollEditLog();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}
{code}

> Exception happened when rollEditLog expects empty
> EditsDoubleBuffer.bufCurrent but not
> -----------------------------------------------------------------------------------------
>
>                 Key: HDFS-14437
>                 URL: https://issues.apache.org/jira/browse/HDFS-14437
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: ha, namenode, qjm
>            Reporter: angerszhu
>            Priority: Major
>
> For the problem mentioned in https://issues.apache.org/jira/browse/HDFS-10943,
> I traced how the EditLog is written and flushed, along with some of the
> important functions involved. I found that in the FSEditLog class, the
> close() function calls the following:
>
> {code:java}
> waitForSyncToFinish();
> endCurrentLogSegment(true);
> {code}
> Since close() already holds the object lock, when waitForSyncToFinish()
> returns, every logSync job has finished and all data in bufReady has been
> flushed out. Because the current thread holds the lock on this object, no
> other thread can acquire it while endCurrentLogSegment() runs, so no new
> edit can be written into bufCurrent.
> But when we don't call waitForSyncToFinish() before endCurrentLogSegment(),
> the flush step of an auto-scheduled logSync() may still be in progress,
> because that step does not need synchronization, as the comment on the
> logSync() method explains:
>
> {code:java}
> /**
>  * Sync all modifications done by this thread.
>  *
>  * The internal concurrency design of this class is as follows:
>  *   - Log items are written synchronized into an in-memory buffer,
>  *     and each assigned a transaction ID.
>  *   - When a thread (client) would like to sync all of its edits, logSync()
>  *     uses a ThreadLocal transaction ID to determine what edit number must
>  *     be synced to.
>  *   - The isSyncRunning volatile boolean tracks whether a sync is currently
>  *     under progress.
>  *
>  * The data is double-buffered within each edit log implementation so that
>  * in-memory writing can occur in parallel with the on-disk writing.
>  *
>  * Each sync occurs in three steps:
>  *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
>  *      flag.
>  *   2. unsynchronized, it flushes the data to storage
>  *   3. synchronized, it resets the flag and notifies anyone waiting on the
>  *      sync.
>  *
>  * The lack of synchronization on step 2 allows other threads to continue
>  * to write into the memory buffer while the sync is in progress.
>  * Because this step is unsynchronized, actions that need to avoid
>  * concurrency with sync() should be synchronized and also call
>  * waitForSyncToFinish() before assuming they are running alone.
>  */
> public void logSync() {
>   long syncStart = 0;
>   // Fetch the transactionId of this thread.
>   long mytxid = myTransactionId.get().txid;
>
>   boolean sync = false;
>   try {
>     EditLogOutputStream logStream = null;
>     synchronized (this) {
>       try {
>         printStatistics(false);
>         // if somebody is already syncing, then wait
>         while (mytxid > synctxid && isSyncRunning) {
>           try {
>             wait(1000);
>           } catch (InterruptedException ie) {
>           }
>         }
>         //
>         // If this transaction was already flushed, then nothing to do
>         //
>         if (mytxid <= synctxid) {
>           numTransactionsBatchedInSync++;
>           if (metrics != null) {
>             // Metrics is non-null only when used inside name node
>             metrics.incrTransactionsBatchedInSync();
>           }
>           return;
>         }
>
>         // now, this thread will do the sync
>         syncStart = txid;
>         isSyncRunning = true;
>         sync = true;
>         // swap buffers
>         try {
>           if (journalSet.isEmpty()) {
>             throw new IOException("No journals available to flush");
>           }
>           editLogStream.setReadyToFlush();
>         } catch (IOException e) {
>           final String msg =
>               "Could not sync enough journals to persistent storage " +
>               "due to " + e.getMessage() + ". " +
>               "Unsynced transactions: " + (txid - synctxid);
>           LOG.fatal(msg, new Exception());
>           synchronized(journalSetLock) {
>             IOUtils.cleanup(LOG, journalSet);
>           }
>           terminate(1, msg);
>         }
>       } finally {
>         // Prevent RuntimeException from blocking other log edit write
>         doneWithAutoSyncScheduling();
>       }
>       //editLogStream may become null,
>       //so store a local variable for flush.
>       logStream = editLogStream;
>     }
>
>     // do the sync
>     long start = now();
>     try {
>       if (logStream != null) {
>         logStream.flush();
>       }
>     } catch (IOException ex) {
>       synchronized (this) {
>         final String msg =
>             "Could not sync enough journals to persistent storage. "
>             + "Unsynced transactions: " + (txid - synctxid);
>         LOG.fatal(msg, new Exception());
>         synchronized(journalSetLock) {
>           IOUtils.cleanup(LOG, journalSet);
>         }
>         terminate(1, msg);
>       }
>     }
>     long elapsed = now() - start;
>     if (metrics != null) { // Metrics non-null only when used inside name node
>       metrics.addSync(elapsed);
>     }
>
>   } finally {
>     // Prevent RuntimeException from blocking other log edit sync
>     synchronized (this) {
>       if (sync) {
>         synctxid = syncStart;
>         for (JournalManager jm : journalSet.getJournalManagers()) {
>           /**
>            * {@link FileJournalManager#lastReadableTxId} is only meaningful
>            * for file-based journals. Therefore the interface is not added to
>            * other types of {@link JournalManager}.
>            */
>           if (jm instanceof FileJournalManager) {
>             ((FileJournalManager)jm).setLastReadableTxId(syncStart);
>           }
>         }
>         isSyncRunning = false;
>       }
>       this.notifyAll();
>     }
>   }
> }
> {code}
> So if waitForSyncToFinish() is not called before endCurrentLogSegment(),
> there is no guarantee that no flush is in progress when
> endCurrentLogSegment() runs. In that case bufReady may not have been flushed
> out completely when endCurrentLogSegment() swaps it with bufCurrent. When
> EditLogOutputStream's close() function is finally called, bufCurrent still
> holds unflushed bytes, which causes the error in
> https://issues.apache.org/jira/browse/HDFS-10943
>
> So maybe we should call waitForSyncToFinish() before endCurrentLogSegment()
> in the rollEditLog() method of the FSEditLog class?
> {code:java}
> synchronized long rollEditLog() throws IOException {
>   LOG.info("Rolling edit logs");
>   waitForSyncToFinish();
>   endCurrentLogSegment(true);
>
>   long nextTxId = getLastWrittenTxId() + 1;
>   startLogSegment(nextTxId, true);
>
>   assert curSegmentTxId == nextTxId;
>   return nextTxId;
> }
> {code}
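For readers who want to see the three-step sync from the logSync() comment in isolation, here is a minimal sketch of why a roll should wait for an in-flight flush. It is not Hadoop code: the class EditBufferSketch, the methods logEdit and rollSegment, and the in-memory storage list are hypothetical stand-ins. Only the names bufCurrent, bufReady, isSyncRunning, and waitForSyncToFinish follow the issue text, and the real FSEditLog behaviour may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a double-buffered edit log. Not Hadoop code.
class EditBufferSketch {
    private List<String> bufCurrent = new ArrayList<>();     // receives new edits
    private List<String> bufReady = new ArrayList<>();       // buffer being flushed
    private final List<String> storage = new ArrayList<>();  // stands in for the journal
    private boolean isSyncRunning = false;                   // guarded by the object lock

    // Edits are appended while holding the object lock.
    synchronized void logEdit(String op) {
        bufCurrent.add(op);
    }

    void logSync() {
        List<String> toFlush;
        synchronized (this) {
            // Step 1 (synchronized): wait for any running sync, swap buffers, set the flag.
            waitForSyncToFinish();
            List<String> tmp = bufReady;
            bufReady = bufCurrent;
            bufCurrent = tmp;
            isSyncRunning = true;
            toFlush = bufReady;
        }
        try {
            // Step 2 (unsynchronized): flush; writers may keep appending to bufCurrent.
            storage.addAll(toFlush);
            toFlush.clear();
        } finally {
            // Step 3 (synchronized): reset the flag and wake up waiters.
            synchronized (this) {
                isSyncRunning = false;
                notifyAll();
            }
        }
    }

    // Blocks until no flush is in progress; callers already hold the object lock.
    private synchronized void waitForSyncToFinish() {
        while (isSyncRunning) {
            try {
                wait(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sync", e);
            }
        }
    }

    // Ends the current segment and returns the number of edits persisted so far.
    synchronized int rollSegment() {
        waitForSyncToFinish();  // without this, bufReady could still be mid-flush here
        if (!bufReady.isEmpty()) {
            throw new IllegalStateException("bufReady still holds unflushed edits");
        }
        storage.addAll(bufCurrent);
        bufCurrent.clear();
        return storage.size();
    }

    public static void main(String[] args) throws Exception {
        EditBufferSketch log = new EditBufferSketch();
        int writers = 4;
        int editsPerWriter = 1000;
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < writers; t++) {
            final int id = t;
            Thread th = new Thread(() -> {
                for (int i = 0; i < editsPerWriter; i++) {
                    log.logEdit("op-" + id + "-" + i);
                    log.logSync();
                }
            });
            threads.add(th);
            th.start();
        }
        // Roll concurrently with the writers, as the modified test does.
        for (int r = 0; r < 50; r++) {
            log.rollSegment();
        }
        for (Thread th : threads) {
            th.join();
        }
        System.out.println("edits persisted: " + log.rollSegment());
    }
}
```

In this sketch rollSegment() holds the object lock and waits for isSyncRunning to clear before it touches either buffer, which mirrors the waitForSyncToFinish() call proposed for rollEditLog() above.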