KYLIN-1894 GlobalDictionary may corrupt when server suddenly crash

Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30fb1e06
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30fb1e06
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30fb1e06

Branch: refs/heads/master
Commit: 30fb1e060b61a099d3cc4b018996fce2395f0c0a
Parents: 928baaf
Author: sunyerui <sunye...@gmail.com>
Authored: Fri Jul 15 12:30:06 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Jul 18 18:50:36 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/AppendTrieDictionary.java |  65 ++---
 .../org/apache/kylin/dict/CachedTreeMap.java    |  83 +++++-
 .../kylin/dict/AppendTrieDictionaryTest.java    |   6 +-
 .../apache/kylin/dict/CachedTreeMapTest.java    | 265 +++++++++++++++++++
 4 files changed, 364 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/30fb1e06/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 32038bf..4cce586 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -99,13 +99,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
         }
     }
 
-    public void update(String baseDir, int baseId, int maxId, int 
maxValueLength, int nValues, BytesConverter bytesConverter, byte[] 
dictMapBytes) throws IOException {
-        ByteArrayInputStream buf = new ByteArrayInputStream(dictMapBytes);
-        DataInputStream input = new DataInputStream(buf);
-        update(baseDir, baseId, maxId, maxValueLength, nValues, 
bytesConverter, input);
-    }
-
-    public void update(String baseDir, int baseId, int maxId, int 
maxValueLength, int nValues, BytesConverter bytesConverter, DataInput input) 
throws IOException {
+    public void update(String baseDir, int baseId, int maxId, int 
maxValueLength, int nValues, BytesConverter bytesConverter, CachedTreeMap 
dictMap) throws IOException {
         this.baseDir = baseDir;
         this.baseId = baseId;
         this.maxId = maxId;
@@ -114,11 +108,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> 
{
         this.bytesConverter = bytesConverter;
 
         int cacheSize = 
KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
-        if (dictSliceMap == null) {
-            dictSliceMap = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
-        }
-        dictSliceMap.clear();
-        ((Writable) dictSliceMap).readFields(input);
+        dictSliceMap = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+        ((CachedTreeMap)dictSliceMap).loadEntry(dictMap);
     }
 
     public byte[] writeDictMap() throws IOException {
@@ -777,7 +768,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
 
         private AppendTrieDictionary dict;
 
-        private TreeMap<DictSliceKey, DictNode> dictSliceMap;
+        private TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
         private static int MAX_ENTRY_IN_SLICE = 10_000_000;
         private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
 
@@ -803,9 +794,9 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             MAX_ENTRY_IN_SLICE = 
KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
             int cacheSize = 
KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
             // create a new cached map with baseDir
-            dictSliceMap = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build();
+            mutableDictSliceMap = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build();
             if (dictMapBytes != null) {
-                ((Writable) dictSliceMap).readFields(new DataInputStream(new 
ByteArrayInputStream(dictMapBytes)));
+                ((Writable) mutableDictSliceMap).readFields(new 
DataInputStream(new ByteArrayInputStream(dictMapBytes)));
             }
         }
 
@@ -819,23 +810,23 @@ public class AppendTrieDictionary<T> extends 
Dictionary<T> {
             }
             maxValueLength = Math.max(maxValueLength, value.length);
 
-            if (dictSliceMap.isEmpty()) {
+            if (mutableDictSliceMap.isEmpty()) {
                 DictNode root = new DictNode(new byte[0], false);
-                dictSliceMap.put(DictSliceKey.wrap(new byte[0]), root);
+                mutableDictSliceMap.put(DictSliceKey.wrap(new byte[0]), root);
             }
-            DictSliceKey sliceKey = 
dictSliceMap.floorKey(DictSliceKey.wrap(value));
+            DictSliceKey sliceKey = 
mutableDictSliceMap.floorKey(DictSliceKey.wrap(value));
             if (sliceKey == null) {
-                sliceKey = dictSliceMap.firstKey();
+                sliceKey = mutableDictSliceMap.firstKey();
             }
-            DictNode root = dictSliceMap.get(sliceKey);
+            DictNode root = mutableDictSliceMap.get(sliceKey);
             addValueR(root, value, 0);
             if (root.childrenCount > MAX_ENTRY_IN_SLICE * 
MAX_ENTRY_OVERHEAD_FACTOR) {
-                dictSliceMap.remove(sliceKey);
+                mutableDictSliceMap.remove(sliceKey);
                 DictNode newRoot = splitNodeTree(root);
                 DictNode.mergeSingleByteNode(root, 1);
                 DictNode.mergeSingleByteNode(newRoot, 0);
-                dictSliceMap.put(DictSliceKey.wrap(root.firstValue()), root);
-                dictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), 
newRoot);
+                mutableDictSliceMap.put(DictSliceKey.wrap(root.firstValue()), 
root);
+                
mutableDictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), newRoot);
             }
         }
 
