HBASE-19083 Introduce a new log writer which can write to two HDFSes
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4709f6ba Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4709f6ba Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4709f6ba Branch: refs/heads/HBASE-19064 Commit: 4709f6ba0f9773c91363caaf7248910bca32860b Parents: 6b77786 Author: zhangduo <zhang...@apache.org> Authored: Thu Jan 11 21:08:02 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Mar 7 18:12:44 2018 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/wal/AsyncFSWAL.java | 21 +-- .../regionserver/wal/CombinedAsyncWriter.java | 134 ++++++++++++++++++ .../hbase/regionserver/wal/DualAsyncFSWAL.java | 67 +++++++++ .../wal/AbstractTestProtobufLog.java | 110 +++------------ .../regionserver/wal/ProtobufLogTestHelper.java | 99 ++++++++++++++ .../regionserver/wal/TestAsyncProtobufLog.java | 32 +---- .../wal/TestCombinedAsyncWriter.java | 136 +++++++++++++++++++ .../hbase/regionserver/wal/TestProtobufLog.java | 14 +- .../regionserver/wal/WriterOverAsyncWriter.java | 63 +++++++++ 9 files changed, 533 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index e34818f..0bee9d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -607,12 +607,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } } - @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { + protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup, channelClass); } + @Override + protected AsyncWriter createWriterInstance(Path path) throws IOException { + return createAsyncWriter(fs, path); + } + private void waitForSafePoint() { consumeLock.lock(); try { @@ -632,13 +636,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } } - private long closeWriter() { - AsyncWriter oldWriter = this.writer; - if (oldWriter != null) { - long fileLength = oldWriter.getLength(); + protected final long closeWriter(AsyncWriter writer) { + if (writer != null) { + long fileLength = writer.getLength(); closeExecutor.execute(() -> { try { - oldWriter.close(); + writer.close(); } catch (IOException e) { LOG.warn("close old writer failed", e); } @@ -654,7 +657,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { throws IOException { Preconditions.checkNotNull(nextWriter); waitForSafePoint(); - long oldFileLen = closeWriter(); + long oldFileLen = closeWriter(this.writer); logRollAndSetupWalProps(oldPath, newPath, oldFileLen); this.writer = nextWriter; if (nextWriter instanceof AsyncProtobufLogWriter) { @@ -679,7 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { @Override protected void doShutdown() throws IOException { waitForSafePoint(); - closeWriter(); + closeWriter(this.writer); closeExecutor.shutdown(); try { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java new file mode 100644 index 0000000..8ecfede --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + +/** + * An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances. + */ +@InterfaceAudience.Private +public abstract class CombinedAsyncWriter implements AsyncWriter { + + private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class); + + protected final ImmutableList<AsyncWriter> writers; + + protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) { + this.writers = writers; + } + + @Override + public long getLength() { + return writers.get(0).getLength(); + } + + @Override + public void close() throws IOException { + Exception error = null; + for (AsyncWriter writer : writers) { + try { + writer.close(); + } catch (Exception e) { + LOG.warn("close writer failed", e); + if (error == null) { + error = e; + } + } + } + if (error != null) { + throw new IOException("Failed to close at least one writer, please see the warn log above. " + + "The cause is the first exception occured", error); + } + } + + protected abstract void doSync(CompletableFuture<Long> future); + + @Override + public CompletableFuture<Long> sync() { + CompletableFuture<Long> future = new CompletableFuture<>(); + doSync(future); + return future; + } + + @Override + public void append(Entry entry) { + writers.forEach(w -> w.append(entry)); + } + + public enum Mode { + SEQUENTIAL, PARALLEL + } + + public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) { + ImmutableList<AsyncWriter> ws = + ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build(); + switch (mode) { + case SEQUENTIAL: + return new CombinedAsyncWriter(ws) { + + private void doSync(CompletableFuture<Long> future, Long length, int index) { + if (index == writers.size()) { + future.complete(length); + return; + } + writers.get(index).sync().whenComplete((len, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + doSync(future, len, index + 1); + }); + } + + @Override + protected void doSync(CompletableFuture<Long> future) { + doSync(future, null, 0); + } + }; + case PARALLEL: + return new CombinedAsyncWriter(ws) { + + @Override + protected void doSync(CompletableFuture<Long> future) { + AtomicInteger remaining = new AtomicInteger(writers.size()); + writers.forEach(w -> w.sync().whenComplete((length, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (remaining.decrementAndGet() == 0) { + future.complete(length); + } + })); + } + }; + default: + throw new IllegalArgumentException("Unknown mode: " + mode); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java new file mode 100644 index 0000000..42b0dae --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + +/** + * An AsyncFSWAL which writes data to two filesystems. + */ +@InterfaceAudience.Private +public class DualAsyncFSWAL extends AsyncFSWAL { + + private final FileSystem remoteFs; + + private final Path remoteWalDir; + + public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteRootDir, + String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, + boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, + eventLoopGroup, channelClass); + this.remoteFs = remoteFs; + this.remoteWalDir = new Path(remoteRootDir, logDir); + } + + @Override + protected AsyncWriter createWriterInstance(Path path) throws IOException { + AsyncWriter localWriter = super.createWriterInstance(path); + AsyncWriter remoteWriter; + boolean succ = false; + try { + remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName())); + succ = true; + } finally { + if (!succ) { + closeWriter(localWriter); + } + } + return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter, + localWriter); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java index c3f3277..5098609 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java @@ -18,33 +18,15 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -56,8 +38,8 @@ import org.junit.rules.TestName; /** * WAL tests that can be reused across providers. */ -public abstract class AbstractTestProtobufLog<W extends Closeable> { - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); +public abstract class AbstractTestProtobufLog { + protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected FileSystem fs; protected Path dir; @@ -93,14 +75,7 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> { TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); // faster failover with cluster.shutdown();fs.close() idiom - TEST_UTIL.getConfiguration() - .setInt("hbase.ipc.client.connect.max.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "dfs.client.block.recovery.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "hbase.ipc.client.connection.maxidletime", 500); - TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, - SampleRegionWALCoprocessor.class.getName()); + TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); TEST_UTIL.startMiniDFSCluster(3); } @@ -131,77 +106,24 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> { * @throws IOException */ private void doRead(boolean withTrailer) throws IOException { - final int columnCount = 5; - final int recordCount = 5; - final TableName tableName = - TableName.valueOf("tablename"); - final byte[] row = Bytes.toBytes("row"); + int columnCount = 5; + int recordCount = 5; + TableName tableName = TableName.valueOf("tablename"); + byte[] row = Bytes.toBytes("row"); long timestamp = System.currentTimeMillis(); Path path = new Path(dir, "tempwal"); // delete the log if already exists, for test only fs.delete(path, true); - W writer = null; - ProtobufLogReader reader = null; - try { - HRegionInfo hri = new HRegionInfo(tableName, - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HTableDescriptor htd = new HTableDescriptor(tableName); - fs.mkdirs(dir); - // Write log in pb format. - writer = createWriter(path); - for (int i = 0; i < recordCount; ++i) { - WALKeyImpl key = new WALKeyImpl( - hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); - WALEdit edit = new WALEdit(); - for (int j = 0; j < columnCount; ++j) { - if (i == 0) { - htd.addFamily(new HColumnDescriptor("column" + j)); - } - String value = i + "" + j; - edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value))); - } - append(writer, new WAL.Entry(key, edit)); - } - sync(writer); - if (withTrailer) writer.close(); - - // Now read the log using standard means. - reader = (ProtobufLogReader) wals.createReader(fs, path); - if (withTrailer) { - assertNotNull(reader.trailer); - } else { - assertNull(reader.trailer); - } - for (int i = 0; i < recordCount; ++i) { - WAL.Entry entry = reader.next(); - assertNotNull(entry); - assertEquals(columnCount, entry.getEdit().size()); - assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); - assertEquals(tableName, entry.getKey().getTableName()); - int idx = 0; - for (Cell val : entry.getEdit().getCells()) { - assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), - val.getRowLength())); - String value = i + "" + idx; - assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val)); - idx++; - } - } - WAL.Entry entry = reader.next(); - assertNull(entry); - } finally { - if (writer != null) { - writer.close(); - } - if (reader != null) { - reader.close(); + fs.mkdirs(dir); + try (WALProvider.Writer writer = createWriter(path)) { + ProtobufLogTestHelper.doWrite(writer, withTrailer, tableName, columnCount, recordCount, row, + timestamp); + try (ProtobufLogReader reader = (ProtobufLogReader) wals.createReader(fs, path)) { + ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row, + timestamp); } } } - protected abstract W createWriter(Path path) throws IOException; - - protected abstract void append(W writer, WAL.Entry entry) throws IOException; - - protected abstract void sync(W writer) throws IOException; + protected abstract WALProvider.Writer createWriter(Path path) throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java new file mode 100644 index 0000000..ecd8e6c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.stream.IntStream; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; + +/** + * Helper class for testing protobuf log. + */ +final class ProtobufLogTestHelper { + + private ProtobufLogTestHelper() { + } + + private static byte[] toValue(int prefix, int suffix) { + return Bytes.toBytes(prefix + "-" + suffix); + } + + private static RegionInfo toRegionInfo(TableName tableName) { + return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build(); + } + + public static void doWrite(WALProvider.Writer writer, boolean withTrailer, TableName tableName, + int columnCount, int recordCount, byte[] row, long timestamp) throws IOException { + RegionInfo hri = toRegionInfo(tableName); + for (int i = 0; i < recordCount; i++) { + WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp, + HConstants.DEFAULT_CLUSTER_ID); + WALEdit edit = new WALEdit(); + int prefix = i; + IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j)) + .map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add); + writer.append(new WAL.Entry(key, edit)); + } + writer.sync(); + if (withTrailer) { + writer.close(); + } + } + + public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName, + int columnCount, int recordCount, byte[] row, long timestamp) throws IOException { + if (withTrailer) { + assertNotNull(reader.trailer); + } else { + assertNull(reader.trailer); + } + RegionInfo hri = toRegionInfo(tableName); + for (int i = 0; i < recordCount; ++i) { + WAL.Entry entry = reader.next(); + assertNotNull(entry); + assertEquals(columnCount, entry.getEdit().size()); + assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); + assertEquals(tableName, entry.getKey().getTableName()); + int idx = 0; + for (Cell val : entry.getEdit().getCells()) { + assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), + val.getRowLength())); + assertArrayEquals(toValue(i, idx), CellUtil.cloneValue(val)); + idx++; + } + } + assertNull(reader.next()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index 0ea75b6..7626dcf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -18,29 +18,24 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALProvider; -import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @Category({ RegionServerTests.class, MediumTests.class }) -public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> { +public class TestAsyncProtobufLog extends AbstractTestProtobufLog { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -64,25 +59,8 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As } @Override - protected AsyncWriter createWriter(Path path) throws IOException { - return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false, - EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); - } - - @Override - protected void append(AsyncWriter writer, Entry entry) throws IOException { - writer.append(entry); - } - - @Override - protected void sync(AsyncWriter writer) throws IOException { - try { - writer.sync().get(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } catch (ExecutionException e) { - Throwables.propagateIfPossible(e.getCause()); - throw new IOException(e.getCause()); - } + protected Writer createWriter(Path path) throws IOException { + return new WriterOverAsyncWriter(AsyncFSWALProvider.createAsyncWriter( + TEST_UTIL.getConfiguration(), fs, path, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java new file mode 100644 index 0000000..d74f9d8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; + +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestCombinedAsyncWriter { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static EventLoopGroup EVENT_LOOP_GROUP; + + private static Class<? extends Channel> CHANNEL_CLASS; + + private static WALFactory WALS; + + @Rule + public final TestName name = new TestName(); + + @Parameter + public CombinedAsyncWriter.Mode mode; + + @Parameters(name = "{index}: mode={0}") + public static List<Object[]> params() { + return Arrays.asList(new Object[] { CombinedAsyncWriter.Mode.SEQUENTIAL }, + new Object[] { CombinedAsyncWriter.Mode.PARALLEL }); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + EVENT_LOOP_GROUP = new NioEventLoopGroup(); + CHANNEL_CLASS = NioSocketChannel.class; + UTIL.startMiniDFSCluster(3); + UTIL.getTestFileSystem().mkdirs(UTIL.getDataTestDirOnTestFS()); + WALS = + new WALFactory(UTIL.getConfiguration(), null, TestCombinedAsyncWriter.class.getSimpleName()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (WALS != null) { + WALS.close(); + } + EVENT_LOOP_GROUP.shutdownGracefully().syncUninterruptibly(); + UTIL.shutdownMiniDFSCluster(); + } + + @Test + public void testWithTrailer() throws IOException { + doTest(true); + } + + @Test + public void testWithoutTrailer() throws IOException { + doTest(false); + } + + private Path getPath(int index) throws IOException { + String methodName = name.getMethodName().replaceAll("[^A-Za-z0-9_-]", "_"); + return new Path(UTIL.getDataTestDirOnTestFS(), methodName + "-" + index); + } + + private void doTest(boolean withTrailer) throws IOException { + int columnCount = 5; + int recordCount = 5; + TableName tableName = TableName.valueOf("tablename"); + byte[] row = Bytes.toBytes("row"); + long timestamp = System.currentTimeMillis(); + Path path1 = getPath(1); + Path path2 = getPath(2); + FileSystem fs = UTIL.getTestFileSystem(); + Configuration conf = UTIL.getConfiguration(); + try ( + AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, false, + EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); + AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false, + EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); + CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) { + ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName, + columnCount, recordCount, row, timestamp); + try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) { + ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row, + timestamp); + } + try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path2)) { + ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row, + timestamp); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index 080b5be..d429a01 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -23,14 +23,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.wal.FSHLogProvider; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @Category({ RegionServerTests.class, MediumTests.class }) -public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> { +public class TestProtobufLog extends AbstractTestProtobufLog { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -40,14 +38,4 @@ public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> protected Writer createWriter(Path path) throws IOException { return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); } - - @Override - protected void append(Writer writer, Entry entry) throws IOException { - writer.append(entry); - } - - @Override - protected void sync(Writer writer) throws IOException { - writer.sync(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4709f6ba/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java new file mode 100644 index 0000000..da70ee5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; + +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + +class WriterOverAsyncWriter implements WALProvider.Writer { + + private final WALProvider.AsyncWriter asyncWriter; + + public WriterOverAsyncWriter(AsyncWriter asyncWriter) { + this.asyncWriter = asyncWriter; + } + + @Override + public void close() throws IOException { + asyncWriter.close(); + } + + @Override + public long getLength() { + return asyncWriter.getLength(); + } + + @Override + public void sync() throws IOException { + try { + asyncWriter.sync().get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw new IOException(e.getCause()); + } + } + + @Override + public void append(Entry entry) throws IOException { + asyncWriter.append(entry); + } +}