http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java
index 1b248e5..28ea238 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java
@@ -38,6 +38,21 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class TableExtDesc extends RootPersistentEntity {
 
+    public static String concatRawResourcePath(String nameOnPath) {
+        return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + nameOnPath + 
".json";
+    }
+
+    public static String concatResourcePath(String tableIdentity, String prj) {
+        return concatRawResourcePath(TableDesc.makeResourceName(tableIdentity, 
prj));
+    }
+
+    // returns <table, project>
+    public static Pair<String, String> parseResourcePath(String path) {
+        return TableDesc.parseResourcePath(path);
+    }
+    
+    // 
============================================================================
+
     @JsonProperty("table_name")
     private String tableIdentity;
     @JsonProperty("last_build_job_id")
@@ -65,24 +80,13 @@ public class TableExtDesc extends RootPersistentEntity {
     public TableExtDesc() {
     }
 
-    public String getResourcePath() {
-        return concatResourcePath(getIdentity(), project);
-    }
-
-    public static String concatRawResourcePath(String nameOnPath) {
-        return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + nameOnPath + 
".json";
-    }
-
-    public static String concatResourcePath(String tableIdentity, String prj) {
-        if (prj == null)
-            return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + tableIdentity 
+ ".json";
-        else
-            return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + tableIdentity 
+ "--" + prj + ".json";
+    @Override
+    public String resourceName() {
+        return TableDesc.makeResourceName(getIdentity(), getProject());
     }
-
-    // returns <table, project>
-    public static Pair<String, String> parseResourcePath(String path) {
-        return TableDesc.parseResourcePath(path);
+    
+    public String getResourcePath() {
+        return concatResourcePath(getIdentity(), getProject());
     }
 
     public String getProject() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 7b1f840..0029de2 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -88,7 +88,7 @@ public class ProjectInstance extends RootPersistentEntity {
     private LinkedHashMap<String, String> overrideKylinProps;
 
     public String getResourcePath() {
-        return concatResourcePath(name);
+        return concatResourcePath(resourceName());
     }
 
     public static String concatResourcePath(String projectName) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 4622f35..1c0254e 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -70,12 +70,11 @@ public class ProjectManager {
     
     // project name => ProjrectInstance
     private CaseInsensitiveStringCache<ProjectInstance> projectMap;
+    private CachedCrudAssist<ProjectInstance> crud;
     
-    // protects concurrent operations around the projectMap, to avoid for 
example
-    // writing a project in the middle of reloading it (dirty read)
+    // protects concurrent operations around the cached map, to avoid for 
example
+    // writing an entity in the middle of reloading it (dirty read)
     private AutoReadWriteLock prjMapLock = new AutoReadWriteLock();
-    
-    private CachedCrudAssist<ProjectInstance> crud;
 
     private ProjectManager(KylinConfig config) throws IOException {
         logger.info("Initializing ProjectManager with metadata url " + config);
@@ -85,13 +84,14 @@ public class ProjectManager {
         this.crud = new CachedCrudAssist<ProjectInstance>(getStore(), 
ResourceStore.PROJECT_RESOURCE_ROOT,
                 ProjectInstance.class, projectMap) {
             @Override
-            protected void initEntityAfterReload(ProjectInstance prj) {
+            protected ProjectInstance initEntityAfterReload(ProjectInstance 
prj, String resourceName) {
                 prj.init();
+                return prj;
             }
         };
 
+        // touch lower level metadata before registering my listener
         crud.reloadAll();
-        
         Broadcaster.getInstance(config).registerListener(new 
ProjectSyncListener(), "project");
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
index 9fd6ede..335d3c8 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -34,6 +34,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class StreamingConfig extends RootPersistentEntity {
 
@@ -47,6 +48,11 @@ public class StreamingConfig extends RootPersistentEntity {
     @JsonProperty("type")
     private String type = STREAMING_TYPE_KAFKA;
 
+    @Override
+    public String resourceName() {
+        return name;
+    }
+    
     public String getType() {
         return type;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
index b5d7e37..d720585 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -24,12 +24,12 @@ import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,10 +38,9 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamingManager {
 
+    @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(StreamingManager.class);
 
-    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new 
JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
     public static StreamingManager getInstance(KylinConfig config) {
         return config.getManager(StreamingManager.class);
     }
@@ -52,29 +51,43 @@ public class StreamingManager {
     }
 
     // 
============================================================================
-    
+
     private KylinConfig config;
 
     // name ==> StreamingConfig
     private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
+    private CachedCrudAssist<StreamingConfig> crud;
+    private AutoReadWriteLock lock = new AutoReadWriteLock();
 
     private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
         this.streamingMap = new 
CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
-        
+        this.crud = new CachedCrudAssist<StreamingConfig>(getStore(), 
ResourceStore.STREAMING_RESOURCE_ROOT,
+                StreamingConfig.class, streamingMap) {
+            @Override
+            protected StreamingConfig initEntityAfterReload(StreamingConfig t, 
String resourceName) {
+                return t; // noop
+            }
+        };
+
         // touch lower level metadata before registering my listener
-        reloadAllStreaming();
+        crud.reloadAll();
         Broadcaster.getInstance(config).registerListener(new 
StreamingSyncListener(), "streaming");
     }
 
     private class StreamingSyncListener extends Broadcaster.Listener {
 
         @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
-            if (event == Event.DROP)
-                removeStreamingLocal(cacheKey);
-            else
-                reloadStreamingConfigLocal(cacheKey);
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
+            String streamingName = cacheKey;
+
+            try (AutoLock l = lock.lockForWrite()) {
+                if (event == Event.DROP)
+                    streamingMap.removeLocal(streamingName);
+                else
+                    crud.reloadQuietly(streamingName);
+            }
         }
     }
 
@@ -83,129 +96,57 @@ public class StreamingManager {
     }
 
     public StreamingConfig getStreamingConfig(String name) {
-        return streamingMap.get(name);
+        try (AutoLock l = lock.lockForRead()) {
+            return streamingMap.get(name);
+        }
     }
 
     public List<StreamingConfig> listAllStreaming() {
-        return new ArrayList<>(streamingMap.values());
-    }
-
-    /**
-     * Reload StreamingConfig from resource store It will be triggered by an 
desc
-     * update event.
-     *
-     * @param name
-     * @throws IOException
-     */
-    public StreamingConfig reloadStreamingConfigLocal(String name) throws 
IOException {
-
-        // Save Source
-        String path = StreamingConfig.concatResourcePath(name);
-
-        // Reload the StreamingConfig
-        StreamingConfig ndesc = loadStreamingConfigAt(path);
-
-        // Here replace the old one
-        streamingMap.putLocal(ndesc.getName(), ndesc);
-        return ndesc;
-    }
-
-    // remove streamingConfig
-    public void removeStreamingConfig(StreamingConfig streamingConfig) throws 
IOException {
-        String path = streamingConfig.getResourcePath();
-        getStore().deleteResource(path);
-        streamingMap.remove(streamingConfig.getName());
-    }
-
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
-    public void removeStreamingLocal(String streamingName) {
-        streamingMap.removeLocal(streamingName);
-    }
-
-    /**
-     * Update CubeDesc with the input. Broadcast the event into cluster
-     *
-     * @param desc
-     * @return
-     * @throws IOException
-     */
-    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws 
IOException {
-        // Validate CubeDesc
-        if (desc.getUuid() == null || desc.getName() == null) {
-            throw new IllegalArgumentException("SteamingConfig Illegal.");
-        }
-        String name = desc.getName();
-        if (!streamingMap.containsKey(name)) {
-            throw new IllegalArgumentException("StreamingConfig '" + name + "' 
does not exist.");
+        try (AutoLock l = lock.lockForRead()) {
+            return new ArrayList<>(streamingMap.values());
         }
-
-        // Save Source
-        String path = desc.getResourcePath();
-        getStore().putResource(path, desc, STREAMING_SERIALIZER);
-
-        // Reload the StreamingConfig
-        StreamingConfig ndesc = loadStreamingConfigAt(path);
-        // Here replace the old one
-        streamingMap.put(ndesc.getName(), desc);
-
-        return ndesc;
     }
-
-    public StreamingConfig saveStreamingConfig(StreamingConfig 
streamingConfig) throws IOException {
-        if (streamingConfig == null || 
StringUtils.isEmpty(streamingConfig.getName())) {
-            throw new IllegalArgumentException();
-        }
-
-        if (streamingMap.containsKey(streamingConfig.getName()))
-            throw new IllegalArgumentException("StreamingConfig '" + 
streamingConfig.getName() + "' already exists");
-
-        String path = 
StreamingConfig.concatResourcePath(streamingConfig.getName());
-        getStore().putResource(path, streamingConfig, 
StreamingConfig.SERIALIZER);
-        streamingMap.put(streamingConfig.getName(), streamingConfig);
-        return streamingConfig;
+    
+    // for test
+    List<StreamingConfig> reloadAll() throws IOException {
+        try (AutoLock l = lock.lockForWrite()) {
+            crud.reloadAll();
+            return listAllStreaming();
+        }        
     }
 
-    private StreamingConfig loadStreamingConfigAt(String path) throws 
IOException {
-        ResourceStore store = getStore();
-        StreamingConfig streamingDesc = store.getResource(path, 
StreamingConfig.class, STREAMING_SERIALIZER);
+    public StreamingConfig createStreamingConfig(StreamingConfig 
streamingConfig) throws IOException {
+        try (AutoLock l = lock.lockForWrite()) {
+            if (streamingConfig == null || 
StringUtils.isEmpty(streamingConfig.getName())) {
+                throw new IllegalArgumentException();
+            }
+            if (streamingMap.containsKey(streamingConfig.resourceName()))
+                throw new IllegalArgumentException(
+                        "StreamingConfig '" + streamingConfig.getName() + "' 
already exists");
 
-        if (StringUtils.isBlank(streamingDesc.getName())) {
-            throw new IllegalStateException("StreamingConfig name must not be 
blank");
+            streamingConfig.updateRandomUuid();
+            
+            return crud.save(streamingConfig);
         }
-        return streamingDesc;
     }
 
-    private void reloadAllStreaming() throws IOException {
-        ResourceStore store = getStore();
-        logger.info("Reloading Streaming Metadata from folder " + 
store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
-
-        streamingMap.clear();
-
-        List<String> paths = 
store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, 
MetadataConstants.FILE_SURFIX);
-        for (String path : paths) {
-            StreamingConfig streamingConfig;
-            try {
-                streamingConfig = loadStreamingConfigAt(path);
-            } catch (Exception e) {
-                logger.error("Error loading streaming desc " + path, e);
-                continue;
-            }
-            if (path.equals(streamingConfig.getResourcePath()) == false) {
-                logger.error("Skip suspicious desc at " + path + ", " + 
streamingConfig + " should be at " + streamingConfig.getResourcePath());
-                continue;
+    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws 
IOException {
+        try (AutoLock l = lock.lockForWrite()) {
+            if (desc.getUuid() == null || desc.getName() == null) {
+                throw new IllegalArgumentException("SteamingConfig Illegal.");
             }
-            if (streamingMap.containsKey(streamingConfig.getName())) {
-                logger.error("Dup StreamingConfig name '" + 
streamingConfig.getName() + "' on path " + path);
-                continue;
+            if (!streamingMap.containsKey(desc.resourceName())) {
+                throw new IllegalArgumentException("StreamingConfig '" + 
desc.getName() + "' does not exist.");
             }
 
-            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+            return crud.save(desc);
         }
+    }
 
-        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+    public void removeStreamingConfig(StreamingConfig streamingConfig) throws 
IOException {
+        try (AutoLock l = lock.lockForWrite()) {
+            crud.delete(streamingConfig);
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java
index f2baf29..663816c 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java
@@ -46,21 +46,21 @@ public class TempStatementManagerTest extends 
LocalFileMetadataTestCase {
     public void testAddTempStatement() throws IOException {
         TempStatementManager manager = 
TempStatementManager.getInstance(getTestConfig());
         manager.updateTempStatement("temp_table3", "AAAAA");
-        Assert.assertEquals(3, manager.listAllTempStatement().size());
+        Assert.assertEquals(3, manager.reloadAllTempStatement().size());
     }
 
     @Test
     public void testRemoveTempStatement() throws IOException {
         TempStatementManager manager = 
TempStatementManager.getInstance(getTestConfig());
         manager.removeTempStatement("temp_table1");
-        Assert.assertEquals(1, manager.listAllTempStatement().size());
+        Assert.assertEquals(1, manager.reloadAllTempStatement().size());
     }
 
     @Test
     public void testUpdateTempStatement() throws IOException {
         TempStatementManager manager = 
TempStatementManager.getInstance(getTestConfig());
         manager.updateTempStatement("temp_table1", "AAAAA");
-        Assert.assertEquals(2, manager.listAllTempStatement().size());
+        Assert.assertEquals(2, manager.reloadAllTempStatement().size());
         Assert.assertEquals("AAAAA", manager.getTempStatement("temp_table1"));
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java
new file mode 100644
index 0000000..798deb0
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StreamingManagerTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testBasics() throws IOException {
+        StreamingManager mgr = StreamingManager.getInstance(getTestConfig());
+        List<StreamingConfig> origin = mgr.listAllStreaming();
+
+        // test create
+        {
+            StreamingConfig streamingConfig = new StreamingConfig();
+            streamingConfig.setName("name for test");
+            streamingConfig.setType("type for test");
+            mgr.createStreamingConfig(streamingConfig);
+            List<StreamingConfig> reloadAll = mgr.reloadAll();
+            Assert.assertTrue(origin.size() + 1 == reloadAll.size());
+        }
+
+        // test update
+        {
+            StreamingConfig streamingConfig = mgr.getStreamingConfig("name for 
test");
+            streamingConfig.setType("updated type");
+            mgr.updateStreamingConfig(streamingConfig);
+            List<StreamingConfig> reloadAll = mgr.reloadAll();
+            Assert.assertTrue(origin.size() + 1 == reloadAll.size());
+            streamingConfig = mgr.getStreamingConfig("name for test");
+            Assert.assertEquals("updated type", streamingConfig.getType());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 14ef524..9fbb7f3 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -50,16 +50,27 @@ import com.google.common.collect.Lists;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class HybridInstance extends RootPersistentEntity implements 
IRealization {
 
+    private final static Logger logger = 
LoggerFactory.getLogger(HybridInstance.class);
+
+    public static HybridInstance create(KylinConfig config, String name, 
List<RealizationEntry> realizationEntries) {
+        HybridInstance hybridInstance = new HybridInstance();
+
+        hybridInstance.setConfig(config);
+        hybridInstance.setName(name);
+        hybridInstance.setRealizationEntries(realizationEntries);
+        hybridInstance.updateRandomUuid();
+
+        return hybridInstance;
+    }
+    
+    // 
============================================================================
+
     @JsonIgnore
     private KylinConfig config;
 
     @JsonProperty("name")
     private String name;
 
-    public void setRealizationEntries(List<RealizationEntry> 
realizationEntries) {
-        this.realizationEntries = realizationEntries;
-    }
-
     @JsonProperty("realizations")
     private List<RealizationEntry> realizationEntries;
 
@@ -75,21 +86,17 @@ public class HybridInstance extends RootPersistentEntity 
implements IRealization
     private long dateRangeEnd;
     private boolean isReady = false;
 
-    private final static Logger logger = 
LoggerFactory.getLogger(HybridInstance.class);
-
+    @Override
+    public String resourceName() {
+        return name;
+    }
+    
     public List<RealizationEntry> getRealizationEntries() {
         return realizationEntries;
     }
 
-    public static HybridInstance create(KylinConfig config, String name, 
List<RealizationEntry> realizationEntries) {
-        HybridInstance hybridInstance = new HybridInstance();
-
-        hybridInstance.setConfig(config);
-        hybridInstance.setName(name);
-        hybridInstance.setRealizationEntries(realizationEntries);
-        hybridInstance.updateRandomUuid();
-
-        return hybridInstance;
+    public void setRealizationEntries(List<RealizationEntry> 
realizationEntries) {
+        this.realizationEntries = realizationEntries;
     }
 
     private void init() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java 
b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 37f4aff..1e56c73 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -21,13 +21,15 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -43,9 +45,10 @@ import com.google.common.collect.Lists;
 /**
  */
 public class HybridManager implements IRealizationProvider {
-    public static final Serializer<HybridInstance> HYBRID_SERIALIZER = new 
JsonSerializer<HybridInstance>(HybridInstance.class);
 
     private static final Logger logger = 
LoggerFactory.getLogger(HybridManager.class);
+    
+    public static final Serializer<HybridInstance> HYBRID_SERIALIZER = new 
JsonSerializer<>(HybridInstance.class);
 
     public static HybridManager getInstance(KylinConfig config) {
         return config.getManager(HybridManager.class);
@@ -61,14 +64,24 @@ public class HybridManager implements IRealizationProvider {
     private KylinConfig config;
 
     private CaseInsensitiveStringCache<HybridInstance> hybridMap;
+    private CachedCrudAssist<HybridInstance> crud;
+    private AutoReadWriteLock lock = new AutoReadWriteLock();
 
-    private HybridManager(KylinConfig config) throws IOException {
-        logger.info("Initializing HybridManager with config " + config);
-        this.config = config;
+    private HybridManager(KylinConfig cfg) throws IOException {
+        logger.info("Initializing HybridManager with config " + cfg);
+        this.config = cfg;
         this.hybridMap = new 
CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
+        this.crud = new CachedCrudAssist<HybridInstance>(getStore(), 
ResourceStore.HYBRID_RESOURCE_ROOT,
+                HybridInstance.class, hybridMap) {
+            @Override
+            protected HybridInstance initEntityAfterReload(HybridInstance 
hybridInstance, String resourceName) {
+                hybridInstance.setConfig(config);
+                return hybridInstance; // noop
+            }
+        };
 
         // touch lower level metadata before registering my listener
-        reloadAllHybridInstance();
+        crud.reloadAll();
         Broadcaster.getInstance(config).registerListener(new 
HybridSyncListener(), "hybrid", "cube");
     }
 
@@ -76,89 +89,57 @@ public class HybridManager implements IRealizationProvider {
 
         @Override
         public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
-            for (IRealization real : 
ProjectManager.getInstance(config).listAllRealizations(project)) {
-                if (real instanceof HybridInstance) {
-                    reloadHybridInstance(real.getName());
+            try (AutoLock l = lock.lockForWrite()) {
+                for (IRealization real : 
ProjectManager.getInstance(config).listAllRealizations(project)) {
+                    if (real instanceof HybridInstance) {
+                        crud.reloadQuietly(real.getName());
+                    }
                 }
             }
         }
 
         @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
             if ("hybrid".equals(entity)) {
                 String hybridName = cacheKey;
 
-                if (event == Event.DROP)
-                    hybridMap.removeLocal(hybridName);
-                else
-                    reloadHybridInstance(hybridName);
+                try (AutoLock l = lock.lockForWrite()) {
+                    if (event == Event.DROP)
+                        hybridMap.removeLocal(hybridName);
+                    else
+                        crud.reloadQuietly(hybridName);
+                }
 
-                for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, 
hybridName)) {
+                for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID,
+                        hybridName)) {
                     broadcaster.notifyProjectSchemaUpdate(prj.getName());
                 }
             } else if ("cube".equals(entity)) {
                 String cubeName = cacheKey;
-                for (HybridInstance hybrid : 
getHybridInstancesByChild(RealizationType.CUBE, cubeName)) {
-                    reloadHybridInstance(hybrid.getName());
+                try (AutoLock l = lock.lockForWrite()) {
+                    for (HybridInstance hybrid : 
getHybridInstancesByChild(RealizationType.CUBE, cubeName)) {
+                        crud.reloadQuietly(hybrid.getName());
+                    }
                 }
             }
         }
     }
 
-    public void reloadAllHybridInstance() throws IOException {
-        ResourceStore store = getStore();
-        List<String> paths = 
store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
-
-        hybridMap.clear();
-        logger.debug("Loading Hybrid from folder " + 
store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
-
-        for (String path : paths) {
-            reloadHybridInstanceAt(path);
-        }
-
-        logger.debug("Loaded " + paths.size() + " Hybrid(s)");
-    }
-
     public List<HybridInstance> getHybridInstancesByChild(RealizationType 
type, String realizationName) {
-        List<HybridInstance> result = Lists.newArrayList();
-        for (HybridInstance hybridInstance : hybridMap.values()) {
-            for (RealizationEntry realizationEntry : 
hybridInstance.getRealizationEntries()) {
-                if (realizationEntry.getType() == type && 
realizationEntry.getRealization().equalsIgnoreCase(realizationName)) {
-                    result.add(hybridInstance);
+        try (AutoLock l = lock.lockForRead()) {
+            List<HybridInstance> result = Lists.newArrayList();
+            for (HybridInstance hybridInstance : hybridMap.values()) {
+                for (RealizationEntry realizationEntry : 
hybridInstance.getRealizationEntries()) {
+                    if (realizationEntry.getType() == type
+                            && 
realizationEntry.getRealization().equalsIgnoreCase(realizationName)) {
+                        result.add(hybridInstance);
+                    }
                 }
-            }
-
-        }
-
-        return result;
-    }
 
-    public void reloadHybridInstance(String name) {
-        reloadHybridInstanceAt(HybridInstance.concatResourcePath(name));
-    }
-
-    private synchronized HybridInstance reloadHybridInstanceAt(String path) {
-        ResourceStore store = getStore();
-
-        HybridInstance hybridInstance = null;
-        try {
-            hybridInstance = store.getResource(path, HybridInstance.class, 
HYBRID_SERIALIZER);
-            hybridInstance.setConfig(config);
-
-            if (hybridInstance.getRealizationEntries() == null || 
hybridInstance.getRealizationEntries().size() == 0) {
-                throw new IllegalStateException("HybridInstance must have 
realization entries, " + path);
             }
 
-            if (StringUtils.isBlank(hybridInstance.getName()))
-                throw new IllegalStateException("HybridInstance name must not 
be blank, at " + path);
-
-            final String name = hybridInstance.getName();
-            hybridMap.putLocal(name, hybridInstance);
-
-            return hybridInstance;
-        } catch (Exception e) {
-            logger.error("Error during load hybrid instance " + path, e);
-            return null;
+            return result;
         }
     }
 
@@ -173,11 +154,27 @@ public class HybridManager implements 
IRealizationProvider {
     }
 
     public Collection<HybridInstance> listHybridInstances() {
-        return hybridMap.values();
+        try (AutoLock l = lock.lockForRead()) {
+            return hybridMap.values();
+        }
     }
 
     public HybridInstance getHybridInstance(String name) {
-        return hybridMap.get(name);
+        try (AutoLock l = lock.lockForRead()) {
+            return hybridMap.get(name);
+        }
+    }
+    
+    public HybridInstance reloadHybridInstance(String name) {
+        try (AutoLock l = lock.lockForWrite()) {
+            return crud.reload(name);
+        }
+    }
+    
+    public void reloadAllHybridInstance() throws IOException {
+        try (AutoLock l = lock.lockForWrite()) {
+            crud.reloadAll();
+        }
     }
 
     private ResourceStore getStore() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index c8aee5d..872deed 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -182,7 +182,7 @@ public class SparkCubing extends AbstractApplication {
     private void writeDictionary(Dataset<Row> intermediateTable, String 
cubeName, String segmentId) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        final CubeInstance cubeInstance = 
cubeManager.reloadCubeLocal(cubeName);
+        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
         final String[] columns = intermediateTable.columns();
         final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -249,7 +249,7 @@ public class SparkCubing extends AbstractApplication {
     }
 
     private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> 
rowJavaRDD, final String cubeName, String segmentId) throws Exception {
-        CubeInstance cubeInstance = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        CubeInstance cubeInstance = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json
----------------------------------------------------------------------
diff --git 
a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json
 
b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json
index 8d534e8..3bf27f9 100644
--- 
a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json
+++ 
b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json
@@ -1,4 +1,5 @@
 {
+  "uuid" : "da93ed8c-aed7-4a98-ba05-28c89cfa8ee2",
   "session_id" : "DEFAULT_SESSION",
   "statement_id": "temp_table1",
   "statement": "as (select * from fact_table1)"

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json
----------------------------------------------------------------------
diff --git 
a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json
 
b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json
index f4f5fad..b2b6a79 100644
--- 
a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json
+++ 
b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json
@@ -1,4 +1,5 @@
 {
+  "uuid" : "9f37ed8c-aed7-4a98-ba05-28c89cfac870",
   "session_id" : "DEFAULT_SESSION",
   "statement_id": "temp_table2",
   "statement": "as (select * from fact_table2)"

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 239b4af..d338332 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -131,7 +131,7 @@ public class BuildCubeWithStream {
         BrokerConfig brokerConfig = 
kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0);
         brokerConfig.setHost(localIp);
         kafkaConfig.setTopic(topicName);
-        
KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig);
+        
KafkaConfigManager.getInstance(kylinConfig).updateKafkaConfig(kafkaConfig);
 
         startEmbeddedKafka(topicName, brokerConfig);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java
index afa914c..cc4b736 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java
@@ -206,7 +206,7 @@ public class ModelController extends BasicController {
             newModelDesc = modelService.createModelDesc(project, newModelDesc);
 
             //reload avoid shallow
-            
metaManager.reloadDataModelDescAt(DataModelDesc.concatResourcePath(newModelName));
+            metaManager.reloadDataModel(newModelName);
         } catch (IOException e) {
             throw new InternalErrorException("failed to clone DataModelDesc", 
e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
 
b/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
index 260cbc0..32c7339 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
@@ -75,7 +75,7 @@ public class KafkaConfigService extends BasicService {
         if (getKafkaManager().getKafkaConfig(config.getName()) != null) {
             throw new 
BadRequestException(String.format(msg.getKAFKA_CONFIG_ALREADY_EXIST(), 
config.getName()));
         }
-        getKafkaManager().createKafkaConfig(config.getName(), config);
+        getKafkaManager().createKafkaConfig(config);
         return config;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index 1f907f8..d4d7cc7 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -41,7 +41,7 @@ public class StreamingService extends BasicService {
         if (StringUtils.isEmpty(table)) {
             streamingConfigs = getStreamingManager().listAllStreaming();
         } else {
-            StreamingConfig config = getStreamingManager().getConfig(table);
+            StreamingConfig config = 
getStreamingManager().getStreamingConfig(table);
             if (config != null) {
                 streamingConfigs.add(config);
             }
@@ -73,7 +73,7 @@ public class StreamingService extends BasicService {
         if (getStreamingManager().getStreamingConfig(config.getName()) != 
null) {
             throw new 
BadRequestException(String.format(msg.getSTREAMING_CONFIG_ALREADY_EXIST(), 
config.getName()));
         }
-        StreamingConfig streamingConfig = 
getStreamingManager().saveStreamingConfig(config);
+        StreamingConfig streamingConfig = 
getStreamingManager().createStreamingConfig(config);
         return streamingConfig;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 5e451b4..ad6a46a 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -24,12 +24,12 @@ import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
@@ -39,10 +39,9 @@ import org.slf4j.LoggerFactory;
  */
 public class KafkaConfigManager {
 
+    @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(KafkaConfigManager.class);
 
-    public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new 
JsonSerializer<KafkaConfig>(KafkaConfig.class);
-    
     public static KafkaConfigManager getInstance(KylinConfig config) {
         return config.getManager(KafkaConfigManager.class);
     }
@@ -55,27 +54,39 @@ public class KafkaConfigManager {
     // 
============================================================================
 
     private KylinConfig config;
-    
+
     // name ==> StreamingConfig
     private CaseInsensitiveStringCache<KafkaConfig> kafkaMap;
-    
+    private CachedCrudAssist<KafkaConfig> crud;
+    private AutoReadWriteLock lock = new AutoReadWriteLock();
+
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
         this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, 
"kafka");
+        this.crud = new CachedCrudAssist<KafkaConfig>(getStore(), 
ResourceStore.KAFKA_RESOURCE_ROOT, KafkaConfig.class,
+                kafkaMap) {
+            @Override
+            protected KafkaConfig initEntityAfterReload(KafkaConfig t, String 
resourceName) {
+                return t; // noop
+            }
+        };
 
         // touch lower level metadata before registering my listener
-        reloadAllKafkaConfig();
+        crud.reloadAll();
         Broadcaster.getInstance(config).registerListener(new 
KafkaSyncListener(), "kafka");
     }
 
     private class KafkaSyncListener extends Broadcaster.Listener {
 
         @Override
-        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
-            if (event == Event.DROP)
-                removeKafkaConfigLocal(cacheKey);
-            else
-                reloadKafkaConfigLocal(cacheKey);
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
+            try (AutoLock l = lock.lockForWrite()) {
+                if (event == Event.DROP)
+                    kafkaMap.removeLocal(cacheKey);
+                else
+                    crud.reloadQuietly(cacheKey);
+            }
         }
     }
 
@@ -83,85 +94,45 @@ public class KafkaConfigManager {
         return ResourceStore.getStore(this.config);
     }
 
-    public List<KafkaConfig> listAllKafkaConfigs() {
-        return new ArrayList(kafkaMap.values());
+    public KafkaConfig getKafkaConfig(String name) {
+        try (AutoLock l = lock.lockForRead()) {
+            return kafkaMap.get(name);
+        }
     }
 
-    /**
-     * Reload KafkaConfig from resource store It will be triggered by an desc
-     * update event.
-     *
-     * @param name
-     * @throws IOException
-     */
-    public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
-
-        // Save Source
-        String path = KafkaConfig.concatResourcePath(name);
-
-        // Reload the KafkaConfig
-        KafkaConfig ndesc = loadKafkaConfigAt(path);
-
-        // Here replace the old one
-        kafkaMap.putLocal(ndesc.getName(), ndesc);
-        return ndesc;
+    public List<KafkaConfig> listAllKafkaConfigs() {
+        try (AutoLock l = lock.lockForRead()) {
+            return new ArrayList(kafkaMap.values());
+        }
     }
 
-    public boolean createKafkaConfig(String name, KafkaConfig config) {
+    public boolean createKafkaConfig(KafkaConfig kafkaConfig) throws 
IOException {
+        try (AutoLock l = lock.lockForWrite()) {
 
-        if (config == null || StringUtils.isEmpty(config.getName())) {
-            throw new IllegalArgumentException();
-        }
+            if (kafkaMap.containsKey(kafkaConfig.resourceName()))
+                throw new IllegalArgumentException("KafkaConfig '" + 
kafkaConfig.getName() + "' already exists");
+
+            kafkaConfig.updateRandomUuid();
+            checkKafkaConfig(kafkaConfig);
 
-        if (kafkaMap.containsKey(config.getName()))
-            throw new IllegalArgumentException("KafkaConfig '" + 
config.getName() + "' already exists");
-        try {
-            getStore().putResource(KafkaConfig.concatResourcePath(name), 
config, KafkaConfig.SERIALIZER);
-            kafkaMap.put(config.getName(), config);
+            crud.save(kafkaConfig);
             return true;
-        } catch (IOException e) {
-            logger.error("error save resource name:" + name, e);
-            throw new RuntimeException("error save resource name:" + name, e);
         }
     }
 
-    public KafkaConfig updateKafkaConfig(KafkaConfig desc) throws IOException {
-        // Validate KafkaConfig
-        if (desc.getUuid() == null || desc.getName() == null) {
-            throw new IllegalArgumentException();
-        }
-        String name = desc.getName();
-        if (!kafkaMap.containsKey(name)) {
-            throw new IllegalArgumentException("KafkaConfig '" + name + "' 
does not exist.");
-        }
-
-        // Save Source
-        String path = desc.getResourcePath();
-        getStore().putResource(path, desc, KAFKA_SERIALIZER);
+    public KafkaConfig updateKafkaConfig(KafkaConfig kafkaConfig) throws 
IOException {
+        try (AutoLock l = lock.lockForWrite()) {
 
-        // Reload the KafkaConfig
-        KafkaConfig ndesc = loadKafkaConfigAt(path);
-        // Here replace the old one
-        kafkaMap.put(ndesc.getName(), desc);
+            if (!kafkaMap.containsKey(kafkaConfig.resourceName()))
+                throw new IllegalArgumentException("KafkaConfig '" + 
kafkaConfig.getName() + "' does not exist.");
 
-        return ndesc;
-    }
-
-    private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
-        ResourceStore store = getStore();
-        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, 
KAFKA_SERIALIZER);
-
-        if (StringUtils.isBlank(kafkaConfig.getName())) {
-            throw new IllegalStateException("KafkaConfig name must not be 
blank");
+            checkKafkaConfig(kafkaConfig);
+            
+            return crud.save(kafkaConfig);
         }
-        return kafkaConfig;
     }
 
-    public KafkaConfig getKafkaConfig(String name) {
-        return kafkaMap.get(name);
-    }
-
-    public void saveKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
+    private void checkKafkaConfig(KafkaConfig kafkaConfig) {
         if (kafkaConfig == null || StringUtils.isEmpty(kafkaConfig.getName())) 
{
             throw new IllegalArgumentException();
         }
@@ -173,50 +144,13 @@ public class KafkaConfigManager {
         if (kafkaConfig.getKafkaClusterConfigs() == null || 
kafkaConfig.getKafkaClusterConfigs().size() == 0) {
             throw new IllegalArgumentException("No cluster info");
         }
-
-        String path = KafkaConfig.concatResourcePath(kafkaConfig.getName());
-        getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
     // remove kafkaConfig
     public void removeKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
-        String path = kafkaConfig.getResourcePath();
-        getStore().deleteResource(path);
-        kafkaMap.remove(kafkaConfig.getName());
-    }
-
-    private void removeKafkaConfigLocal(String name) {
-        kafkaMap.remove(name);
-    }
-
-    private void reloadAllKafkaConfig() throws IOException {
-        ResourceStore store = getStore();
-        logger.info("Reloading Kafka Metadata from folder " + 
store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
-
-        kafkaMap.clear();
-
-        List<String> paths = 
store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, 
MetadataConstants.FILE_SURFIX);
-        for (String path : paths) {
-            KafkaConfig kafkaConfig;
-            try {
-                kafkaConfig = loadKafkaConfigAt(path);
-            } catch (Exception e) {
-                logger.error("Error loading kafkaConfig desc " + path, e);
-                continue;
-            }
-            if (path.equals(kafkaConfig.getResourcePath()) == false) {
-                logger.error("Skip suspicious desc at " + path + ", " + 
kafkaConfig + " should be at " + kafkaConfig.getResourcePath());
-                continue;
-            }
-            if (kafkaMap.containsKey(kafkaConfig.getName())) {
-                logger.error("Dup KafkaConfig name '" + kafkaConfig.getName() 
+ "' on path " + path);
-                continue;
-            }
-
-            kafkaMap.putLocal(kafkaConfig.getName(), kafkaConfig);
+        try (AutoLock l = lock.lockForWrite()) {
+            crud.delete(kafkaConfig);
         }
-
-        logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 82b8902..696c20c 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -30,14 +30,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class KafkaConfig extends RootPersistentEntity {
 
@@ -70,6 +71,11 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("parserProperties")
     private String parserProperties;
 
+    @Override
+    public String resourceName() {
+        return name;
+    }
+    
     public String getResourcePath() {
         return concatResourcePath(name);
     }

Reply via email to