Repository: kylin Updated Branches: refs/heads/yang-m1 706481ae1 -> cacadd1d5
KYLIN-1660 Streaming/kafka config not match with table name Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cacadd1d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cacadd1d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cacadd1d Branch: refs/heads/yang-m1 Commit: cacadd1d56f3d6efb8959ae753754190614ee284 Parents: 706481a Author: Jason <jiat...@163.com> Authored: Mon May 9 18:48:42 2016 +0800 Committer: Jason <jiat...@163.com> Committed: Mon May 9 18:48:42 2016 +0800 ---------------------------------------------------------------------- .../rest/controller/StreamingController.java | 8 +++- .../kylin/rest/controller/TableController.java | 39 +++++++++++++-- webapp/app/js/controllers/cubeEdit.js | 49 +------------------ webapp/app/js/controllers/cubeSchema.js | 11 ----- webapp/app/js/controllers/cubes.js | 50 +++++++++++++++----- webapp/app/js/controllers/streamingConfig.js | 9 ++-- webapp/app/js/services/tables.js | 2 +- webapp/app/partials/cubes/cubes.html | 8 ++-- .../app/partials/tables/loadStreamingTable.html | 3 +- 9 files changed, 94 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index 932ad51..416b27c 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.JsonMappingException; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; @@ -109,7 +110,7 @@ public class StreamingController extends BasicController { tableDesc.setUuid(UUID.randomUUID().toString()); MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); metaMgr.saveSourceTable(tableDesc); - cubeMgmtService.syncTableToProject(new String[]{tableDesc.getName()}, project); + cubeMgmtService.syncTableToProject(new String[]{tableDesc.getIdentity()}, project); } catch (IOException e) { throw new BadRequestException("Failed to add streaming table."); } @@ -229,6 +230,11 @@ public class StreamingController extends BasicController { logger.error("Failed to deal with the request.", e); throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e); } + + String [] dbTable = HadoopUtil.parseHiveTableName(desc.getName()); + desc.setName(dbTable[1]); + desc.setDatabase(dbTable[0]); + desc.getIdentity(); return desc; } http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/server/src/main/java/org/apache/kylin/rest/controller/TableController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server/src/main/java/org/apache/kylin/rest/controller/TableController.java index bd04ad8..3be769d 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -25,18 +25,20 @@ import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.exception.NotFoundException; import org.apache.kylin.rest.request.CardinalityRequest; import org.apache.kylin.rest.request.StreamingRequest; import org.apache.kylin.rest.response.TableDescResponse; -import org.apache.kylin.rest.service.CubeService; -import org.apache.kylin.rest.service.ModelService; -import org.apache.kylin.rest.service.ProjectService; +import org.apache.kylin.rest.service.*; import org.apache.kylin.source.hive.HiveClient; +import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -62,6 +64,10 @@ public class TableController extends BasicController { @Autowired private ProjectService projectService; @Autowired + private StreamingService streamingService; + @Autowired + private KafkaConfigService kafkaConfigService; + @Autowired private ModelService modelService; /** @@ -97,7 +103,7 @@ public class TableController extends BasicController { * @return Table metadata array * @throws IOException */ - @RequestMapping(value = "/{tableName}", method = { RequestMethod.GET }) + @RequestMapping(value = "/{tableName}/load", method = { RequestMethod.GET }) @ResponseBody public TableDesc getHiveTable(@PathVariable String tableName) { return cubeMgmtService.getMetadataManager().getTableDesc(tableName); @@ -163,7 +169,15 @@ public class TableController extends BasicController { */ private boolean unLoadHiveTable(String tableName, String project) { boolean rtn= false; - try { + int tableType = 0; + + //remove streaming info + String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); + tableName = dbTableName[0] + "." + dbTableName[1]; + TableDesc desc = cubeMgmtService.getMetadataManager().getTableDesc(tableName); + tableType = desc.getSourceType(); + + try { if (!modelService.isTableInModel(tableName, project)) { cubeMgmtService.removeTableFromProject(tableName, project); rtn = true; @@ -180,6 +194,21 @@ public class TableController extends BasicController { rtn = false; } } + + if(tableType ==1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)){ + StreamingConfig config = null; + KafkaConfig kafkaConfig = null; + try { + config = streamingService.getStreamingManager().getStreamingConfig(tableName); + kafkaConfig = kafkaConfigService.getKafkaConfig(tableName); + streamingService.dropStreamingConfig(config); + kafkaConfigService.dropKafkaConfig(kafkaConfig); + rtn = true; + } catch (Exception e) { + rtn = false; + logger.error(e.getLocalizedMessage(), e); + } + } return rtn; } http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/js/controllers/cubeEdit.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js index eca6126..5894e42 100755 --- a/webapp/app/js/controllers/cubeEdit.js +++ b/webapp/app/js/controllers/cubeEdit.js @@ -20,9 +20,7 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $location, $templateCache, $interpolate, MessageService, TableService, CubeDescService, CubeService, loadingRequest, SweetAlert, $log, cubeConfig, CubeDescModel, MetaModel, TableModel, ModelDescService, modelsManager, cubesManager, ProjectModel, StreamingModel, StreamingService) { - var STREAMING_SUFFIX = "_streaming"; $scope.cubeConfig = cubeConfig; - $scope.metaModel = {}; $scope.modelsManager = modelsManager; @@ -168,9 +166,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio // ~ init if ($scope.isEdit = !!$routeParams.cubeName) { - $scope.streamingMeta = StreamingModel.createStreamingConfig(); - $scope.kafkaMeta = StreamingModel.createKafkaConfig(); - CubeDescService.query({cube_name: $routeParams.cubeName}, function (detail) { if (detail.length > 0) { $scope.cubeMetaFrame = detail[0]; @@ -180,19 +175,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio if (!modelsManager.getModels().length) { ModelDescService.query({model_name: $scope.cubeMetaFrame.model_name}, function (_model) { $scope.metaModel.model = _model; - - StreamingService.getConfig({table:$scope.metaModel.model.fact_table}, function (kfkConfigs) { - if(!!kfkConfigs[0]){ - $scope.cubeState.isStreaming = true; - } - else{ - return; - } - $scope.streamingMeta = kfkConfigs[0]; - StreamingService.getKfkConfig({kafkaConfigName:$scope.streamingMeta.name}, function (streamings) { - $scope.kafkaMeta = streamings[0]; - }) - }) }); } @@ -213,9 +195,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio //$scope.cubeMetaFrame.model_name = modelName; $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true); - $scope.streamingMeta = StreamingModel.createStreamingConfig(); - $scope.kafkaMeta = StreamingModel.createKafkaConfig(); - } @@ -244,15 +223,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true); - //streaming meta - if($scope.cubeState.isStreaming == true){ - $scope.streamingMeta.cubeName = $scope.cubeMetaFrame.name; - $scope.streamingMeta.name = $scope.cubeMetaFrame.name+STREAMING_SUFFIX; - $scope.kafkaMeta.name = $scope.cubeMetaFrame.name+STREAMING_SUFFIX; - } - - - }; $scope.cubeResultTmpl = function (notification) { @@ -270,15 +240,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio return; } - if (!$scope.cubeState.isStreaming) { - $scope.state.streamingCube = false; - } else { - $scope.state.streamingCube = true; - $scope.state.streamingMeta = angular.toJson($scope.streamingMeta, true); - $scope.state.kafkaMeta = angular.toJson($scope.kafkaMeta, true); - } - - SweetAlert.swal({ title: '', text: 'Are you sure to save the cube ?', @@ -295,10 +256,7 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio CubeService.update({}, { cubeDescData: $scope.state.cubeSchema, cubeName: $routeParams.cubeName, - project: $scope.state.project, - streamingCube: $scope.state.streamingCube, - streamingData: $scope.state.streamingMeta, - kafkaData: $scope.state.kafkaMeta + project: $scope.state.project }, function (request) { if (request.successful) { $scope.state.cubeSchema = request.cubeDescData; @@ -337,10 +295,7 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio } else { CubeService.save({}, { cubeDescData: $scope.state.cubeSchema, - project: $scope.state.project, - streamingCube: $scope.state.streamingCube, - streamingData: $scope.state.streamingMeta, - kafkaData: $scope.state.kafkaMeta + project: $scope.state.project }, function (request) { if (request.successful) { $scope.state.cubeSchema = request.cubeDescData; http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/js/controllers/cubeSchema.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js index be0c826..a470024 100755 --- a/webapp/app/js/controllers/cubeSchema.js +++ b/webapp/app/js/controllers/cubeSchema.js @@ -34,7 +34,6 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic if (UserService.hasRole("ROLE_ADMIN")) { $scope.wizardSteps.push({title: 'Advanced Setting', src: 'partials/cubeDesigner/advanced_settings.html', isComplete: false,form:'cube_setting_form'}); } - //$scope.wizardSteps.push({title: 'Streaming', src: 'partials/cubeDesigner/streamingConfig.html', isComplete: false,form:'cube_streaming_form'}); $scope.wizardSteps.push({title: 'Configuration Overwrites ', src: 'partials/cubeDesigner/cubeOverwriteProp.html', isComplete: false,form:'cube_overwrite_prop_form'}); $scope.wizardSteps.push({title: 'Overview', src: 'partials/cubeDesigner/overview.html', isComplete: false,form:null}); @@ -147,16 +146,6 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic } $scope.metaModel.model=modelsManager.getModel($scope.cubeMetaFrame.model_name); - StreamingService.getConfig({cubeName:$scope.cubeMetaFrame.name}, function (kfkConfigs) { - if(!!kfkConfigs[0]&&kfkConfigs[0].cubeName == $scope.cubeMetaFrame.name){ - $scope.cubeState.isStreaming = true; - $scope.streamingMeta = kfkConfigs[0]; - StreamingService.getKfkConfig({kafkaConfigName:$scope.streamingMeta.name}, function (streamings) { - $scope.kafkaMeta = streamings[0]; - }) - } - }) - } }); http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/js/controllers/cubes.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js index 8576354..f7b25e5 100644 --- a/webapp/app/js/controllers/cubes.js +++ b/webapp/app/js/controllers/cubes.js @@ -18,15 +18,12 @@ 'use strict'; -KylinApp - .controller('CubesCtrl', function ($scope, $q, $routeParams, $location, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, cubeConfig, ProjectModel, ModelService, MetaModel, CubeList,modelsManager,cubesManager,StreamingList,kylinConfig) { +KylinApp.controller('CubesCtrl', function ($scope, $q, $routeParams, $location, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, cubeConfig, ProjectModel, ModelService, MetaModel, CubeList,modelsManager,TableService) { $scope.cubeConfig = cubeConfig; $scope.cubeList = CubeList; $scope.modelsManager = modelsManager; - //$scope.cubesManager = cubesManager; - $scope.listParams = { cubeName: $routeParams.cubeName, projectName: $routeParams.projectName @@ -82,13 +79,44 @@ KylinApp $scope.loading = true; return CubeList.list(queryParam).then(function (resp) { - angular.forEach($scope.cubeList.cubes,function(item,index){ - var result = StreamingList.checkCubeExist(item.name); - if(result.exist == true){ - item.streaming = result.streaming; - var kfkConfig = StreamingList.getKafkaConfig(result.streaming.name); - item.kfkConfig = kfkConfig; - } + angular.forEach($scope.cubeList.cubes,function(cube,index){ + cube.streaming = false; + CubeDescService.query({cube_name: cube.name}, {}, function (detail) { + if (detail.length > 0 && detail[0].hasOwnProperty("name")) { + cube.detail = detail[0]; + ModelService.list({projectName:$scope.projectModel.selectedProject,modelName:cube.detail.model_name}, function (_models) { + if(_models && _models.length){ + for(var i=0;i<=_models.length;i++){ + if(_models[i].name == cube.detail.model_name){ + cube.model = _models[i]; + var factTable = cube.model.fact_table; + TableService.get({tableName:factTable},function(table){ + if(table && table.source_type == 1){ + cube.streaming = true; + } + }) + break; + } + } + } + + }) + //cube.model = modelsManager.getModel(cube.detail.model_name); + + defer.resolve(cube.detail); + + } else { + SweetAlert.swal('Oops...', "No cube detail info loaded.", 'error'); + } + }, function (e) { + if (e.data && e.data.exception) { + var message = e.data.exception; + var msg = !!(message) ? message : 'Failed to take action.'; + SweetAlert.swal('Oops...', msg, 'error'); + } else { + SweetAlert.swal('Oops...', "Failed to take action.", 'error'); + } + }); }) $scope.loading = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/js/controllers/streamingConfig.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/controllers/streamingConfig.js b/webapp/app/js/controllers/streamingConfig.js index 7ba912f..33dae64 100644 --- a/webapp/app/js/controllers/streamingConfig.js +++ b/webapp/app/js/controllers/streamingConfig.js @@ -95,11 +95,14 @@ KylinApp.controller('streamingConfigCtrl', function ($scope,StreamingService, $q //view model if($scope.state.mode == 'view' && $scope.tableModel.selectedSrcTable.source_type==1){ var table = $scope.tableModel.selectedSrcTable; - StreamingService.getConfig({table:table.name}, function (configs) { - if(!!configs[0]&&configs[0].name.toUpperCase() == table.name.toUpperCase()){ + var streamingName = table.database+"."+table.name; + $scope.streamingMeta = {}; + $scope.kafkaMeta = {}; + StreamingService.getConfig({table:streamingName}, function (configs) { + if(!!configs[0]&&configs[0].name.toUpperCase() == streamingName.toUpperCase()){ $scope.streamingMeta = configs[0]; StreamingService.getKfkConfig({kafkaConfigName:$scope.streamingMeta.name}, function (streamings) { - if(!!streamings[0]&&streamings[0].name.toUpperCase() == table.name.toUpperCase()){ + if(!!streamings[0]&&streamings[0].name.toUpperCase() == streamingName.toUpperCase()){ $scope.kafkaMeta = streamings[0]; } }) http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/js/services/tables.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js index 4199d6c..dec32bb 100755 --- a/webapp/app/js/services/tables.js +++ b/webapp/app/js/services/tables.js @@ -19,7 +19,7 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) { return $resource(Config.service.url + 'tables/:tableName/:action/:database', {}, { list: {method: 'GET', params: {}, cache: true, isArray: true}, - get: {method: 'GET', params: {}, isArray: false}, + get: {method: 'GET', params: {action:'load'}, isArray: false}, getExd: {method: 'GET', params: {action: 'exd-map'}, isArray: false}, reload: {method: 'PUT', params: {action: 'reload'}, isArray: false}, loadHiveTable: {method: 'POST', params: {}, isArray: false}, http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/partials/cubes/cubes.html ---------------------------------------------------------------------- diff --git a/webapp/app/partials/cubes/cubes.html b/webapp/app/partials/cubes/cubes.html index 8886249..8a241f2 100644 --- a/webapp/app/partials/cubes/cubes.html +++ b/webapp/app/partials/cubes/cubes.html @@ -45,7 +45,7 @@ </th> <th>Actions</th> <th ng-if="userService.hasRole('ROLE_ADMIN')">Admins</th> - <th></th> + <th>Streaming</th> </tr> </thead> <!--Body--> @@ -109,11 +109,9 @@ </ul> </div> </td> - <td ng-if="cube.streaming"> - <label class="badge label-info" style="cursor:pointer;">STREAMING</label> + <td> + <label class="badge" ng-class="{'label-info':cube.streaming==true}" style="cursor:pointer;">{{cube.streaming}}</label> </td> - <td ng-if="!cube.streaming"> - </td> </tr> <tr ng-show="cube.showDetail"> <td colspan="9" style="padding: 10px 30px 10px 30px;"> http://git-wip-us.apache.org/repos/asf/kylin/blob/cacadd1d/webapp/app/partials/tables/loadStreamingTable.html ---------------------------------------------------------------------- diff --git a/webapp/app/partials/tables/loadStreamingTable.html b/webapp/app/partials/tables/loadStreamingTable.html index fb771b0..2f2a1ec 100644 --- a/webapp/app/partials/tables/loadStreamingTable.html +++ b/webapp/app/partials/tables/loadStreamingTable.html @@ -51,6 +51,7 @@ <div class="col-xs-6" ng-show="table.schemaChecked"> <ol class="text-info" style="margin-bottom: 30px;"> <li>Choose 'timestamp' type column for streaming table.</li> + <li>By default, system will choose 'Default' as database, you can specify database like this 'database.table'</li> <li>derived time dimensions are calculated from timestamp field to help analysis against different time granularities.</li> </ol> <form class="form-horizontal" name="form.setStreamingSchema" novalidate> @@ -59,7 +60,7 @@ <div class="col-xs-8" ng-class="{'has-error':form.setStreamingSchema.streamingObject.$invalid && (form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$submitted)}"> - <input type="text" name="streamingObject" required="" ng-model="table.name" class="form-control"/> + <input type="text" name="streamingObject" placeholder="database.table" required="" ng-model="table.name" class="form-control"/> <small class="help-block" ng-show="form.setStreamingSchema.streamingObject.$error.required&&(form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$submitted)"> Table name is required.