[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wellington Chevreuil updated HBASE-21503:
-----------------------------------------
    Attachment: HBASE-21503-master.001.patch

> Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-21503
>                 URL: https://issues.apache.org/jira/browse/HBASE-21503
>             Project: HBase
>          Issue Type: Bug
>            Reporter: Wellington Chevreuil
>            Assignee: Wellington Chevreuil
>            Priority: Major
>         Attachments: HBASE-21503-master.001.patch
>
>
> While running tests that involved RS restarts, I noticed that replication
> sources could get stuck. In these cases, upon RS restart, the newly created
> normal source reached the end of the wal without recognising that the file
> was still open for write, which led to the wal being removed from the source
> queue. Thus, no new OPs get replicated unless this log goes to a recovery
> queue.
> Checking this further, my understanding is that, during restart, the RS
> starts the replication services, which initialises ReplicationSourceManager
> and a ReplicationSource for each wal group id, in the sequence below:
> {noformat}
> HRegionServer -> Replication.startReplicationService() ->
> ReplicationSourceManager.init() -> add ReplicationSource
> {noformat}
> At this point, the ReplicationSources have no paths yet, so the WAL reader
> thread is not running. ReplicationSourceManager is registered as a WAL
> listener, in order to get notified whenever a new wal file is available.
> While the ReplicationSourceManager and ReplicationSource instances are being
> created, a WALFileLengthProvider instance is obtained from WALProvider and
> cached by both ReplicationSourceManager and ReplicationSource.
> The default implementation of this WALFileLengthProvider, on the WALProvider
> interface, is below:
> {noformat}
>   default WALFileLengthProvider getWALFileLengthProvider() {
>     return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
>         .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
>   }
> {noformat}
> Notice that if WALProvider.getWALs returns an empty list, this
> WALFileLengthProvider instance will always return an empty OptionalLong.
> This is relevant because, when ReplicationSource finally starts the
> ReplicationSourceWALReader thread, it passes this WALFileLengthProvider,
> which is used by WALEntryStream (inside the wal reader) to determine whether
> the wal is being written (and should be kept in the queue) here:
> {noformat}
>   private void tryAdvanceEntry() throws IOException {
>     if (checkReader()) {
>       boolean beingWritten = readNextEntryAndRecordReaderPosition();
>       LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
>       if (currentEntry == null && !beingWritten) {
>         // no more entries in this log file, and the file is already closed, i.e, rolled
>         // Before dequeueing, we should always get one more attempt at reading.
>         // This is in case more entries came in after we opened the reader, and the log is rolled
>         // while we were reading. See HBASE-6758
>         resetReader();
>         readNextEntryAndRecordReaderPosition();
>         if (currentEntry == null) {
>           if (checkAllBytesParsed()) { // now we're certain we're done with this log file
>             dequeueCurrentLog();
>             if (openNextLog()) {
>               readNextEntryAndRecordReaderPosition();
>             }
>           }
>         }
>       }
> ...
> {noformat}
> Here is the code snippet from the
> WALEntryStream.readNextEntryAndRecordReaderPosition() method that relies on
> the WALFileLengthProvider:
> {noformat}
> ...
> #1   OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
>     if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
>       // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
>       // data, so we need to make sure that we do not read beyond the committed file length.
>       if (LOG.isDebugEnabled()) {
>         LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
>             fileLength.getAsLong() + ", but we have advanced to " + readerPos);
>       }
>       resetReader();
>       return true;
>     }
>     if (readEntry != null) {
>       metrics.incrLogEditsRead();
>       metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
>     }
>     currentEntry = readEntry; // could be null
>     this.currentPositionOfReader = readerPos;
>     return fileLength.isPresent();
> ...
> {noformat}
> The problem can occur because, when the wal file is actually created in the
> AbstractFSWALProvider.getWAL() method (snippet shown below), the line marked
> as #2 in the snippet below triggers notification of the registered
> WALListeners, including ReplicationSourceManager, which will start the
> ReplicationSourceWALReader thread.
> If the ReplicationSourceWALReader thread reaches point #1 in the snippet
> above before the thread running the AbstractFSWALProvider.getWAL() method
> gets to point #3 in the snippet below, then the WALFileLengthProvider will
> return empty and the wal will not be considered open, causing it to be
> dequeued:
> {noformat}
>   @Override
>   public T getWAL(RegionInfo region) throws IOException {
>     T walCopy = wal;
>     if (walCopy == null) {
>       // only lock when need to create wal, and need to lock since
>       // creating hlog on fs is time consuming
>       synchronized (walCreateLock) {
>         walCopy = wal;
>         if (walCopy == null) {
>           walCopy = createWAL();
>           boolean succ = false;
>           try {
> #2          walCopy.init();
>             succ = true;
>           } finally {
>             if (!succ) {
>               walCopy.close();
>             }
>           }
> #3        wal = walCopy;
>         }
>       }
>     }
>     return walCopy;
>   }
> {noformat}
> This can be resolved by making AbstractFSWALProvider.getWALs reuse the
> AbstractFSWALProvider.getWAL method to obtain the WAL instance. Do we really
> have scenarios where we want to return no WAL instance? Another possibility
> could be to synchronize getWALs on the same lock currently used by getWAL.
> I am proposing an initial patch with the first solution; after some tests,
> it does seem to be enough to resolve the problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
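
To make the mechanics above easier to follow, here is a minimal, single-threaded sketch of why an empty getWALs() result makes the length provider report "not being written", and how routing getWALs() through getWAL() changes that. It is not taken from the attached patch. The class and method names (WalLengthRaceSketch, Wal, Provider, getWALsBeforeFix, getWALsReusingGetWAL, lengthIfBeingWritten) are hypothetical stand-ins, and the pre-patch behaviour of getWALs() returning an empty list while the wal field is still unassigned is an assumption drawn from the description, not from the HBase source.

```java
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-ins for the HBase classes discussed above, not the real API.
class WalLengthRaceSketch {

  /** Stand-in for a WAL: reports a length only while a file is open for write. */
  interface Wal {
    OptionalLong getLogFileSizeIfBeingWritten(String path);
  }

  /** Stand-in for a provider with a single, lazily created WAL. */
  static class Provider {
    private final Object walCreateLock = new Object();
    private volatile Wal wal; // stays null until "point #3" has run

    Wal getWAL() {
      Wal walCopy = wal;
      if (walCopy == null) {
        synchronized (walCreateLock) {
          walCopy = wal;
          if (walCopy == null) {
            // Pretend every path is open for write with length 0.
            walCopy = path -> OptionalLong.of(0L);
            wal = walCopy; // corresponds to point #3
          }
        }
      }
      return walCopy;
    }

    // Assumed pre-patch behaviour: nothing is reported until the field is assigned.
    List<Wal> getWALsBeforeFix() {
      Wal walCopy = wal;
      if (walCopy == null) {
        return Collections.emptyList();
      }
      return Collections.singletonList(walCopy);
    }

    // First option from the description: obtain the instance through getWAL().
    List<Wal> getWALsReusingGetWAL() {
      return Collections.singletonList(getWAL());
    }
  }

  /** Same stream pipeline as the default WALFileLengthProvider quoted above. */
  static OptionalLong lengthIfBeingWritten(List<Wal> wals, String path) {
    return wals.stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
        .filter(OptionalLong::isPresent).findAny().orElse(OptionalLong.empty());
  }

  public static void main(String[] args) {
    // Reader asks before the wal field is assigned: looks "closed", would be dequeued.
    Provider before = new Provider();
    System.out.println(lengthIfBeingWritten(before.getWALsBeforeFix(), "wal.1").isPresent());

    // Same question, but getWALs goes through getWAL(): looks "open for write".
    Provider after = new Provider();
    System.out.println(lengthIfBeingWritten(after.getWALsReusingGetWAL(), "wal.1").isPresent());
  }
}
```

Running main prints false and then true. In the real multi-threaded case, a reader calling getWAL() while another thread is between points #2 and #3 would wait on walCreateLock rather than observe an empty list; this sketch only shows the before/after result, not the thread interleaving.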