@@ -956,18 +947,11 @@ public class AppendTrieDictionary<T> extends 
Dictionary<T> {
         }
 
         public AppendTrieDictionary<T> build(int baseId) throws IOException {
-            ByteArrayOutputStream buf = new ByteArrayOutputStream();
-            DataOutputStream out = new DataOutputStream(buf);
-            ((Writable) dictSliceMap).write(out);
-            byte[] dictMapBytes = buf.toByteArray();
-            buf.close();
-            out.close();
-
             if (dict == null) {
                 dict = new AppendTrieDictionary<T>();
             }
-            dict.update(baseDir, baseId, maxId, maxValueLength, nValues, 
bytesConverter, dictMapBytes);
-            dict.flushIndex();
+            dict.flushIndex((CachedTreeMap) mutableDictSliceMap);
+            dict.update(baseDir, baseId, maxId, maxValueLength, nValues, 
bytesConverter, (CachedTreeMap)mutableDictSliceMap);
 
             return dict;
         }
@@ -1047,24 +1031,25 @@ public class AppendTrieDictionary<T> extends 
Dictionary<T> {
         throw new UnsupportedOperationException("AppendTrieDictionary can't 
retrive value from id");
     }
 
-    public void flushIndex() throws IOException {
-        Path filePath = new Path(baseDir + "/.index");
+    public void flushIndex(CachedTreeMap dictSliceMap) throws IOException {
+        Path filePath = new Path(dictSliceMap.getCurrentDir() + "/.index");
         Configuration conf = new Configuration();
-        try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), 
conf)).create(filePath, true, 8 * 1024 * 1024, (short) 2, 8 * 1024 * 1024 * 8)) 
{
+        try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), 
conf)).create(filePath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8)) 
{
             indexOut.writeInt(baseId);
             indexOut.writeInt(maxId);
             indexOut.writeInt(maxValueLength);
             indexOut.writeInt(nValues);
             indexOut.writeUTF(bytesConverter.getClass().getName());
-            ((Writable) dictSliceMap).write(indexOut);
+            dictSliceMap.write(indexOut);
         }
+        dictSliceMap.commit(false);
     }
 
     @Override
     public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, 
KylinConfig dstConfig) throws IOException {
         Configuration conf = new Configuration();
         AppendTrieDictionary newDict = new AppendTrieDictionary();
-        
newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), 
dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, 
bytesConverter, writeDictMap());
+        
newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), 
dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, 
bytesConverter, (CachedTreeMap)dictSliceMap);
         logger.info("Copy AppendDict from {} to {}", this.baseDir, 
newDict.baseDir);
         Path srcPath = new Path(this.baseDir);
         Path dstPath = new Path(newDict.baseDir);
@@ -1081,7 +1066,6 @@ public class AppendTrieDictionary<T> extends 
Dictionary<T> {
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeUTF(baseDir);
-        flushIndex();
     }
 
     @Override
@@ -1103,7 +1087,10 @@ public class AppendTrieDictionary<T> extends 
Dictionary<T> {
                     throw new IOException(e);
                 }
             }
