Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 eae8c2a46 -> 50dd3a5cf


HDFS-11627. Block Storage: Cblock cache should register with flusher to upload 
blocks to containers. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/50dd3a5c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/50dd3a5c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/50dd3a5c

Branch: refs/heads/HDFS-7240
Commit: 50dd3a5cfa7727089dc47b0870fb4dd5df078310
Parents: eae8c2a
Author: Chen Liang <cli...@apache.org>
Authored: Wed Apr 26 10:36:56 2017 -0700
Committer: Chen Liang <cli...@apache.org>
Committed: Wed Apr 26 10:36:56 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/cblock/CBlockConfigKeys.java  |   4 +
 .../cblock/jscsiHelper/BlockWriterTask.java     |   8 +-
 .../cblock/jscsiHelper/CBlockTargetMetrics.java |  44 ++++++-
 .../jscsiHelper/ContainerCacheFlusher.java      |  46 ++++---
 .../cache/impl/AsyncBlockWriter.java            |   6 +-
 .../cache/impl/CBlockLocalCache.java            |   6 +-
 .../jscsiHelper/cache/impl/SyncBlockReader.java |   4 +
 .../hadoop/cblock/TestLocalBlockCache.java      | 121 +++++++++++++++++--
 8 files changed, 202 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index b1fba41..74f5dc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -156,6 +156,10 @@ public final class CBlockConfigKeys {
   public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT =
       5;
 
+  // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB.
+  public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY =
+      "dfs.cblock.cache.leveldb.cache.size.mb";
+  public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256;
 
   private CBlockConfigKeys() {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
index 310dcca..6b5416b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
@@ -79,6 +79,7 @@ public class BlockWriterTask implements Runnable {
       incTryCount();
       Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID());
       client = flusher.getXceiverClientManager().acquireClient(pipeline);
+      containerName = pipeline.getContainerName();
       byte[] keybuf = Longs.toByteArray(block.getBlockID());
       byte[] data;
       long startTime = Time.monotonicNow();
@@ -97,11 +98,16 @@ public class BlockWriterTask implements Runnable {
 
       flusher.incrementRemoteIO();
 
-    } catch (IOException ex) {
+    } catch (Exception ex) {
       flusher.getLOG().error("Writing of block failed, We have attempted " +
               "to write this block {} times to the container {}.Trace ID:{}",
           this.getTryCount(), containerName, "", ex);
       writeRetryBlock(block);
+      if (ex instanceof IOException) {
+        flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
+      } else {
+        flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
+      }
     } finally {
       flusher.incFinishCount(fileName);
       if(client != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
index 9ba63ee..1174c33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
@@ -34,19 +34,26 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
  * as well as the latency time of read and write ops.
  */
 public class CBlockTargetMetrics {
-  // Counter based Metrics
+  // IOPS based Metrics
   @Metric private MutableCounterLong numReadOps;
   @Metric private MutableCounterLong numWriteOps;
   @Metric private MutableCounterLong numReadCacheHits;
   @Metric private MutableCounterLong numReadCacheMiss;
-  @Metric private MutableCounterLong numReadLostBlocks;
-  @Metric private MutableCounterLong numBlockBufferFlush;
+
+  // Cblock internal Metrics
   @Metric private MutableCounterLong numDirectBlockWrites;
-  @Metric private MutableCounterLong numFailedDirectBlockWrites;
+  @Metric private MutableCounterLong numBlockBufferFlush;
   @Metric private MutableCounterLong numDirtyLogBlockRead;
-  @Metric private MutableCounterLong numBytesDirtyLogRead;
   @Metric private MutableCounterLong numDirtyLogBlockUpdated;
+  @Metric private MutableCounterLong numBytesDirtyLogRead;
   @Metric private MutableCounterLong numBytesDirtyLogWritten;
+
+  // Failure Metrics
+  @Metric private MutableCounterLong numReadLostBlocks;
+  @Metric private MutableCounterLong numFailedReadBlocks;
+  @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
+  @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
+  @Metric private MutableCounterLong numFailedDirectBlockWrites;
   @Metric private MutableCounterLong numFailedDirtyBlockFlushes;
 
   // Latency based Metrics
@@ -91,10 +98,22 @@ public class CBlockTargetMetrics {
     numDirectBlockWrites.incr();
   }
 
+  public void incNumWriteIOExceptionRetryBlocks() {
+    numWriteIOExceptionRetryBlocks.incr();
+  }
+
+  public void incNumWriteGenericExceptionRetryBlocks() {
+    numWriteGenericExceptionRetryBlocks.incr();
+  }
+
   public void incNumFailedDirectBlockWrites() {
     numFailedDirectBlockWrites.incr();
   }
 
+  public void incNumFailedReadBlocks() {
+    numFailedReadBlocks.incr();
+  }
+
   public void incNumBlockBufferFlush() {
     numBlockBufferFlush.incr();
   }
@@ -179,6 +198,21 @@ public class CBlockTargetMetrics {
   }
 
   @VisibleForTesting
+  public long getNumFailedReadBlocks() {
+    return numFailedReadBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteIOExceptionRetryBlocks() {
+    return numWriteIOExceptionRetryBlocks.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteGenericExceptionRetryBlocks() {
+    return numWriteGenericExceptionRetryBlocks.value();
+  }
+
+  @VisibleForTesting
   public long getNumBlockBufferFlush() {
     return numBlockBufferFlush.value();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
index 148734f..905d9ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
 import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.utils.LevelDBStore;
@@ -77,8 +78,10 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys
     .DFS_CBLOCK_CACHE_THREAD_PRIORITY;
 import static org.apache.hadoop.cblock.CBlockConfigKeys
     .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT;
 
 /**
  * Class that writes to remote containers.
@@ -96,6 +99,7 @@ public class ContainerCacheFlusher implements Runnable {
   private final XceiverClientManager xceiverClientManager;
   private final CBlockTargetMetrics metrics;
   private AtomicBoolean shutdown;
+  private final long levelDBCacheSize;
 
   private final ConcurrentMap<String, FinishCounter> finishCountMap;
 
@@ -117,6 +121,8 @@ public class ContainerCacheFlusher implements Runnable {
         DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
     int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
         DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
+    levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY,
+        DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB;
 
     LOG.info("Cache: Core Pool Size: {}", corePoolSize);
     LOG.info("Cache: Keep Alive: {}", keepAlive);
@@ -146,17 +152,14 @@ public class ContainerCacheFlusher implements Runnable {
     this.remoteIO = new AtomicLong();
 
     this.finishCountMap = new ConcurrentHashMap<>();
-    checkExisitingDirtyLog(config);
   }
 
-  private void checkExisitingDirtyLog(Configuration config) {
-    File dbPath = Paths.get(config.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
-        DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT)).toFile();
+  private void checkExistingDirtyLog(File dbPath) {
     if (!dbPath.exists()) {
-      LOG.info("No existing dirty log found at {}", dbPath);
+      LOG.debug("No existing dirty log found at {}", dbPath);
       return;
     }
-    LOG.info("Need to check and requeue existing dirty log {}", dbPath);
+    LOG.debug("Need to check and requeue existing dirty log {}", dbPath);
     HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
     traverse(dbPath, allFiles);
     for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
@@ -237,11 +240,10 @@ public class ContainerCacheFlusher implements Runnable {
    * Opens a DB if needed or returns a handle to an already open DB.
    *
    * @param dbPath -- dbPath
-   * @param cacheSize - cacheSize
    * @return the levelDB on the given path.
    * @throws IOException
    */
-  public synchronized LevelDBStore openDB(String dbPath, int cacheSize)
+  public synchronized LevelDBStore openDB(String dbPath)
       throws IOException {
     if (dbMap.containsKey(dbPath)) {
       RefCountedDB refDB = dbMap.get(dbPath);
@@ -249,7 +251,7 @@ public class ContainerCacheFlusher implements Runnable {
       return refDB.db;
     } else {
       Options options = new Options();
-      options.cacheSize(cacheSize * (1024L * 1024L));
+      options.cacheSize(levelDBCacheSize);
       options.createIfMissing(true);
       LevelDBStore cacheDB = new LevelDBStore(
           new File(getDBFileName(dbPath)), options);
@@ -260,14 +262,19 @@ public class ContainerCacheFlusher implements Runnable {
   }
 
   /**
-   * Updates the contianer map. This data never changes so we will update this
+   * Updates the container map. This data never changes so we will update this
    * during restarts and it should not hurt us.
    *
+   * Once a CBlockLocalCache cache is registered, requeue dirty/retry log files
+   * for the volume
+   *
    * @param dbPath - DbPath
-   * @param containerList - Contianer List.
+   * @param containerList - Container List.
    */
   public void register(String dbPath, Pipeline[] containerList) {
+    File dbFile = Paths.get(dbPath).toFile();
     pipelineMap.put(dbPath, containerList);
+    checkExistingDirtyLog(dbFile);
   }
 
   private String getDBFileName(String dbPath) {
@@ -363,7 +370,7 @@ public class ContainerCacheFlusher implements Runnable {
         }
         finishCountMap.put(message.getFileName(),
             new FinishCounter(blockCount, message.getDbPath(),
-                message.getFileName()));
+                message.getFileName(), this));
         // should be flip instead of rewind, because we also need to make sure
         // the end position is correct.
         blockIDBuffer.flip();
@@ -473,14 +480,17 @@ public class ContainerCacheFlusher implements Runnable {
     private final String dirtyLogPath;
     private final AtomicLong currentCount;
     private AtomicBoolean fileDeleted;
+    private final ContainerCacheFlusher flusher;
 
     FinishCounter(long expectedCount, String dbPath,
-        String dirtyLogPath) {
+        String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException {
       this.expectedCount = expectedCount;
       this.dbPath = dbPath;
       this.dirtyLogPath = dirtyLogPath;
       this.currentCount = new AtomicLong(0);
       this.fileDeleted = new AtomicBoolean(false);
+      this.flusher = flusher;
+      this.flusher.openDB(dbPath);
     }
 
     public boolean isFileDeleted() {
@@ -494,6 +504,7 @@ public class ContainerCacheFlusher implements Runnable {
         LOG.debug(
             "Deleting {} with count {} {}", filePath, count, expectedCount);
         try {
+          flusher.closeDB(dbPath);
           Path path = Paths.get(filePath);
           Files.delete(path);
           // the following part tries to remove the directory if it is empty
@@ -504,9 +515,8 @@ public class ContainerCacheFlusher implements Runnable {
             Files.delete(parent);
           }*/
           fileDeleted.set(true);
-        } catch (IOException e) {
-          LOG.error(
-              "Error deleting dirty log file {} {}", filePath, e.toString());
+        } catch (Exception e) {
+          LOG.error("Error deleting dirty log file:" + filePath, e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 9a72f51..1273cd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -179,11 +179,11 @@ public class AsyncBlockWriter {
             block.getBlockID(), endTime - startTime, datahash);
       }
       block.clearData();
-      if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
-        writeBlockBufferToFile(blockIDBuffer);
-      }
       parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
       blockIDBuffer.putLong(block.getBlockID());
+      if (blockIDBuffer.remaining() == 0) {
+        writeBlockBufferToFile(blockIDBuffer);
+      }
     } else {
       Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
       String containerName = pipeline.getContainerName();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
index 576338e..bd034c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
@@ -69,10 +69,9 @@ public class CBlockLocalCache implements CacheModule {
 
   private final Configuration conf;
   /**
-   * LevelDB cache file, we use an off-heap cache in LevelDB for 256 MB for now.
+   * LevelDB cache file.
    */
   private final LevelDBStore cacheDB;
-  private final int cacheSizeMb = 256;
 
   /**
    * Asyncblock writer updates the cacheDB and writes the blocks async to
@@ -158,9 +157,10 @@ public class CBlockLocalCache implements CacheModule {
       throw new IllegalArgumentException("Unable to create paths. Path: " +
           dbPath);
     }
-    cacheDB = flusher.openDB(dbPath.toString(), cacheSizeMb);
+    cacheDB = flusher.openDB(dbPath.toString());
     this.containerList = containerPipelines.toArray(new
         Pipeline[containerPipelines.size()]);
+    flusher.register(dbPath.toString(), containerList);
     this.ipAddressString = getHostIP();
     this.tracePrefix = ipAddressString + ":" + this.volumeName;
     this.volumeSize = volumeSize;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java
index 19e3756..533e919 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java
@@ -136,6 +136,10 @@ public class SyncBlockReader {
           .acquireClient(parentCache.getPipeline(blockID));
       LogicalBlock block = getBlockFromContainer(blockID, client);
       return block;
+    } catch (Exception ex) {
+      parentCache.getTargetMetrics().incNumFailedReadBlocks();
+      LOG.error("read failed for BlockId: {}", blockID, ex);
+      throw ex;
     } finally {
       if (client != null) {
         parentCache.getClientManager().releaseClient(client);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50dd3a5c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
index 63ae921..e578b6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -518,9 +518,7 @@ public class TestLocalBlockCache {
     // Create a new config so that this tests write metafile to new location
     OzoneConfiguration flushTestConfig = new OzoneConfiguration();
     URL p = flushTestConfig.getClass().getResource("");
-    String path = p.getPath().concat(
-        TestOzoneContainer.class.getSimpleName()
-            + "/testEmptyBlockBufferHandling");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
     flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
     flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
     flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
@@ -528,6 +526,8 @@ public class TestLocalBlockCache {
     String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
     String userName = "user" + RandomStringUtils.randomNumeric(4);
     String data = RandomStringUtils.random(4 * KB);
+    List<Pipeline> pipelines = getContainerPipeline(10);
+
     CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
     ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
         xceiverClientManager, metrics);
@@ -535,7 +535,7 @@ public class TestLocalBlockCache {
         .setConfiguration(flushTestConfig)
         .setVolumeName(volumeName)
         .setUserName(userName)
-        .setPipelines(getContainerPipeline(10))
+        .setPipelines(pipelines)
         .setClientManager(xceiverClientManager)
         .setBlockSize(4 * KB)
         .setVolumeSize(50 * GB)
@@ -565,9 +565,21 @@ public class TestLocalBlockCache {
     ContainerCacheFlusher newFlusher =
         new ContainerCacheFlusher(flushTestConfig,
             xceiverClientManager, newMetrics);
-    Thread fllushListenerThread = new Thread(newFlusher);
-    fllushListenerThread.setDaemon(true);
-    fllushListenerThread.start();
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Thread flushListenerThread = new Thread(newFlusher);
+    flushListenerThread.setDaemon(true);
+    flushListenerThread.start();
 
     Thread.sleep(5000);
     Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
@@ -575,9 +587,104 @@ public class TestLocalBlockCache {
     Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
             * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
     // Now shutdown again, nothing should be flushed
+    newCache.close();
     newFlusher.shutdown();
     Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
     Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
     Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
   }
+
+  /**
+   * This test writes some block to the cache and then shuts down the cache
+   * The cache is then restarted with "short.circuit.io" disable to check
+   * that the blocks are read correctly from the container.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testContainerWrites() throws IOException,
+      InterruptedException, TimeoutException {
+    // Create a new config so that this tests write metafile to new location
+    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
+    URL p = flushTestConfig.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+
+    XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+
+    int numUniqueBlocks = 4;
+    String[] data = new String[numUniqueBlocks];
+    String[] dataHash = new String[numUniqueBlocks];
+    for (int i = 0; i < numUniqueBlocks; i++) {
+      data[i] = RandomStringUtils.random(4 * KB);
+      dataHash[i] = DigestUtils.sha256Hex(data[i]);
+    }
+
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
+        xcm, metrics);
+    List<Pipeline> pipelines = getContainerPipeline(10);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.start();
+    Thread fllushListenerThread = new Thread(flusher);
+    fllushListenerThread.setDaemon(true);
+    fllushListenerThread.start();
+    Assert.assertTrue(cache.isShortCircuitIOEnabled());
+    // Write data to the cache
+    for (int i = 0; i < 512; i++) {
+      cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
+    }
+    // Close the cache and flush the data to the containers
+    cache.close();
+    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(512, metrics.getNumWriteOps());
+    Thread.sleep(5000);
+    flusher.shutdown();
+    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
+    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
+
+    // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
+    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newFlusher =
+        new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
+    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
+        .setConfiguration(flushTestConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xcm)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(newFlusher)
+        .setCBlockTargetMetrics(newMetrics)
+        .build();
+    newCache.start();
+    Assert.assertFalse(newCache.isShortCircuitIOEnabled());
+    // this read will be from the container, also match the hash
+    for (int i = 0; i < 512; i++) {
+      LogicalBlock block = newCache.get(i);
+      String readHash = DigestUtils.sha256Hex(block.getData().array());
+      Assert.assertEquals("File content does not match, for index:"
+          + i, dataHash[i % numUniqueBlocks], readHash);
+    }
+    Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
+    Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
+    newFlusher.shutdown();
+    newCache.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to