[ https://issues.apache.org/jira/browse/KYLIN-3485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571672#comment-16571672 ]
ASF GitHub Bot commented on KYLIN-3485: --------------------------------------- shaofengshi closed pull request #191: KYLIN-3485 Make unloading table more flexible URL: https://github.com/apache/kylin/pull/191 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index 2c5a9226ba..f79d0f0a7d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -19,6 +19,7 @@ package org.apache.kylin.source; import java.io.Closeable; +import java.io.IOException; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; @@ -55,4 +56,9 @@ * For testing purpose. */ ISampleDataDeployer getSampleDataDeployer(); + + /** + * Unload table. 
+ */ + void unloadTable(String tableName, String project) throws IOException; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index 786daa6a17..3c661f2aca 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -32,6 +32,7 @@ import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeManager; @@ -53,7 +54,6 @@ import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; @@ -62,11 +62,11 @@ import org.apache.kylin.rest.response.TableSnapshotResponse; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.IReadableTable.TableSignature; +import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; -import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -277,8 +277,6 @@ public boolean unloadHiveTable(String tableName, String project) throws IOExcept return false; } - tableType = desc.getSourceType(); - if (!modelService.isTableInModel(desc, project)) { 
removeTableFromProject(tableName, project); rtn = true; @@ -293,20 +291,9 @@ public boolean unloadHiveTable(String tableName, String project) throws IOExcept metaMgr.removeSourceTable(tableName, project); // remove streaming info - if (tableType == 1) { - StreamingConfig config = null; - KafkaConfig kafkaConfig = null; - try { - config = streamingService.getStreamingManager().getStreamingConfig(tableName); - kafkaConfig = kafkaConfigService.getKafkaConfig(tableName, project); - streamingService.dropStreamingConfig(config, project); - kafkaConfigService.dropKafkaConfig(kafkaConfig, project); - rtn = true; - } catch (Exception e) { - rtn = false; - logger.error(e.getLocalizedMessage(), e); - } - } + SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv()); + ISource source = sourceManager.getCachedSource(desc); + source.unloadTable(tableName, project); return rtn; } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index daf93d344b..938114c2d6 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -82,6 +82,11 @@ public ISampleDataDeployer getSampleDataDeployer() { return new HiveMetadataExplorer(); } + @Override + public void unloadTable(String tableName, String project) throws IOException { + + } + @Override public void close() throws IOException { // not needed diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java index ae3bbc5108..37d119eefe 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java @@ -67,6 +67,11 @@ public ISampleDataDeployer getSampleDataDeployer() { return new JdbcExplorer(); } + @Override + 
public void unloadTable(String tableName, String project) throws IOException { + + } + @Override public void close() throws IOException { // not needed diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 70d37aadd1..264f2ce8a6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -37,6 +37,7 @@ import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.ISampleDataDeployer; import org.apache.kylin.source.ISource; @@ -246,6 +247,19 @@ public ISampleDataDeployer getSampleDataDeployer() { throw new UnsupportedOperationException(); } + @Override + public void unloadTable(String tableName, String project) throws IOException { + StreamingConfig config; + KafkaConfig kafkaConfig; + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); + KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); + config = streamingManager.getStreamingConfig(tableName); + kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName); + streamingManager.removeStreamingConfig(config); + kafkaConfigManager.removeKafkaConfig(kafkaConfig); + } + @Override public void close() throws IOException { // not needed ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make unloading table more flexible > ---------------------------------- > > Key: KYLIN-3485 > URL: https://issues.apache.org/jira/browse/KYLIN-3485 > Project: Kylin > Issue Type: Improvement > Components: RDBMS Source, REST Service, Streaming > Reporter: rongchuan.jin > Assignee: rongchuan.jin > Priority: Minor > Fix For: v2.5.0 > > > Currently Kylin uses hard-coded logic to unload streaming tables, so it only works with > Kafka. If I want to extend Kylin with another streaming data source, such as Aliyun > Loghub Service, it is difficult to do so. > I would like to contribute an improvement to make unloading tables more flexible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)