-            update(baseDir, baseId, maxId, maxValueLength, nValues, converter, 
input);
+            CachedTreeMap dictMap = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder()
+                    
.baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+            dictMap.readFields(input);
+            update(baseDir, baseId, maxId, maxValueLength, nValues, converter, 
dictMap);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/30fb1e06/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
index ec29bb5..1ea3c1c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -58,6 +59,8 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
     private final TreeSet<String> fileList;
     private final Configuration conf;
     private final String baseDir;
+    private final String tmpDir;
+    private final FileSystem fs;
     private final boolean persistent;
     private final boolean immutable;
     private long writeValueTime = 0;
@@ -110,7 +113,7 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
             return this;
         }
 
-        public CachedTreeMap build() {
+        public CachedTreeMap build() throws IOException {
             if (baseDir == null) {
                 throw new RuntimeException("CachedTreeMap need a baseDir to 
cache data");
             }
@@ -122,13 +125,19 @@ public class CachedTreeMap<K extends WritableComparable, 
V extends Writable> ext
         }
     }
 
-    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> 
valueClazz, String baseDir, boolean persistent, boolean immutable) {
+    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> 
valueClazz, String baseDir, boolean persistent, boolean immutable) throws 
IOException {
         super();
         this.keyClazz = keyClazz;
         this.valueClazz = valueClazz;
         this.fileList = new TreeSet<>();
         this.conf = new Configuration();
-        this.baseDir = baseDir;
+        if (baseDir.endsWith("/")) {
+            this.baseDir = baseDir.substring(0, baseDir.length()-1);
+        } else {
+            this.baseDir = baseDir;
+        }
+        this.tmpDir = this.baseDir + ".tmp";
+        this.fs = FileSystem.get(new Path(baseDir).toUri(), conf);
         this.persistent = persistent;
         this.immutable = immutable;
         CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new 
RemovalListener<K, V>() {
@@ -140,17 +149,27 @@ public class CachedTreeMap<K extends WritableComparable, 
V extends Writable> ext
                     writeValue(notification.getKey(), notification.getValue());
                     break;
                 case EXPLICIT:
-                    // skip delete files to recover from error during dict 
appending
-                    // deleteValue(notification.getKey());
+                    deleteValue(notification.getKey());
                     break;
                 default:
                     throw new RuntimeException("unexpected evict reason " + 
notification.getCause());
                 }
             }
