http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java index 1b248e5..28ea238 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableExtDesc.java @@ -38,6 +38,21 @@ import com.fasterxml.jackson.annotation.JsonProperty; @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class TableExtDesc extends RootPersistentEntity { + public static String concatRawResourcePath(String nameOnPath) { + return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + nameOnPath + ".json"; + } + + public static String concatResourcePath(String tableIdentity, String prj) { + return concatRawResourcePath(TableDesc.makeResourceName(tableIdentity, prj)); + } + + // returns <table, project> + public static Pair<String, String> parseResourcePath(String path) { + return TableDesc.parseResourcePath(path); + } + + // ============================================================================ + @JsonProperty("table_name") private String tableIdentity; @JsonProperty("last_build_job_id") @@ -65,24 +80,13 @@ public class TableExtDesc extends RootPersistentEntity { public TableExtDesc() { } - public String getResourcePath() { - return concatResourcePath(getIdentity(), project); - } - - public static String concatRawResourcePath(String nameOnPath) { - return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + nameOnPath + ".json"; - } - - public static String concatResourcePath(String tableIdentity, String prj) { - if (prj == null) - return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + tableIdentity + ".json"; - else - return ResourceStore.TABLE_EXD_RESOURCE_ROOT + "/" + tableIdentity + "--" + prj + ".json"; + @Override + public String resourceName() { + return TableDesc.makeResourceName(getIdentity(), getProject()); } - - // returns <table, project> - public static Pair<String, String> parseResourcePath(String path) { - return TableDesc.parseResourcePath(path); + + public String getResourcePath() { + return concatResourcePath(getIdentity(), getProject()); } public String getProject() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 7b1f840..0029de2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -88,7 +88,7 @@ public class ProjectInstance extends RootPersistentEntity { private LinkedHashMap<String, String> overrideKylinProps; public String getResourcePath() { - return concatResourcePath(name); + return concatResourcePath(resourceName()); } public static String concatResourcePath(String projectName) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index 4622f35..1c0254e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -70,12 +70,11 @@ public class ProjectManager { // project name => ProjrectInstance private CaseInsensitiveStringCache<ProjectInstance> projectMap; + private CachedCrudAssist<ProjectInstance> crud; - // protects concurrent operations around the projectMap, to avoid for example - // writing a project in the middle of reloading it (dirty read) + // protects concurrent operations around the cached map, to avoid for example + // writing an entity in the middle of reloading it (dirty read) private AutoReadWriteLock prjMapLock = new AutoReadWriteLock(); - - private CachedCrudAssist<ProjectInstance> crud; private ProjectManager(KylinConfig config) throws IOException { logger.info("Initializing ProjectManager with metadata url " + config); @@ -85,13 +84,14 @@ public class ProjectManager { this.crud = new CachedCrudAssist<ProjectInstance>(getStore(), ResourceStore.PROJECT_RESOURCE_ROOT, ProjectInstance.class, projectMap) { @Override - protected void initEntityAfterReload(ProjectInstance prj) { + protected ProjectInstance initEntityAfterReload(ProjectInstance prj, String resourceName) { prj.init(); + return prj; } }; + // touch lower level metadata before registering my listener crud.reloadAll(); - Broadcaster.getInstance(config).registerListener(new ProjectSyncListener(), "project"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java index 9fd6ede..335d3c8 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java @@ -34,6 +34,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class StreamingConfig extends RootPersistentEntity { @@ -47,6 +48,11 @@ public class StreamingConfig extends RootPersistentEntity { @JsonProperty("type") private String type = STREAMING_TYPE_KAFKA; + @Override + public String resourceName() { + return name; + } + public String getType() { return type; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java index b5d7e37..d720585 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java @@ -24,12 +24,12 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.common.util.AutoReadWriteLock; +import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +38,9 @@ import org.slf4j.LoggerFactory; */ public class StreamingManager { + @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); - public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); - public static StreamingManager getInstance(KylinConfig config) { return config.getManager(StreamingManager.class); } @@ -52,29 +51,43 @@ public class StreamingManager { } // ============================================================================ - + private KylinConfig config; // name ==> StreamingConfig private CaseInsensitiveStringCache<StreamingConfig> streamingMap; + private CachedCrudAssist<StreamingConfig> crud; + private AutoReadWriteLock lock = new AutoReadWriteLock(); private StreamingManager(KylinConfig config) throws IOException { this.config = config; this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming"); - + this.crud = new CachedCrudAssist<StreamingConfig>(getStore(), ResourceStore.STREAMING_RESOURCE_ROOT, + StreamingConfig.class, streamingMap) { + @Override + protected StreamingConfig initEntityAfterReload(StreamingConfig t, String resourceName) { + return t; // noop + } + }; + // touch lower level metadata before registering my listener - reloadAllStreaming(); + crud.reloadAll(); Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming"); } private class StreamingSyncListener extends Broadcaster.Listener { @Override - public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { - if (event == Event.DROP) - removeStreamingLocal(cacheKey); - else - reloadStreamingConfigLocal(cacheKey); + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) + throws IOException { + String streamingName = cacheKey; + + try (AutoLock l = lock.lockForWrite()) { + if (event == Event.DROP) + streamingMap.removeLocal(streamingName); + else + crud.reloadQuietly(streamingName); + } } } @@ -83,129 +96,57 @@ public class StreamingManager { } public StreamingConfig getStreamingConfig(String name) { - return streamingMap.get(name); + try (AutoLock l = lock.lockForRead()) { + return streamingMap.get(name); + } } public List<StreamingConfig> listAllStreaming() { - return new ArrayList<>(streamingMap.values()); - } - - /** - * Reload StreamingConfig from resource store It will be triggered by an desc - * update event. - * - * @param name - * @throws IOException - */ - public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException { - - // Save Source - String path = StreamingConfig.concatResourcePath(name); - - // Reload the StreamingConfig - StreamingConfig ndesc = loadStreamingConfigAt(path); - - // Here replace the old one - streamingMap.putLocal(ndesc.getName(), ndesc); - return ndesc; - } - - // remove streamingConfig - public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException { - String path = streamingConfig.getResourcePath(); - getStore().deleteResource(path); - streamingMap.remove(streamingConfig.getName()); - } - - public StreamingConfig getConfig(String name) { - name = name.toUpperCase(); - return streamingMap.get(name); - } - - public void removeStreamingLocal(String streamingName) { - streamingMap.removeLocal(streamingName); - } - - /** - * Update CubeDesc with the input. Broadcast the event into cluster - * - * @param desc - * @return - * @throws IOException - */ - public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException { - // Validate CubeDesc - if (desc.getUuid() == null || desc.getName() == null) { - throw new IllegalArgumentException("SteamingConfig Illegal."); - } - String name = desc.getName(); - if (!streamingMap.containsKey(name)) { - throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist."); + try (AutoLock l = lock.lockForRead()) { + return new ArrayList<>(streamingMap.values()); } - - // Save Source - String path = desc.getResourcePath(); - getStore().putResource(path, desc, STREAMING_SERIALIZER); - - // Reload the StreamingConfig - StreamingConfig ndesc = loadStreamingConfigAt(path); - // Here replace the old one - streamingMap.put(ndesc.getName(), desc); - - return ndesc; } - - public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException { - if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) { - throw new IllegalArgumentException(); - } - - if (streamingMap.containsKey(streamingConfig.getName())) - throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists"); - - String path = StreamingConfig.concatResourcePath(streamingConfig.getName()); - getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER); - streamingMap.put(streamingConfig.getName(), streamingConfig); - return streamingConfig; + + // for test + List<StreamingConfig> reloadAll() throws IOException { + try (AutoLock l = lock.lockForWrite()) { + crud.reloadAll(); + return listAllStreaming(); + } } - private StreamingConfig loadStreamingConfigAt(String path) throws IOException { - ResourceStore store = getStore(); - StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER); + public StreamingConfig createStreamingConfig(StreamingConfig streamingConfig) throws IOException { + try (AutoLock l = lock.lockForWrite()) { + if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) { + throw new IllegalArgumentException(); + } + if (streamingMap.containsKey(streamingConfig.resourceName())) + throw new IllegalArgumentException( + "StreamingConfig '" + streamingConfig.getName() + "' already exists"); - if (StringUtils.isBlank(streamingDesc.getName())) { - throw new IllegalStateException("StreamingConfig name must not be blank"); + streamingConfig.updateRandomUuid(); + + return crud.save(streamingConfig); } - return streamingDesc; } - private void reloadAllStreaming() throws IOException { - ResourceStore store = getStore(); - logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT)); - - streamingMap.clear(); - - List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); - for (String path : paths) { - StreamingConfig streamingConfig; - try { - streamingConfig = loadStreamingConfigAt(path); - } catch (Exception e) { - logger.error("Error loading streaming desc " + path, e); - continue; - } - if (path.equals(streamingConfig.getResourcePath()) == false) { - logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath()); - continue; + public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException { + try (AutoLock l = lock.lockForWrite()) { + if (desc.getUuid() == null || desc.getName() == null) { + throw new IllegalArgumentException("SteamingConfig Illegal."); } - if (streamingMap.containsKey(streamingConfig.getName())) { - logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path); - continue; + if (!streamingMap.containsKey(desc.resourceName())) { + throw new IllegalArgumentException("StreamingConfig '" + desc.getName() + "' does not exist."); } - streamingMap.putLocal(streamingConfig.getName(), streamingConfig); + return crud.save(desc); } + } - logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)"); + public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException { + try (AutoLock l = lock.lockForWrite()) { + crud.delete(streamingConfig); + } } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java index f2baf29..663816c 100644 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/TempStatementManagerTest.java @@ -46,21 +46,21 @@ public class TempStatementManagerTest extends LocalFileMetadataTestCase { public void testAddTempStatement() throws IOException { TempStatementManager manager = TempStatementManager.getInstance(getTestConfig()); manager.updateTempStatement("temp_table3", "AAAAA"); - Assert.assertEquals(3, manager.listAllTempStatement().size()); + Assert.assertEquals(3, manager.reloadAllTempStatement().size()); } @Test public void testRemoveTempStatement() throws IOException { TempStatementManager manager = TempStatementManager.getInstance(getTestConfig()); manager.removeTempStatement("temp_table1"); - Assert.assertEquals(1, manager.listAllTempStatement().size()); + Assert.assertEquals(1, manager.reloadAllTempStatement().size()); } @Test public void testUpdateTempStatement() throws IOException { TempStatementManager manager = TempStatementManager.getInstance(getTestConfig()); manager.updateTempStatement("temp_table1", "AAAAA"); - Assert.assertEquals(2, manager.listAllTempStatement().size()); + Assert.assertEquals(2, manager.reloadAllTempStatement().size()); Assert.assertEquals("AAAAA", manager.getTempStatement("temp_table1")); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java new file mode 100644 index 0000000..798deb0 --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/StreamingManagerTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.streaming; + +import java.io.IOException; +import java.util.List; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StreamingManagerTest extends LocalFileMetadataTestCase { + + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testBasics() throws IOException { + StreamingManager mgr = StreamingManager.getInstance(getTestConfig()); + List<StreamingConfig> origin = mgr.listAllStreaming(); + + // test create + { + StreamingConfig streamingConfig = new StreamingConfig(); + streamingConfig.setName("name for test"); + streamingConfig.setType("type for test"); + mgr.createStreamingConfig(streamingConfig); + List<StreamingConfig> reloadAll = mgr.reloadAll(); + Assert.assertTrue(origin.size() + 1 == reloadAll.size()); + } + + // test update + { + StreamingConfig streamingConfig = mgr.getStreamingConfig("name for test"); + streamingConfig.setType("updated type"); + mgr.updateStreamingConfig(streamingConfig); + List<StreamingConfig> reloadAll = mgr.reloadAll(); + Assert.assertTrue(origin.size() + 1 == reloadAll.size()); + streamingConfig = mgr.getStreamingConfig("name for test"); + Assert.assertEquals("updated type", streamingConfig.getType()); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java index 14ef524..9fbb7f3 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java @@ -50,16 +50,27 @@ import com.google.common.collect.Lists; @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class HybridInstance extends RootPersistentEntity implements IRealization { + private final static Logger logger = LoggerFactory.getLogger(HybridInstance.class); + + public static HybridInstance create(KylinConfig config, String name, List<RealizationEntry> realizationEntries) { + HybridInstance hybridInstance = new HybridInstance(); + + hybridInstance.setConfig(config); + hybridInstance.setName(name); + hybridInstance.setRealizationEntries(realizationEntries); + hybridInstance.updateRandomUuid(); + + return hybridInstance; + } + + // ============================================================================ + @JsonIgnore private KylinConfig config; @JsonProperty("name") private String name; - public void setRealizationEntries(List<RealizationEntry> realizationEntries) { - this.realizationEntries = realizationEntries; - } - @JsonProperty("realizations") private List<RealizationEntry> realizationEntries; @@ -75,21 +86,17 @@ public class HybridInstance extends RootPersistentEntity implements IRealization private long dateRangeEnd; private boolean isReady = false; - private final static Logger logger = LoggerFactory.getLogger(HybridInstance.class); - + @Override + public String resourceName() { + return name; + } + public List<RealizationEntry> getRealizationEntries() { return realizationEntries; } - public static HybridInstance create(KylinConfig config, String name, List<RealizationEntry> realizationEntries) { - HybridInstance hybridInstance = new HybridInstance(); - - hybridInstance.setConfig(config); - hybridInstance.setName(name); - hybridInstance.setRealizationEntries(realizationEntries); - hybridInstance.updateRandomUuid(); - - return hybridInstance; + public void setRealizationEntries(List<RealizationEntry> realizationEntries) { + this.realizationEntries = realizationEntries; } private void init() { http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java index 37f4aff..1e56c73 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java @@ -21,13 +21,15 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.AutoReadWriteLock; +import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; @@ -43,9 +45,10 @@ import com.google.common.collect.Lists; /** */ public class HybridManager implements IRealizationProvider { - public static final Serializer<HybridInstance> HYBRID_SERIALIZER = new JsonSerializer<HybridInstance>(HybridInstance.class); private static final Logger logger = LoggerFactory.getLogger(HybridManager.class); + + public static final Serializer<HybridInstance> HYBRID_SERIALIZER = new JsonSerializer<>(HybridInstance.class); public static HybridManager getInstance(KylinConfig config) { return config.getManager(HybridManager.class); @@ -61,14 +64,24 @@ public class HybridManager implements IRealizationProvider { private KylinConfig config; private CaseInsensitiveStringCache<HybridInstance> hybridMap; + private CachedCrudAssist<HybridInstance> crud; + private AutoReadWriteLock lock = new AutoReadWriteLock(); - private HybridManager(KylinConfig config) throws IOException { - logger.info("Initializing HybridManager with config " + config); - this.config = config; + private HybridManager(KylinConfig cfg) throws IOException { + logger.info("Initializing HybridManager with config " + cfg); + this.config = cfg; this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid"); + this.crud = new CachedCrudAssist<HybridInstance>(getStore(), ResourceStore.HYBRID_RESOURCE_ROOT, + HybridInstance.class, hybridMap) { + @Override + protected HybridInstance initEntityAfterReload(HybridInstance hybridInstance, String resourceName) { + hybridInstance.setConfig(config); + return hybridInstance; // noop + } + }; // touch lower level metadata before registering my listener - reloadAllHybridInstance(); + crud.reloadAll(); Broadcaster.getInstance(config).registerListener(new HybridSyncListener(), "hybrid", "cube"); } @@ -76,89 +89,57 @@ public class HybridManager implements IRealizationProvider { @Override public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { - for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { - if (real instanceof HybridInstance) { - reloadHybridInstance(real.getName()); + try (AutoLock l = lock.lockForWrite()) { + for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { + if (real instanceof HybridInstance) { + crud.reloadQuietly(real.getName()); + } } } } @Override - public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) + throws IOException { if ("hybrid".equals(entity)) { String hybridName = cacheKey; - if (event == Event.DROP) - hybridMap.removeLocal(hybridName); - else - reloadHybridInstance(hybridName); + try (AutoLock l = lock.lockForWrite()) { + if (event == Event.DROP) + hybridMap.removeLocal(hybridName); + else + crud.reloadQuietly(hybridName); + } - for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, hybridName)) { + for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, + hybridName)) { broadcaster.notifyProjectSchemaUpdate(prj.getName()); } } else if ("cube".equals(entity)) { String cubeName = cacheKey; - for (HybridInstance hybrid : getHybridInstancesByChild(RealizationType.CUBE, cubeName)) { - reloadHybridInstance(hybrid.getName()); + try (AutoLock l = lock.lockForWrite()) { + for (HybridInstance hybrid : getHybridInstancesByChild(RealizationType.CUBE, cubeName)) { + crud.reloadQuietly(hybrid.getName()); + } } } } } - public void reloadAllHybridInstance() throws IOException { - ResourceStore store = getStore(); - List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json"); - - hybridMap.clear(); - logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT)); - - for (String path : paths) { - reloadHybridInstanceAt(path); - } - - logger.debug("Loaded " + paths.size() + " Hybrid(s)"); - } - public List<HybridInstance> getHybridInstancesByChild(RealizationType type, String realizationName) { - List<HybridInstance> result = Lists.newArrayList(); - for (HybridInstance hybridInstance : hybridMap.values()) { - for (RealizationEntry realizationEntry : hybridInstance.getRealizationEntries()) { - if (realizationEntry.getType() == type && realizationEntry.getRealization().equalsIgnoreCase(realizationName)) { - result.add(hybridInstance); + try (AutoLock l = lock.lockForRead()) { + List<HybridInstance> result = Lists.newArrayList(); + for (HybridInstance hybridInstance : hybridMap.values()) { + for (RealizationEntry realizationEntry : hybridInstance.getRealizationEntries()) { + if (realizationEntry.getType() == type + && realizationEntry.getRealization().equalsIgnoreCase(realizationName)) { + result.add(hybridInstance); + } } - } - - } - - return result; - } - public void reloadHybridInstance(String name) { - reloadHybridInstanceAt(HybridInstance.concatResourcePath(name)); - } - - private synchronized HybridInstance reloadHybridInstanceAt(String path) { - ResourceStore store = getStore(); - - HybridInstance hybridInstance = null; - try { - hybridInstance = store.getResource(path, HybridInstance.class, HYBRID_SERIALIZER); - hybridInstance.setConfig(config); - - if (hybridInstance.getRealizationEntries() == null || hybridInstance.getRealizationEntries().size() == 0) { - throw new IllegalStateException("HybridInstance must have realization entries, " + path); } - if (StringUtils.isBlank(hybridInstance.getName())) - throw new IllegalStateException("HybridInstance name must not be blank, at " + path); - - final String name = hybridInstance.getName(); - hybridMap.putLocal(name, hybridInstance); - - return hybridInstance; - } catch (Exception e) { - logger.error("Error during load hybrid instance " + path, e); - return null; + return result; } } @@ -173,11 +154,27 @@ public class HybridManager implements IRealizationProvider { } public Collection<HybridInstance> listHybridInstances() { - return hybridMap.values(); + try (AutoLock l = lock.lockForRead()) { + return hybridMap.values(); + } } public HybridInstance getHybridInstance(String name) { - return hybridMap.get(name); + try (AutoLock l = lock.lockForRead()) { + return hybridMap.get(name); + } + } + + public HybridInstance reloadHybridInstance(String name) { + try (AutoLock l = lock.lockForWrite()) { + return crud.reload(name); + } + } + + public void reloadAllHybridInstance() throws IOException { + try (AutoLock l = lock.lockForWrite()) { + crud.reloadAll(); + } } private ResourceStore getStore() { http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index c8aee5d..872deed 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -182,7 +182,7 @@ public class SparkCubing extends AbstractApplication { private void writeDictionary(Dataset<Row> intermediateTable, String cubeName, String segmentId) throws Exception { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); final CubeManager cubeManager = CubeManager.getInstance(kylinConfig); - final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); + final CubeInstance cubeInstance = cubeManager.getCube(cubeName); final String[] columns = intermediateTable.columns(); final CubeSegment seg = cubeInstance.getSegmentById(segmentId); final CubeDesc cubeDesc = cubeInstance.getDescriptor(); @@ -249,7 +249,7 @@ public class SparkCubing extends AbstractApplication { } private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception { - CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); CubeDesc cubeDesc = cubeInstance.getDescriptor(); CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json index 8d534e8..3bf27f9 100644 --- a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json +++ b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table1.json @@ -1,4 +1,5 @@ { + "uuid" : "da93ed8c-aed7-4a98-ba05-28c89cfa8ee2", "session_id" : "DEFAULT_SESSION", "statement_id": "temp_table1", "statement": "as (select * from fact_table1)" http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json index f4f5fad..b2b6a79 100644 --- a/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json +++ b/examples/test_case_data/localmeta/temp_statement/DEFAULT_SESSION/temp_table2.json @@ -1,4 +1,5 @@ { + "uuid" : "9f37ed8c-aed7-4a98-ba05-28c89cfac870", "session_id" : "DEFAULT_SESSION", "statement_id": "temp_table2", "statement": "as (select * from fact_table2)" http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 239b4af..d338332 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -131,7 +131,7 @@ public class BuildCubeWithStream { BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0); brokerConfig.setHost(localIp); kafkaConfig.setTopic(topicName); - KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig); + KafkaConfigManager.getInstance(kylinConfig).updateKafkaConfig(kafkaConfig); startEmbeddedKafka(topicName, brokerConfig); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java index afa914c..cc4b736 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/ModelController.java @@ -206,7 +206,7 @@ public class ModelController extends BasicController { newModelDesc = modelService.createModelDesc(project, newModelDesc); //reload avoid shallow - metaManager.reloadDataModelDescAt(DataModelDesc.concatResourcePath(newModelName)); + metaManager.reloadDataModel(newModelName); } catch (IOException e) { throw new InternalErrorException("failed to clone DataModelDesc", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java b/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java index 260cbc0..32c7339 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java @@ -75,7 +75,7 @@ public class KafkaConfigService extends BasicService { if (getKafkaManager().getKafkaConfig(config.getName()) != null) { throw new BadRequestException(String.format(msg.getKAFKA_CONFIG_ALREADY_EXIST(), config.getName())); } - getKafkaManager().createKafkaConfig(config.getName(), config); + getKafkaManager().createKafkaConfig(config); return config; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java index 1f907f8..d4d7cc7 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -41,7 +41,7 @@ public class StreamingService extends BasicService { if (StringUtils.isEmpty(table)) { streamingConfigs = getStreamingManager().listAllStreaming(); } else { - StreamingConfig config = getStreamingManager().getConfig(table); + StreamingConfig config = getStreamingManager().getStreamingConfig(table); if (config != null) { streamingConfigs.add(config); } @@ -73,7 +73,7 @@ public class StreamingService extends BasicService { if (getStreamingManager().getStreamingConfig(config.getName()) != null) { throw new BadRequestException(String.format(msg.getSTREAMING_CONFIG_ALREADY_EXIST(), config.getName())); } - StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config); + StreamingConfig streamingConfig = getStreamingManager().createStreamingConfig(config); return streamingConfig; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index 5e451b4..ad6a46a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -24,12 +24,12 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.common.util.AutoReadWriteLock; +import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; @@ -39,10 +39,9 @@ import org.slf4j.LoggerFactory; */ public class KafkaConfigManager { + @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(KafkaConfigManager.class); - public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class); - public static KafkaConfigManager getInstance(KylinConfig config) { return config.getManager(KafkaConfigManager.class); } @@ -55,27 +54,39 @@ public class KafkaConfigManager { // ============================================================================ private KylinConfig config; - + // name ==> StreamingConfig private CaseInsensitiveStringCache<KafkaConfig> kafkaMap; - + private CachedCrudAssist<KafkaConfig> crud; + private AutoReadWriteLock lock = new AutoReadWriteLock(); + private KafkaConfigManager(KylinConfig config) throws IOException { this.config = config; this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka"); + this.crud = new CachedCrudAssist<KafkaConfig>(getStore(), ResourceStore.KAFKA_RESOURCE_ROOT, KafkaConfig.class, + kafkaMap) { + @Override + protected KafkaConfig initEntityAfterReload(KafkaConfig t, String resourceName) { + return t; // noop + } + }; // touch lower level metadata before registering my listener - reloadAllKafkaConfig(); + crud.reloadAll(); Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka"); } private class KafkaSyncListener extends Broadcaster.Listener { @Override - public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { - if (event == Event.DROP) - removeKafkaConfigLocal(cacheKey); - else - reloadKafkaConfigLocal(cacheKey); + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) + throws IOException { + try (AutoLock l = lock.lockForWrite()) { + if (event == Event.DROP) + kafkaMap.removeLocal(cacheKey); + else + crud.reloadQuietly(cacheKey); + } } } @@ -83,85 +94,45 @@ public class KafkaConfigManager { return ResourceStore.getStore(this.config); } - public List<KafkaConfig> listAllKafkaConfigs() { - return new ArrayList(kafkaMap.values()); + public KafkaConfig getKafkaConfig(String name) { + try (AutoLock l = lock.lockForRead()) { + return kafkaMap.get(name); + } } - /** - * Reload KafkaConfig from resource store It will be triggered by an desc - * update event. - * - * @param name - * @throws IOException - */ - public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException { - - // Save Source - String path = KafkaConfig.concatResourcePath(name); - - // Reload the KafkaConfig - KafkaConfig ndesc = loadKafkaConfigAt(path); - - // Here replace the old one - kafkaMap.putLocal(ndesc.getName(), ndesc); - return ndesc; + public List<KafkaConfig> listAllKafkaConfigs() { + try (AutoLock l = lock.lockForRead()) { + return new ArrayList(kafkaMap.values()); + } } - public boolean createKafkaConfig(String name, KafkaConfig config) { + public boolean createKafkaConfig(KafkaConfig kafkaConfig) throws IOException { + try (AutoLock l = lock.lockForWrite()) { - if (config == null || StringUtils.isEmpty(config.getName())) { - throw new IllegalArgumentException(); - } + if (kafkaMap.containsKey(kafkaConfig.resourceName())) + throw new IllegalArgumentException("KafkaConfig '" + kafkaConfig.getName() + "' already exists"); + + kafkaConfig.updateRandomUuid(); + checkKafkaConfig(kafkaConfig); - if (kafkaMap.containsKey(config.getName())) - throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists"); - try { - getStore().putResource(KafkaConfig.concatResourcePath(name), config, KafkaConfig.SERIALIZER); - kafkaMap.put(config.getName(), config); + crud.save(kafkaConfig); return true; - } catch (IOException e) { - logger.error("error save resource name:" + name, e); - throw new RuntimeException("error save resource name:" + name, e); } } - public KafkaConfig updateKafkaConfig(KafkaConfig desc) throws IOException { - // Validate KafkaConfig - if (desc.getUuid() == null || desc.getName() == null) { - throw new IllegalArgumentException(); - } - String name = desc.getName(); - if (!kafkaMap.containsKey(name)) { - throw new IllegalArgumentException("KafkaConfig '" + name + "' does not exist."); - } - - // Save Source - String path = desc.getResourcePath(); - getStore().putResource(path, desc, KAFKA_SERIALIZER); + public KafkaConfig updateKafkaConfig(KafkaConfig kafkaConfig) throws IOException { + try (AutoLock l = lock.lockForWrite()) { - // Reload the KafkaConfig - KafkaConfig ndesc = loadKafkaConfigAt(path); - // Here replace the old one - kafkaMap.put(ndesc.getName(), desc); + if (!kafkaMap.containsKey(kafkaConfig.resourceName())) + throw new IllegalArgumentException("KafkaConfig '" + kafkaConfig.getName() + "' does not exist."); - return ndesc; - } - - private KafkaConfig loadKafkaConfigAt(String path) throws IOException { - ResourceStore store = getStore(); - KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER); - - if (StringUtils.isBlank(kafkaConfig.getName())) { - throw new IllegalStateException("KafkaConfig name must not be blank"); + checkKafkaConfig(kafkaConfig); + + return crud.save(kafkaConfig); } - return kafkaConfig; } - public KafkaConfig getKafkaConfig(String name) { - return kafkaMap.get(name); - } - - public void saveKafkaConfig(KafkaConfig kafkaConfig) throws IOException { + private void checkKafkaConfig(KafkaConfig kafkaConfig) { if (kafkaConfig == null || StringUtils.isEmpty(kafkaConfig.getName())) { throw new IllegalArgumentException(); } @@ -173,50 +144,13 @@ public class KafkaConfigManager { if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() == 0) { throw new IllegalArgumentException("No cluster info"); } - - String path = KafkaConfig.concatResourcePath(kafkaConfig.getName()); - getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER); } // remove kafkaConfig public void removeKafkaConfig(KafkaConfig kafkaConfig) throws IOException { - String path = kafkaConfig.getResourcePath(); - getStore().deleteResource(path); - kafkaMap.remove(kafkaConfig.getName()); - } - - private void removeKafkaConfigLocal(String name) { - kafkaMap.remove(name); - } - - private void reloadAllKafkaConfig() throws IOException { - ResourceStore store = getStore(); - logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT)); - - kafkaMap.clear(); - - List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); - for (String path : paths) { - KafkaConfig kafkaConfig; - try { - kafkaConfig = loadKafkaConfigAt(path); - } catch (Exception e) { - logger.error("Error loading kafkaConfig desc " + path, e); - continue; - } - if (path.equals(kafkaConfig.getResourcePath()) == false) { - logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " + kafkaConfig.getResourcePath()); - continue; - } - if (kafkaMap.containsKey(kafkaConfig.getName())) { - logger.error("Dup KafkaConfig name '" + kafkaConfig.getName() + "' on path " + path); - continue; - } - - kafkaMap.putLocal(kafkaConfig.getName(), kafkaConfig); + try (AutoLock l = lock.lockForWrite()) { + crud.delete(kafkaConfig); } - - logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index 82b8902..696c20c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -30,14 +30,15 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.source.kafka.TimedJsonStreamParser; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kylin.source.kafka.TimedJsonStreamParser; /** */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class KafkaConfig extends RootPersistentEntity { @@ -70,6 +71,11 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("parserProperties") private String parserProperties; + @Override + public String resourceName() { + return name; + } + public String getResourcePath() { return concatResourcePath(name); }