[ 
https://issues.apache.org/jira/browse/KYLIN-3485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571672#comment-16571672
 ] 

ASF GitHub Bot commented on KYLIN-3485:
---------------------------------------

shaofengshi closed pull request #191: KYLIN-3485 Make unloading table more 
flexible
URL: https://github.com/apache/kylin/pull/191
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java 
b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 2c5a9226ba..f79d0f0a7d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source;
 
 import java.io.Closeable;
+import java.io.IOException;
 
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -55,4 +56,9 @@
      * For testing purpose.
      */
     ISampleDataDeployer getSampleDataDeployer();
+
+    /**
+     * Unload table.
+     */
+    void unloadTable(String tableName, String project) throws IOException;
 }
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 786daa6a17..3c661f2aca 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -32,6 +32,7 @@
 
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeManager;
@@ -53,7 +54,6 @@
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
@@ -62,11 +62,11 @@
 import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -277,8 +277,6 @@ public boolean unloadHiveTable(String tableName, String 
project) throws IOExcept
             return false;
         }
 
-        tableType = desc.getSourceType();
-
         if (!modelService.isTableInModel(desc, project)) {
             removeTableFromProject(tableName, project);
             rtn = true;
@@ -293,20 +291,9 @@ public boolean unloadHiveTable(String tableName, String 
project) throws IOExcept
         metaMgr.removeSourceTable(tableName, project);
 
         // remove streaming info
-        if (tableType == 1) {
-            StreamingConfig config = null;
-            KafkaConfig kafkaConfig = null;
-            try {
-                config = 
streamingService.getStreamingManager().getStreamingConfig(tableName);
-                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName, 
project);
-                streamingService.dropStreamingConfig(config, project);
-                kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
-                rtn = true;
-            } catch (Exception e) {
-                rtn = false;
-                logger.error(e.getLocalizedMessage(), e);
-            }
-        }
+        SourceManager sourceManager = 
SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
+        ISource source = sourceManager.getCachedSource(desc);
+        source.unloadTable(tableName, project);
         return rtn;
     }
 
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index daf93d344b..938114c2d6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -82,6 +82,11 @@ public ISampleDataDeployer getSampleDataDeployer() {
         return new HiveMetadataExplorer();
     }
 
+    @Override
+    public void unloadTable(String tableName, String project) throws 
IOException {
+
+    }
+
     @Override
     public void close() throws IOException {
         // not needed
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index ae3bbc5108..37d119eefe 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -67,6 +67,11 @@ public ISampleDataDeployer getSampleDataDeployer() {
         return new JdbcExplorer();
     }
 
+    @Override
+    public void unloadTable(String tableName, String project) throws 
IOException {
+
+    }
+
     @Override
     public void close() throws IOException {
         // not needed
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 70d37aadd1..264f2ce8a6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -37,6 +37,7 @@
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.ISampleDataDeployer;
 import org.apache.kylin.source.ISource;
@@ -246,6 +247,19 @@ public ISampleDataDeployer getSampleDataDeployer() {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void unloadTable(String tableName, String project) throws 
IOException {
+        StreamingConfig config;
+        KafkaConfig kafkaConfig;
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        StreamingManager streamingManager = 
StreamingManager.getInstance(kylinConfig);
+        KafkaConfigManager kafkaConfigManager = 
KafkaConfigManager.getInstance(kylinConfig);
+        config = streamingManager.getStreamingConfig(tableName);
+        kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName);
+        streamingManager.removeStreamingConfig(config);
+        kafkaConfigManager.removeKafkaConfig(kafkaConfig);
+    }
+
     @Override
     public void close() throws IOException {
         // not needed


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make unloading table more flexible
> ----------------------------------
>
>                 Key: KYLIN-3485
>                 URL: https://issues.apache.org/jira/browse/KYLIN-3485
>             Project: Kylin
>          Issue Type: Improvement
>          Components: RDBMS Source, REST Service, Streaming
>            Reporter: rongchuan.jin
>            Assignee: rongchuan.jin
>            Priority: Minor
>             Fix For: v2.5.0
>
>
> Currently Kylin uses hard-coded logic to unload a streaming table, so it only works with 
> Kafka. If I want to extend Kylin with another streaming data source, such as Aliyun 
> Loghub Service, it is difficult.
> I would like to contribute an improvement to make unloading tables more flexible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to