-        }).maximumSize(maxCount);
-        // For immutable values, use soft reference to free memory when gc, 
and just load again when need it
+        });
+        // For immutable values, load all values as much as possible, and 
evict by soft reference to free memory when gc
         if (this.immutable) {
             builder.softValues();
+        } else {
+            builder.maximumSize(maxCount);
+            // For mutable map, copy all data into tmp and modify on tmp data, 
avoiding suddenly server crash made data corrupt
+            if (fs.exists(new Path(tmpDir))) {
+                fs.delete(new Path(tmpDir), true);
+            }
+            if (fs.exists(new Path(this.baseDir))) {
+                FileUtil.copy(fs, new Path(this.baseDir), fs, new 
Path(tmpDir), false, true, conf);
+            } else {
+                fs.mkdirs(new Path(this.baseDir));
+            }
         }
         this.valueCache = builder.build(new CacheLoader<K, V>() {
             @Override
@@ -163,10 +182,47 @@ public class CachedTreeMap<K extends WritableComparable, 
V extends Writable> ext
     }
 
     private String generateFileName(K key) {
-        String file = baseDir + "/cached_" + key.toString();
+        String file = (immutable ? baseDir : tmpDir) + "/cached_" + 
key.toString();
         return file;
     }
 
+    public String getCurrentDir() {
+        return immutable ? baseDir : tmpDir;
+    }
+
+    public void commit(boolean stillMutable) throws IOException {
+        assert !immutable : "Only support commit method with immutable false";
+
+        Path basePath = new Path(baseDir);
+        Path backupPath = new Path(baseDir+".bak");
+        Path tmpPath = new Path(tmpDir);
+        try {
+            fs.rename(basePath, backupPath);
+        } catch (IOException e) {
+            logger.info("CachedTreeMap commit backup basedir failed, " + e, e);
+            throw e;
+        }
+
+        try {
+            if (stillMutable) {
+                FileUtil.copy(fs, tmpPath, fs, basePath, false, true, conf);
+            } else {
+                fs.rename(tmpPath, basePath);
+            }
+            fs.delete(backupPath, true);
+        } catch (IOException e) {
+            fs.rename(backupPath, basePath);
+            logger.info("CachedTreeMap commit move/copy tmpdir failed, " + e, 
e);
+            throw e;
+        }
+    }
+
+    public void loadEntry(CachedTreeMap other) {
+        for (Object key : other.keySet()) {
+            super.put((K)key, null);
+        }
+    }
+
     private void writeValue(K key, V value) {
         if (immutable) {
             return;
@@ -174,10 +230,10 @@ public class CachedTreeMap<K extends WritableComparable, 
V extends Writable> ext
         long t0 = System.currentTimeMillis();
         String fileName = generateFileName(key);
         Path filePath = new Path(fileName);
-        try (FSDataOutputStream out = (FileSystem.get(filePath.toUri(), 
conf)).create(filePath, true, BUFFER_SIZE, (short) 2, BUFFER_SIZE * 8)) {
+        try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, 
(short) 5, BUFFER_SIZE * 8)) {
             value.write(out);
             if (!persistent) {
-                FileSystem.get(filePath.toUri(), conf).deleteOnExit(filePath);
+                fs.deleteOnExit(filePath);
             }
         } catch (Exception e) {
             logger.error(String.format("write value into %s exception: %s", 
fileName, e), e);
@@ -192,7 +248,7 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
         long t0 = System.currentTimeMillis();
         String fileName = generateFileName(key);
         Path filePath = new Path(fileName);
-        try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), 
conf)).open(filePath, BUFFER_SIZE)) {
+        try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
             V value = valueClazz.newInstance();
             value.readFields(input);
             return value;
@@ -211,7 +267,6 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
         String fileName = generateFileName(key);
         Path filePath = new Path(fileName);
         try {
-            FileSystem fs = FileSystem.get(filePath.toUri(), conf);
             if (fs.exists(filePath)) {
                 fs.delete(filePath, true);
             }
@@ -224,6 +279,7 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
 
     @Override
     public V put(K key, V value) {
+        assert !immutable : "Only support put method with immutable false";
         super.put(key, null);
         valueCache.put(key, value);
         return null;
@@ -245,6 +301,7 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
 
     @Override
     public V remove(Object key) {
+        assert !immutable : "Only support remove method with immutable false";
         super.remove(key);
         valueCache.invalidate(key);
         return null;
@@ -300,6 +357,7 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
 
         @Override
         public void remove() {
+            assert !immutable : "Only support remove method with immutable 
false";
             keyIterator.remove();
             valueCache.invalidate(currentKey);
         }
@@ -344,7 +402,6 @@ public class CachedTreeMap<K extends WritableComparable, V 
extends Writable> ext
             for (String file : fileList) {
                 try {
                     Path filePath = new Path(file);
-                    FileSystem fs = FileSystem.get(filePath.toUri(), conf);
                     fs.delete(filePath, true);
                 } catch (Throwable t) {
                     //do nothing?

http://git-wip-us.apache.org/repos/asf/kylin/blob/30fb1e06/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
 
b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 0ea5ebe..b81a439 100644
--- 
a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ 
b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -56,7 +56,7 @@ public class AppendTrieDictionaryTest {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         config.setAppendDictEntrySize(50000);
         config.setAppendDictCacheSize(3);
-        config.setProperty("kylin.hdfs.working.dir", "/tmp");
+        config.setProperty("kylin.hdfs.working.dir", "/tmp/kylin_append_dict");
     }
 
     @AfterClass
@@ -122,7 +122,7 @@ public class AppendTrieDictionaryTest {
     @Test
     public void testHugeKeySet() throws IOException {
         BytesConverter converter = new StringBytesConverter();
-        AppendTrieDictionary.Builder<String> b = 
AppendTrieDictionary.Builder.create("/tmp");
+        AppendTrieDictionary.Builder<String> b = 
AppendTrieDictionary.Builder.create("/tmp/kylin_append_dict");
         AppendTrieDictionary<String> dict = null;
 
         InputStream is = new 
FileInputStream("src/test/resources/dict/huge_key");
@@ -152,7 +152,7 @@ public class AppendTrieDictionaryTest {
         }
         BytesConverter converter = new StringBytesConverter();
 
-        AppendTrieDictionary.Builder<String> b = 
AppendTrieDictionary.Builder.create("/tmp");
+        AppendTrieDictionary.Builder<String> b = 
AppendTrieDictionary.Builder.create("/tmp/kylin_append_dict");
         AppendTrieDictionary<String> dict = null;
         TreeMap<Integer, String> checkMap = new TreeMap<>();
         int firstAppend = rnd.nextInt(strList.size() / 2);

http://git-wip-us.apache.org/repos/asf/kylin/blob/30fb1e06/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java 
b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
new file mode 100644
index 0000000..d2af621
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
@@ -0,0 +1,265 @@
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by sunyerui on 16/7/12.
+ */
+public class CachedTreeMapTest {
+
+    public static class Key implements WritableComparable {
+        int keyInt;
+
+        public static Key of(int keyInt) {
+            Key newKey = new Key();
+            newKey.keyInt = keyInt;
+            return newKey;
+        }
+
+        @Override
+        public int compareTo(Object o) {
+            return keyInt - ((Key)o).keyInt;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            out.writeInt(keyInt);
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            keyInt = in.readInt();
+        }
+
+        @Override
+        public String toString() {
+            return String.valueOf(keyInt);
+        }
+    }
+
+    public static boolean VALUE_WRITE_ERROR_TOGGLE = false;
+    public static class Value implements Writable {
+        String valueStr;
+
+        public static Value of(String valueStr) {
+            Value newValue = new Value();
+            newValue.valueStr = valueStr;
+            return newValue;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            if (VALUE_WRITE_ERROR_TOGGLE) {
+                out.write(new byte[0]);
+                return;
+            }
+            out.writeUTF(valueStr);
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            valueStr = in.readUTF();
+        }
+    }
+
+    public static class CachedFileFilter implements FileFilter {
+        @Override
+        public boolean accept(File pathname) {
+            return pathname.getName().startsWith("cached_");
+        }
+    }
+
+    public static final String baseDir = "/tmp/kylin_cachedtreemap_test/";
+    public static final String backupDir = 
"/tmp/kylin_cachedtreemap_test.bak/";
+    public static final String tmpDir = "/tmp/kylin_cachedtreemap_test.tmp/";
+
+    @After
+    public void afterTest() {
+        File dir = new File(baseDir);
+        if (dir.exists()) {
+            for (File f : dir.listFiles()) {
+                f.delete();
+            }
+            dir.delete();
+        }
+
+        dir = new File(tmpDir);
+        if (dir.exists()) {
+            for (File f : dir.listFiles()) {
+                f.delete();
+            }
+            dir.delete();
+        }
+
+        dir = new File(backupDir);
+        if (dir.exists()) {
+            for (File f : dir.listFiles()) {
+                f.delete();
+            }
+            dir.delete();
+        }
+
+        VALUE_WRITE_ERROR_TOGGLE = false;
+    }
+
+    @Test
+    public void testCachedTreeMap() throws IOException {
+        CachedTreeMap map = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map.put(Key.of(1), Value.of("a"));
+        map.put(Key.of(2), Value.of("b"));
+        map.put(Key.of(3), Value.of("c"));
+        map.put(Key.of(4), Value.of("d"));
+        map.put(Key.of(5), Value.of("e"));
+
+        File dir = new File(tmpDir);
+        assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
+
+        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(tmpDir+"/.index"));
+        map.write(out);
+        out.flush();
+        out.close();
+        map.commit(false);
+
+        dir = new File(baseDir);
+        assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
+
+        DataInputStream in = new DataInputStream(new 
FileInputStream(baseDir+".index"));
+        CachedTreeMap map2 = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map2.readFields(in);
+        assertEquals(5, map2.size());
+        assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
+
+        try {
+            map2.put(Key.of(6), Value.of("f"));
+            fail("Should be error when put value into immutable map");
+        } catch (AssertionError error) {
+        }
+
+        assertFalse(new File(tmpDir).exists());
+        assertFalse(new File(backupDir).exists());
+    }
+
+    @Test
+    public void testWriteFailed() throws IOException {
+        // normal case
+        CachedTreeMap map = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map.put(Key.of(1), Value.of("a"));
+        map.put(Key.of(2), Value.of("b"));
+        map.put(Key.of(3), Value.of("c"));
+        map.remove(Key.of(3));
+        map.put(Key.of(4), Value.of("d"));
+
+        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(tmpDir+".index"));
+        map.write(out);
+        out.flush();
+        out.close();
+        map.commit(false);
+
+        DataInputStream in = new DataInputStream(new 
FileInputStream(baseDir+".index"));
+        CachedTreeMap map2 = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map2.readFields(in);
+        assertEquals(3, map2.size());
+        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+        // suppose write value failed and didn't commit data
+        map = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        VALUE_WRITE_ERROR_TOGGLE = true;
+        map.put(Key.of(1), Value.of("aa"));
+        map.put(Key.of(2), Value.of("bb"));
+        VALUE_WRITE_ERROR_TOGGLE = false;
+        map.put(Key.of(3), Value.of("cc"));
+        map.put(Key.of(4), Value.of("dd"));
+        out = new DataOutputStream(new FileOutputStream(tmpDir+".index"));
+        map.write(out);
+        out.flush();
+        out.close();
+        // suppose write value failed and didn't commit data
+        //map.commit(false);
+
+        // read map data should not be modified
+        in = new DataInputStream(new FileInputStream(baseDir+".index"));
+        map2 = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map2.readFields(in);
+        assertEquals(3, map2.size());
+        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+        assertTrue(new File(tmpDir).exists());
+        assertFalse(new File(backupDir).exists());
+    }
+
+    @Test
+    public void testCommit() throws IOException {
+        CachedTreeMap map = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map.put(Key.of(1), Value.of("a"));
+        map.put(Key.of(2), Value.of("b"));
+        map.put(Key.of(3), Value.of("c"));
+        map.put(Key.of(4), Value.of("d"));
+
+        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(tmpDir+".index"));
+        map.write(out);
+        out.flush();
+        out.close();
+        map.commit(true);
+
+        assertTrue(new File(tmpDir).exists());
+        assertFalse(new File(backupDir).exists());
+
+        DataInputStream in = new DataInputStream(new 
FileInputStream(baseDir+".index"));
+        CachedTreeMap map2 = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map2.readFields(in);
+        assertEquals(4, map2.size());
+        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+        // continue modify map, but not commit
+        map.put(Key.of(1), Value.of("aa"));
+        map.put(Key.of(2), Value.of("bb"));
+        map.put(Key.of(3), Value.of("cc"));
+        map.put(Key.of(5), Value.of("e"));
+        map.put(Key.of(6), Value.of("f"));
+        out = new DataOutputStream(new FileOutputStream(tmpDir+".index"));
+        map.write(out);
+        out.flush();
+        out.close();
+
+        assertTrue(new File(tmpDir).exists());
+        assertEquals(6, new File(tmpDir).listFiles(new 
CachedFileFilter()).length);
+        assertEquals(4, new File(baseDir).listFiles(new 
CachedFileFilter()).length);
+
+        in = new DataInputStream(new FileInputStream(baseDir+".index"));
+        map2 = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map2.readFields(in);
+        assertEquals(4, map2.size());
+        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+        // commit data
+        map.commit(false);
+        assertFalse(new File(tmpDir).exists());
+        assertEquals(6, new File(baseDir).listFiles(new 
CachedFileFilter()).length);
+
+        in = new DataInputStream(new FileInputStream(baseDir+".index"));
+        map2 = 
CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+                
.persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+        map2.readFields(in);
+        assertEquals(6, map2.size());
+        assertEquals("aa", ((Value)map2.get(Key.of(1))).valueStr);
+        assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr);
+    }
+}
+

Reply via email to