Author: ivank Date: Fri Feb 24 18:08:01 2012 New Revision: 1293369 URL: http://svn.apache.org/viewvc?rev=1293369&view=rev Log: BOOKKEEPER-177: Index file is lost or some index pages aren't flushed. (sijie via ivank)
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1293369&r1=1293368&r2=1293369&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Feb 24 18:08:01 2012 @@ -44,6 +44,8 @@ Trunk (unreleased changes) BOOKKEEPER-174: Bookie can't start when replaying entries whose ledger were deleted and garbage collected. (sijie via ivank) + BOOKKEEPER-177: Index file is lost or some index pages aren't flushed. (sijie via ivank) + hedwig-server/ BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. 
(Sijie Gou via ivank) Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1293369&r1=1293368&r2=1293369&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Fri Feb 24 18:08:01 2012 @@ -197,42 +197,53 @@ public class Bookie extends Thread { } lastLogMark.markLog(); + + boolean flushFailed = false; try { ledgerCache.flushLedger(true); } catch (IOException e) { LOG.error("Exception flushing Ledger", e); + flushFailed = true; } try { entryLogger.flush(); } catch (IOException e) { LOG.error("Exception flushing entry logger", e); + flushFailed = true; } - lastLogMark.rollLog(); - // list the journals that have been marked - List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() { - @Override - public boolean accept(long journalId) { - if (journalId < lastLogMark.lastMark.txnLogId) { - return true; - } else { - return false; + // if flush failed, we must not roll the last mark; otherwise some ledgers + // would be left unflushed while their journal entries are lost + if (!flushFailed) { + + lastLogMark.rollLog(); + + // list the journals that have been marked + List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() { + @Override + public boolean accept(long journalId) { + if (journalId < lastLogMark.lastMark.txnLogId) { + return true; + } else { + return false; + } } - } - }); + }); - // keep MAX_BACKUP_JOURNALS journal files before marked journal - if (logs.size() >= maxBackupJournals) { - int maxIdx = logs.size() - maxBackupJournals; - for (int i=0; i<maxIdx; i++) { - long id = 
logs.get(i); - // make sure the journal id is smaller than marked journal id - if (id < lastLogMark.lastMark.txnLogId) { - File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn"); - journalFile.delete(); - LOG.info("garbage collected journal " + journalFile.getName()); + // keep MAX_BACKUP_JOURNALS journal files before marked journal + if (logs.size() >= maxBackupJournals) { + int maxIdx = logs.size() - maxBackupJournals; + for (int i=0; i<maxIdx; i++) { + long id = logs.get(i); + // make sure the journal id is smaller than marked journal id + if (id < lastLogMark.lastMark.txnLogId) { + File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn"); + journalFile.delete(); + LOG.info("garbage collected journal " + journalFile.getName()); + } } } + } // clear flushing flag Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1293369&r1=1293368&r2=1293369&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Fri Feb 24 18:08:01 2012 @@ -183,11 +183,12 @@ public class LedgerCache { try { if (lep == null) { lep = grabCleanPage(ledger, pageEntry); + // should update page before we put it into table + // otherwise we would put an empty page in it + updatePage(lep); synchronized(this) { putIntoTable(pages, lep); } - updatePage(lep); - } return lep.getOffset(offsetInPage*8); } finally { @@ -303,75 +304,9 @@ public class LedgerCache { } while(!dirtyLedgers.isEmpty()) { Long l = dirtyLedgers.removeFirst(); - LinkedList<Long> firstEntryList; - synchronized(this) { - HashMap<Long, 
LedgerEntryPage> pageMap = pages.get(l); - if (pageMap == null || pageMap.isEmpty()) { - continue; - } - firstEntryList = new LinkedList<Long>(); - for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) { - LedgerEntryPage lep = entry.getValue(); - if (lep.isClean()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Page is clean " + lep); - } - continue; - } - firstEntryList.add(lep.getFirstEntry()); - } - } - // Now flush all the pages of a ledger - List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size()); - FileInfo fi = null; - try { - for(Long firstEntry: firstEntryList) { - LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true); - if (lep != null) { - entries.add(lep); - } - } - Collections.sort(entries, new Comparator<LedgerEntryPage>() { - @Override - public int compare(LedgerEntryPage o1, LedgerEntryPage o2) { - return (int)(o1.getFirstEntry()-o2.getFirstEntry()); - } - }); - ArrayList<Integer> versions = new ArrayList<Integer>(entries.size()); - fi = getFileInfo(l, null); - int start = 0; - long lastOffset = -1; - for(int i = 0; i < entries.size(); i++) { - versions.add(i, entries.get(i).getVersion()); - if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) { - // send up a sequential list - int count = i - start; - if (count == 0) { - System.out.println("Count cannot possibly be zero!"); - } - writeBuffers(l, entries, fi, start, count); - start = i; - } - lastOffset = entries.get(i).getFirstEntry(); - } - if (entries.size()-start == 0 && entries.size() != 0) { - System.out.println("Nothing to write, but there were entries!"); - } - writeBuffers(l, entries, fi, start, entries.size()-start); - synchronized(this) { - for(int i = 0; i < entries.size(); i++) { - LedgerEntryPage lep = entries.get(i); - lep.setClean(versions.get(i)); - } - } - } finally { - for(LedgerEntryPage lep: entries) { - lep.releasePage(); - } - if (fi != null) { - fi.release(); - } - } + + flushLedger(l); + if 
(!doAll) { break; } @@ -387,6 +322,92 @@ public class LedgerCache { } } + /** + * Flush a specified ledger + * + * @param l + * Ledger Id + * @throws IOException + */ + private void flushLedger(long l) throws IOException { + LinkedList<Long> firstEntryList; + synchronized(this) { + HashMap<Long, LedgerEntryPage> pageMap = pages.get(l); + if (pageMap == null || pageMap.isEmpty()) { + return; + } + firstEntryList = new LinkedList<Long>(); + for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) { + LedgerEntryPage lep = entry.getValue(); + if (lep.isClean()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Page is clean " + lep); + } + continue; + } + firstEntryList.add(lep.getFirstEntry()); + } + } + + if (firstEntryList.size() == 0) { + LOG.debug("Nothing to flush for ledger {}.", l); + // nothing to do + return; + } + + // Now flush all the pages of a ledger + List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size()); + FileInfo fi = null; + try { + for(Long firstEntry: firstEntryList) { + LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true); + if (lep != null) { + entries.add(lep); + } + } + Collections.sort(entries, new Comparator<LedgerEntryPage>() { + @Override + public int compare(LedgerEntryPage o1, LedgerEntryPage o2) { + return (int)(o1.getFirstEntry()-o2.getFirstEntry()); + } + }); + ArrayList<Integer> versions = new ArrayList<Integer>(entries.size()); + fi = getFileInfo(l, null); + int start = 0; + long lastOffset = -1; + for(int i = 0; i < entries.size(); i++) { + versions.add(i, entries.get(i).getVersion()); + if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) { + // send up a sequential list + int count = i - start; + if (count == 0) { + System.out.println("Count cannot possibly be zero!"); + } + writeBuffers(l, entries, fi, start, count); + start = i; + } + lastOffset = entries.get(i).getFirstEntry(); + } + if (entries.size()-start == 0 && entries.size() != 0) { + 
System.out.println("Nothing to write, but there were entries!"); + } + writeBuffers(l, entries, fi, start, entries.size()-start); + synchronized(this) { + for(int i = 0; i < entries.size(); i++) { + LedgerEntryPage lep = entries.get(i); + lep.setClean(versions.get(i)); + } + } + } finally { + for(LedgerEntryPage lep: entries) { + lep.releasePage(); + } + if (fi != null) { + fi.release(); + } + } + } + private void writeBuffers(Long ledger, List<LedgerEntryPage> entries, FileInfo fi, int start, int count) throws IOException { Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=1293369&r1=1293368&r2=1293369&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Fri Feb 24 18:08:01 2012 @@ -141,6 +141,12 @@ public class LedgerDescriptor { try { fi = ledgerCache.getFileInfo(ledgerId, null); long size = fi.size(); + // make sure the file size is aligned with index entry size + // otherwise we may read incorrect data + if (0 != size % 8) { + LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId); + size = size - size % 8; + } // we may not have the last entry in the cache if (size > lastEntry*8) { ByteBuffer bb = ByteBuffer.allocate(ledgerCache.getPageSize()); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1293369&r1=1293368&r2=1293369&view=diff
============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Fri Feb 24 18:08:01 2012 @@ -168,6 +168,20 @@ public class ServerConfiguration extends } /** + * Set page size + * + * @see #getPageSize() + * + * @param pageSize + * Page Size + * @return Server Configuration + */ + public ServerConfiguration setPageSize(int pageSize) { + this.setProperty(PAGE_SIZE, pageSize); + return this; + } + + /** * Max journal file size * * @return max journal file size Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java?rev=1293369&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java Fri Feb 24 18:08:01 2012 @@ -0,0 +1,166 @@ +package org.apache.bookkeeper.test; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.Enumeration; +import java.util.List; + +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.BookKeeper.DigestType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.junit.Assert; +import org.junit.Test; + +/** + * This class tests index corruption cases. + */ +public class IndexCorruptionTest extends BaseTestCase { + static Logger LOG = LoggerFactory.getLogger(IndexCorruptionTest.class); + + DigestType digestType; + + int pageSize = 1024; + + public IndexCorruptionTest(DigestType digestType) { + super(1); + this.digestType = digestType; + baseConf.setPageSize(pageSize); + } + + private Thread findSyncThread() { + int threadCount = Thread.activeCount(); + Thread[] allthreads = new Thread[Thread.activeCount()]; + Thread.enumerate(allthreads); + for (Thread t : allthreads) { + if (t.getName().equals("SyncThread")) { + return t; + } + } + return null; + } + + @Test + public void testNoSuchLedger() throws Exception { + LOG.debug("Testing NoSuchLedger"); + + Thread syncThread = findSyncThread(); + assertNotNull("Not found SyncThread.", syncThread); + + syncThread.suspend(); + // Create a ledger + LedgerHandle lh = bkc.createLedger(1, 1, digestType, "".getBytes()); + + // Close the ledger, which causes a readEntry(0) call + LedgerHandle newLh = bkc.openLedger(lh.getId(), digestType, "".getBytes()); + + // Create a new ledger to write entries + String dummyMsg = 
"NoSuchLedger"; + int numMsgs = 3; + LedgerHandle wlh = bkc.createLedger(1, 1, digestType, "".getBytes()); + for (int i=0; i<numMsgs; i++) { + wlh.addEntry(dummyMsg.getBytes()); + } + + syncThread.resume(); + + // trigger sync + Thread.sleep(2 * baseConf.getFlushInterval()); + + // restart bookies + restartBookies(); + + Enumeration<LedgerEntry> seq = wlh.readEntries(0, numMsgs - 1); + assertTrue("Enumeration of ledger entries has no element", seq.hasMoreElements() == true); + int entryId = 0; + while (seq.hasMoreElements()) { + LedgerEntry e = seq.nextElement(); + assertEquals(entryId, e.getEntryId()); + + Assert.assertArrayEquals(dummyMsg.getBytes(), e.getEntry()); + ++entryId; + } + assertEquals(entryId, numMsgs); + } + + @Test + public void testEmptyIndexPage() throws Exception { + LOG.debug("Testing EmptyIndexPage"); + + Thread syncThread = findSyncThread(); + assertNotNull("Not found SyncThread.", syncThread); + + syncThread.suspend(); + + // Create a ledger + LedgerHandle lh1 = bkc.createLedger(1, 1, digestType, "".getBytes()); + + String dummyMsg = "NoSuchLedger"; + + // write two pages' worth of entries to ledger 2 + int numMsgs = 2 * pageSize / 8; + LedgerHandle lh2 = bkc.createLedger(1, 1, digestType, "".getBytes()); + for (int i=0; i<numMsgs; i++) { + lh2.addEntry(dummyMsg.getBytes()); + } + + syncThread.resume(); + + // trigger sync + Thread.sleep(2 * baseConf.getFlushInterval()); + + syncThread.suspend(); + + // Close ledger 1, which causes a readEntry(0) call + LedgerHandle newLh1 = bkc.openLedger(lh1.getId(), digestType, "".getBytes()); + + // write another 3 entries to ledger 2 + for (int i=0; i<3; i++) { + lh2.addEntry(dummyMsg.getBytes()); + } + + syncThread.resume(); + + // wait for sync again + Thread.sleep(2 * baseConf.getFlushInterval()); + + // restart bookies + restartBookies(); + + numMsgs += 3; + Enumeration<LedgerEntry> seq = lh2.readEntries(0, numMsgs - 1); + assertTrue("Enumeration of ledger entries has no element", seq.hasMoreElements() == true);
+ int entryId = 0; + while (seq.hasMoreElements()) { + LedgerEntry e = seq.nextElement(); + assertEquals(entryId, e.getEntryId()); + + Assert.assertArrayEquals(dummyMsg.getBytes(), e.getEntry()); + ++entryId; + } + assertEquals(entryId, numMsgs); + } +}