[ https://issues.apache.org/jira/browse/HDFS-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821898#comment-16821898 ]

angerszhu commented on HDFS-14437:
----------------------------------

[~starphin]

Just add the code below in

TestEditLog.Transactions#run()

then run the test:

TestEditLog#testMultiThreadedEditLog

**Important**: increase the number of threads and change the bufferCapacity.
 
{code:java}
@Override
public void run() {
  PermissionStatus p = namesystem.createFsOwnerPermissions(
                                      new FsPermission((short)0777));
  FSEditLog editLog = namesystem.getEditLog();

  for (int i = 0; i < numTransactions; i++) {
    INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
        p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize, (byte)0);
    inode.toUnderConstruction("", "");

    editLog.logOpenFile("/filename" + (startIndex + i), inode, false, false);
    editLog.logCloseFile("/filename" + (startIndex + i), inode);
    editLog.logSync();
    if (i % 100 == 0) { // periodically roll so the roll races with in-flight syncs
      try {
        editLog.rollEditLog();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}
{code}
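As a standalone illustration of the pattern the test exercises (all names below are hypothetical, not the actual TestEditLog code), several writer threads append transactions to one shared log; raising the thread count widens the window in which a roll can overlap an in-flight flush:

{code:java}
// Standalone sketch (hypothetical names): NUM_THREADS writers append
// transactions to one shared log, the shape of the multi-threaded test above.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadedLogDemo {
  static final int NUM_THREADS = 50;       // the "thread num" knob
  static final int TXNS_PER_THREAD = 100;  // transactions per writer
  static final AtomicInteger txid = new AtomicInteger();
  static final StringBuilder log = new StringBuilder(); // stands in for the edit log

  static int total() {
    return txid.get();
  }

  public static void main(String[] args) throws InterruptedException {
    List<Thread> writers = new ArrayList<>();
    for (int t = 0; t < NUM_THREADS; t++) {
      Thread w = new Thread(() -> {
        for (int i = 0; i < TXNS_PER_THREAD; i++) {
          int id = txid.incrementAndGet();      // each edit gets a txid
          synchronized (log) {                  // writes are serialized...
            log.append("OP_ADD:").append(id).append('\n');
          }
          // ...but in FSEditLog the flush to storage is deliberately NOT
          // under this lock, which is the window the reproduction targets.
        }
      });
      writers.add(w);
      w.start();
    }
    for (Thread w : writers) {
      w.join();
    }
    System.out.println(total()); // 5000: no transaction lost
  }
}
{code}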

> Exception happened when   rollEditLog expects empty 
> EditsDoubleBuffer.bufCurrent  but not
> -----------------------------------------------------------------------------------------
>
>                 Key: HDFS-14437
>                 URL: https://issues.apache.org/jira/browse/HDFS-14437
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: ha, namenode, qjm
>            Reporter: angerszhu
>            Priority: Major
>
> For the problem mentioned in https://issues.apache.org/jira/browse/HDFS-10943, 
> I have traced the process of writing and flushing the EditLog and some of the 
> important functions involved. I found that in the FSEditLog class, the close() 
> method runs the sequence below:
>  
> {code:java}
> waitForSyncToFinish();
> endCurrentLogSegment(true);{code}
> Because close() holds the object lock, when waitForSyncToFinish() returns, all 
> logSync work has finished and all data in bufReady has been flushed out; and 
> since the current thread still holds the lock when it calls 
> endCurrentLogSegment(), no other thread can acquire it to write new edits into 
> bufCurrent.
> But if we do not call waitForSyncToFinish() before endCurrentLogSegment(), an 
> auto-scheduled logSync()'s flush may still be in progress, because the flush 
> step deliberately runs unsynchronized, as the comment on the logSync() method 
> explains:
>  
> {code:java}
> /**
>  * Sync all modifications done by this thread.
>  *
>  * The internal concurrency design of this class is as follows:
>  *   - Log items are written synchronized into an in-memory buffer,
>  *     and each assigned a transaction ID.
>  *   - When a thread (client) would like to sync all of its edits, logSync()
>  *     uses a ThreadLocal transaction ID to determine what edit number must
>  *     be synced to.
>  *   - The isSyncRunning volatile boolean tracks whether a sync is currently
>  *     under progress.
>  *
>  * The data is double-buffered within each edit log implementation so that
>  * in-memory writing can occur in parallel with the on-disk writing.
>  *
>  * Each sync occurs in three steps:
>  *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
>  *      flag.
>  *   2. unsynchronized, it flushes the data to storage
>  *   3. synchronized, it resets the flag and notifies anyone waiting on the
>  *      sync.
>  *
>  * The lack of synchronization on step 2 allows other threads to continue
>  * to write into the memory buffer while the sync is in progress.
>  * Because this step is unsynchronized, actions that need to avoid
>  * concurrency with sync() should be synchronized and also call
>  * waitForSyncToFinish() before assuming they are running alone.
>  */
> public void logSync() {
>   long syncStart = 0;
>   // Fetch the transactionId of this thread. 
>   long mytxid = myTransactionId.get().txid;
>   
>   boolean sync = false;
>   try {
>     EditLogOutputStream logStream = null;
>     synchronized (this) {
>       try {
>         printStatistics(false);
>         // if somebody is already syncing, then wait
>         while (mytxid > synctxid && isSyncRunning) {
>           try {
>             wait(1000);
>           } catch (InterruptedException ie) {
>           }
>         }
>         //
>         // If this transaction was already flushed, then nothing to do
>         //
>         if (mytxid <= synctxid) {
>           numTransactionsBatchedInSync++;
>           if (metrics != null) {
>             // Metrics is non-null only when used inside name node
>             metrics.incrTransactionsBatchedInSync();
>           }
>           return;
>         }
>    
>         // now, this thread will do the sync
>         syncStart = txid;
>         isSyncRunning = true;
>         sync = true;
>         // swap buffers
>         try {
>           if (journalSet.isEmpty()) {
>             throw new IOException("No journals available to flush");
>           }
>           editLogStream.setReadyToFlush();
>         } catch (IOException e) {
>           final String msg =
>               "Could not sync enough journals to persistent storage " +
>               "due to " + e.getMessage() + ". " +
>               "Unsynced transactions: " + (txid - synctxid);
>           LOG.fatal(msg, new Exception());
>           synchronized(journalSetLock) {
>             IOUtils.cleanup(LOG, journalSet);
>           }
>           terminate(1, msg);
>         }
>       } finally {
>         // Prevent RuntimeException from blocking other log edit write 
>         doneWithAutoSyncScheduling();
>       }
>       //editLogStream may become null,
>       //so store a local variable for flush.
>       logStream = editLogStream;
>     }
>     
>     // do the sync
>     long start = now();
>     try {
>       if (logStream != null) {
>         logStream.flush();
>       }
>     } catch (IOException ex) {
>       synchronized (this) {
>         final String msg =
>             "Could not sync enough journals to persistent storage. "
>             + "Unsynced transactions: " + (txid - synctxid);
>         LOG.fatal(msg, new Exception());
>         synchronized(journalSetLock) {
>           IOUtils.cleanup(LOG, journalSet);
>         }
>         terminate(1, msg);
>       }
>     }
>     long elapsed = now() - start;
>     if (metrics != null) { // Metrics non-null only when used inside name node
>       metrics.addSync(elapsed);
>     }
>     
>   } finally {
>     // Prevent RuntimeException from blocking other log edit sync 
>     synchronized (this) {
>       if (sync) {
>         synctxid = syncStart;
>         for (JournalManager jm : journalSet.getJournalManagers()) {
>           /**
>            * {@link FileJournalManager#lastReadableTxId} is only meaningful
>            * for file-based journals. Therefore the interface is not added to
>            * other types of {@link JournalManager}.
>            */
>           if (jm instanceof FileJournalManager) {
>             ((FileJournalManager)jm).setLastReadableTxId(syncStart);
>           }
>         }
>         isSyncRunning = false;
>       }
>       this.notifyAll();
>    }
>   }
> }
> {code}
> So if we do not call waitForSyncToFinish() before endCurrentLogSegment(), we 
> cannot guarantee that no flush is in progress when endCurrentLogSegment() 
> runs. In that case bufReady may not have been completely flushed when it is 
> swapped with bufCurrent, and when EditLogOutputStream's close() is finally 
> called there are still un-flushed bytes in bufCurrent, which causes the error 
> in https://issues.apache.org/jira/browse/HDFS-10943
>  
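This failure mode can be condensed into a toy model of the double buffer (hypothetical names and a heavily simplified sketch; the real class is EditsDoubleBuffer): an edit that reaches bufCurrent after the last swap, with no further sync before close, trips exactly the "bufCurrent should be empty" check from HDFS-10943.

{code:java}
// Toy model of the double-buffer discipline (hypothetical, heavily simplified).
import java.io.IOException;

class ToyDoubleBuffer {
  private StringBuilder bufCurrent = new StringBuilder(); // writers append here
  private StringBuilder bufReady = new StringBuilder();   // the flusher drains this

  void write(String op) { bufCurrent.append(op); }

  // Step 1 of a sync (synchronized in the real code): swap the two buffers.
  void setReadyToFlush() {
    StringBuilder tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  // Step 2 (unsynchronized in the real code): drain the ready buffer.
  String flush() {
    String out = bufReady.toString();
    bufReady.setLength(0);
    return out;
  }

  // Mirrors the check that fails in HDFS-10943.
  void close() throws IOException {
    if (bufCurrent.length() > 0) {
      throw new IOException("bufCurrent should be empty: " + bufCurrent);
    }
  }
}

public class DoubleBufferDemo {
  public static void main(String[] args) {
    ToyDoubleBuffer buf = new ToyDoubleBuffer();
    buf.write("OP_ADD;");
    buf.setReadyToFlush();
    System.out.println(buf.flush()); // OP_ADD;

    // An un-synced edit left in bufCurrent reproduces the error:
    buf.write("OP_END_LOG_SEGMENT;");
    try {
      buf.close();
    } catch (IOException e) {
      System.out.println("close failed: " + e.getMessage());
    }
  }
}
{code}
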
> So maybe we should call waitForSyncToFinish() before endCurrentLogSegment() in 
> the rollEditLog() method of the FSEditLog class, like this?
> {code:java}
> synchronized long rollEditLog() throws IOException {
>   LOG.info("Rolling edit logs");
>   waitForSyncToFinish();
>   endCurrentLogSegment(true);
> 
>   long nextTxId = getLastWrittenTxId() + 1;
>   startLogSegment(nextTxId, true);
> 
>   assert curSegmentTxId == nextTxId;
>   return nextTxId;
> }
> {code}
>  
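The guard the proposed fix relies on is the standard monitor wait pattern; a self-contained sketch of how a waitForSyncToFinish()-style method behaves (hypothetical class, not the exact FSEditLog code):

{code:java}
// Sketch of the wait-for-sync pattern: block until no sync is running.
public class WaitForSyncDemo {
  private boolean isSyncRunning = false;

  synchronized void beginSync() { isSyncRunning = true; }

  synchronized void endSync() {
    isSyncRunning = false;
    notifyAll(); // wake anyone blocked in waitForSyncToFinish()
  }

  synchronized boolean syncRunning() { return isSyncRunning; }

  synchronized void waitForSyncToFinish() throws InterruptedException {
    while (isSyncRunning) {
      wait(1000); // re-check on wakeup; guards against spurious wakeups
    }
  }

  public static void main(String[] args) throws InterruptedException {
    WaitForSyncDemo log = new WaitForSyncDemo();
    log.beginSync();
    Thread flusher = new Thread(() -> {
      try { Thread.sleep(100); } catch (InterruptedException ignored) { }
      log.endSync();
    });
    flusher.start();
    log.waitForSyncToFinish(); // returns only after the flusher finishes
    flusher.join();
    System.out.println("sync finished");
  }
}
{code}

Called under the FSEditLog lock before endCurrentLogSegment(), such a wait would ensure no step-2 flush is still draining bufReady when the segment is ended.
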
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
