Repository: ambari Updated Branches: refs/heads/trunk 85fb356cc -> 5de82da3b
AMBARI-22202 : Hive View 2.0 in Ambari 2.5.1 does not use the specified YARN queue when using the "Upload Table" feature. (nitirajrathore) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5de82da3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5de82da3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5de82da3 Branch: refs/heads/trunk Commit: 5de82da3b293a0306d796d79960af45bc7c88372 Parents: 85fb356 Author: Nitiraj Singh Rathore <nitiraj.rath...@gmail.com> Authored: Thu Oct 12 12:45:56 2017 +0530 Committer: Nitiraj Singh Rathore <nitiraj.rath...@gmail.com> Committed: Thu Oct 12 12:47:12 2017 +0530 ---------------------------------------------------------------------- .../view/hive20/resources/browser/DDLProxy.java | 154 +++++++++++++------ .../hive20/resources/uploads/UploadService.java | 12 +- .../uploads/query/InsertFromQueryInput.java | 13 +- .../databases/database/tables/upload-table.js | 12 ++ 4 files changed, 140 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5de82da3/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/browser/DDLProxy.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/browser/DDLProxy.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/browser/DDLProxy.java index 77857f9..6d793d3 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/browser/DDLProxy.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/browser/DDLProxy.java @@ -19,6 +19,7 @@ package org.apache.ambari.view.hive20.resources.browser; import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Strings; @@ -54,6 +55,8 @@ import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job; import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController; import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl; import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager; +import org.apache.ambari.view.hive20.resources.settings.Setting; +import org.apache.ambari.view.hive20.resources.settings.SettingsResourceManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,11 +76,13 @@ public class DDLProxy { private final ViewContext context; private final TableMetaParserImpl tableMetaParser; + private SettingsResourceManager settingsResourceManager; @Inject - public DDLProxy(ViewContext context, TableMetaParserImpl tableMetaParser) { + public DDLProxy(ViewContext context, TableMetaParserImpl tableMetaParser, SettingsResourceManager settingsResourceManager) { this.context = context; this.tableMetaParser = tableMetaParser; + this.settingsResourceManager = settingsResourceManager; LOG.info("Creating DDLProxy"); } @@ -130,15 +135,19 @@ public class DDLProxy { } public Job getColumnStatsJob(final String databaseName, final String tableName, final String columnName, - JobResourceManager resourceManager) throws ServiceException { + JobResourceManager resourceManager) throws ServiceException { FetchColumnStatsQueryGenerator queryGenerator = new FetchColumnStatsQueryGenerator(databaseName, tableName, - columnName); + columnName); Optional<String> q = queryGenerator.getQuery(); String jobTitle = "Fetch column stats for " + databaseName + "." + tableName + "." + columnName; - if(q.isPresent()) { + if (q.isPresent()) { String query = q.get(); + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + query = settingsString.get() + query; + } return createJob(databaseName, query, jobTitle, resourceManager); - }else{ + } else { throw new ServiceException("Failed to generate job for {}" + jobTitle); } } @@ -228,32 +237,56 @@ public class DDLProxy { tableMeta.setDatabase(databaseName); } Optional<String> createTableQuery = new CreateTableQueryGenerator(tableMeta).getQuery(); - if(createTableQuery.isPresent()) { + if (createTableQuery.isPresent()) { LOG.info("generated create table query : {}", createTableQuery); return createTableQuery.get(); - }else { + } else { throw new ServiceException("could not generate create table query for database : " + databaseName + " table : " + tableMeta.getTable()); } } public Job createTable(String databaseName, TableMeta tableMeta, JobResourceManager resourceManager) throws ServiceException { String createTableQuery = this.generateCreateTableDDL(databaseName, tableMeta); + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + createTableQuery = settingsString.get() + createTableQuery; + } String jobTitle = "Create table " + tableMeta.getDatabase() + "." + tableMeta.getTable(); + return createJob(databaseName, createTableQuery, jobTitle, resourceManager); } + private Optional<String> generateSettingsString() { + List<Setting> settings = settingsResourceManager.getSettings(); + if (null != settings && !settings.isEmpty()) { + return Optional.of(Joiner.on(";\n").join(FluentIterable.from(settings).transform(new Function<Setting, String>() { + @Override + public String apply(Setting setting) { + return "set " + setting.getKey() + "=" + setting.getValue(); + } + }).toList()) + ";\n"/*need this ;\n at the end of last line also.*/); + } else { + return Optional.absent(); + } + } + public Job deleteTable(String databaseName, String tableName, JobResourceManager resourceManager) throws ServiceException { String deleteTableQuery = generateDeleteTableDDL(databaseName, tableName); String jobTitle = "Delete table " + databaseName + "." + tableName; + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + deleteTableQuery = settingsString.get() + deleteTableQuery; + } + return createJob(databaseName, deleteTableQuery, jobTitle, resourceManager); } public String generateDeleteTableDDL(String databaseName, String tableName) throws ServiceException { Optional<String> deleteTableQuery = new DeleteTableQueryGenerator(databaseName, tableName).getQuery(); - if(deleteTableQuery.isPresent()) { + if (deleteTableQuery.isPresent()) { LOG.info("deleting table {} with query {}", databaseName + "." + tableName, deleteTableQuery); return deleteTableQuery.get(); - }else{ + } else { throw new ServiceException("Failed to generate query for delete table " + databaseName + "." + tableName); } } @@ -261,6 +294,11 @@ public class DDLProxy { public Job alterTable(ViewContext context, ConnectionConfig hiveConnectionConfig, String databaseName, String oldTableName, TableMeta newTableMeta, JobResourceManager resourceManager) throws ServiceException { String alterQuery = generateAlterTableQuery(context, hiveConnectionConfig, databaseName, oldTableName, newTableMeta); String jobTitle = "Alter table " + databaseName + "." + oldTableName; + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + alterQuery = settingsString.get() + alterQuery; + } + return createJob(databaseName, alterQuery, jobTitle, resourceManager); } @@ -272,58 +310,72 @@ public class DDLProxy { public String generateAlterTableQuery(TableMeta oldTableMeta, TableMeta newTableMeta) throws ServiceException { AlterTableQueryGenerator queryGenerator = new AlterTableQueryGenerator(oldTableMeta, newTableMeta); Optional<String> alterQuery = queryGenerator.getQuery(); - if(alterQuery.isPresent()){ + if (alterQuery.isPresent()) { return alterQuery.get(); - }else{ + } else { throw new ServiceException("Failed to generate alter table query for table " + oldTableMeta.getDatabase() + "." + oldTableMeta.getTable() + ". No difference was found."); } } public Job renameTable(String oldDatabaseName, String oldTableName, String newDatabaseName, String newTableName, JobResourceManager resourceManager) - throws ServiceException { + throws ServiceException { RenameTableQueryGenerator queryGenerator = new RenameTableQueryGenerator(oldDatabaseName, oldTableName, - newDatabaseName, newTableName); + newDatabaseName, newTableName); Optional<String> renameTable = queryGenerator.getQuery(); - if(renameTable.isPresent()) { + if (renameTable.isPresent()) { String renameQuery = renameTable.get(); String jobTitle = "Rename table " + oldDatabaseName + "." + oldTableName + " to " + newDatabaseName + "." + - newTableName; + newTableName; + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + renameQuery = settingsString.get() + renameQuery; + } return createJob(oldDatabaseName, renameQuery, jobTitle, resourceManager); - }else{ + } else { throw new ServiceException("Failed to generate rename table query for table " + oldDatabaseName + "." + - oldTableName); + oldTableName); } } public Job deleteDatabase(String databaseName, JobResourceManager resourceManager) throws ServiceException { DeleteDatabaseQueryGenerator queryGenerator = new DeleteDatabaseQueryGenerator(databaseName); Optional<String> deleteDatabase = queryGenerator.getQuery(); - if(deleteDatabase.isPresent()) { + if (deleteDatabase.isPresent()) { String deleteQuery = deleteDatabase.get(); - return createJob(databaseName, deleteQuery, "Delete database " + databaseName , resourceManager); - }else{ + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + deleteQuery = settingsString.get() + deleteQuery; + } + + return createJob(databaseName, deleteQuery, "Delete database " + databaseName, resourceManager); + } else { throw new ServiceException("Failed to generate delete database query for database " + databaseName); } } public Job createDatabase(String databaseName, JobResourceManager resourceManager) throws ServiceException { CreateDatabaseQueryGenerator queryGenerator = new CreateDatabaseQueryGenerator(databaseName); - Optional<String> deleteDatabase = queryGenerator.getQuery(); - if(deleteDatabase.isPresent()) { - String deleteQuery = deleteDatabase.get(); - return createJob("default", deleteQuery, "CREATE DATABASE " + databaseName , resourceManager); - }else{ + Optional<String> createDatabase = queryGenerator.getQuery(); + if (createDatabase.isPresent()) { + String createQuery = createDatabase.get(); + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + createQuery = settingsString.get() + createQuery; + } + + return createJob("default", createQuery, "CREATE DATABASE " + databaseName, resourceManager); + } else { throw new ServiceException("Failed to generate create database query for database " + databaseName); } } - public Job createJob(String databaseName, String deleteQuery, String jobTitle, JobResourceManager resourceManager) - throws ServiceException { - LOG.info("Creating job for : {}", deleteQuery ); + public Job createJob(String databaseName, String query, String jobTitle, JobResourceManager resourceManager) + throws ServiceException { + LOG.info("Creating job for : {}", query); Map jobInfo = new HashMap<>(); jobInfo.put("title", jobTitle); - jobInfo.put("forcedContent", deleteQuery); + jobInfo.put("forcedContent", query); jobInfo.put("dataBase", databaseName); jobInfo.put("referrer", JobImpl.REFERRER.INTERNAL.name()); @@ -334,7 +386,7 @@ public class DDLProxy { LOG.info("returning job with id {} for {}", returnableJob.getId(), jobTitle); return returnableJob; } catch (Throwable e) { - LOG.error("Exception occurred while {} : {}", jobTitle, deleteQuery, e); + LOG.error("Exception occurred while {} : {}", jobTitle, query, e); throw new ServiceException(e); } } @@ -345,10 +397,14 @@ public class DDLProxy { AnalyzeTableQueryGenerator queryGenerator = new AnalyzeTableQueryGenerator(tableMeta, shouldAnalyzeColumns); Optional<String> analyzeTable = queryGenerator.getQuery(); String jobTitle = "Analyze table " + databaseName + "." + tableName; - if(analyzeTable.isPresent()) { + if (analyzeTable.isPresent()) { String query = analyzeTable.get(); + Optional<String> settingsString = generateSettingsString(); + if (settingsString.isPresent()) { + query = settingsString.get() + query; + } return createJob(databaseName, query, jobTitle, resourceManager); - }else{ + } else { throw new ServiceException("Failed to generate job for {}" + jobTitle); } } @@ -356,31 +412,30 @@ public class DDLProxy { public ColumnStats fetchColumnStats(String columnName, String jobId, ViewContext context) throws ServiceException { try { ResultsPaginationController.ResultsResponse results = ResultsPaginationController.getResult(jobId, null, null, null, null, context); - if(results.getHasResults()){ - List<String[]> rows = results.getRows(); - Map<Integer, String> headerMap = new HashMap<>(); - boolean header = true; + if (results.getHasResults()) { + List<String[]> rows = results.getRows(); + Map<Integer, String> headerMap = new HashMap<>(); + boolean header = true; ColumnStats columnStats = new ColumnStats(); - for(String[] row : rows){ - if(header){ - for(int i = 0 ; i < row.length; i++){ - if(!Strings.isNullOrEmpty(row[i])){ + for (String[] row : rows) { + if (header) { + for (int i = 0; i < row.length; i++) { + if (!Strings.isNullOrEmpty(row[i])) { headerMap.put(i, row[i].trim()); } } header = false; - } - else if(row.length > 0 ){ - if(columnName.equals(row[0])){ // the first column of the row contains column name + } else if (row.length > 0) { + if (columnName.equals(row[0])) { // the first column of the row contains column name createColumnStats(row, headerMap, columnStats); - }else if( row.length > 1 && row[0].equalsIgnoreCase("COLUMN_STATS_ACCURATE")){ + } else if (row.length > 1 && row[0].equalsIgnoreCase("COLUMN_STATS_ACCURATE")) { columnStats.setColumnStatsAccurate(row[1]); } } } return columnStats; - }else{ + } else { throw new ServiceException("Cannot find any result for this jobId: " + jobId); } } catch (HiveClientException e) { @@ -391,19 +446,20 @@ public class DDLProxy { /** * order of values in array - * row [# col_name, data_type, min, max, num_nulls, distinct_count, avg_col_len, max_col_len,num_trues,num_falses,comment] + * row [# col_name, data_type, min, max, num_nulls, distinct_count, avg_col_len, max_col_len,num_trues,num_falses,comment] * indexes : 0 1 2 3 4 5 6 7 8 9 10 + * * @param row * @param headerMap * @param columnStats * @return */ private ColumnStats createColumnStats(String[] row, Map<Integer, String> headerMap, ColumnStats columnStats) throws ServiceException { - if(null == row){ + if (null == row) { throw new ServiceException("row cannot be null."); } - for(int i = 0 ; i < row.length; i++){ - switch(headerMap.get(i)){ + for (int i = 0; i < row.length; i++) { + switch (headerMap.get(i)) { case ColumnStats.COLUMN_NAME: columnStats.setColumnName(row[i]); break; http://git-wip-us.apache.org/repos/asf/ambari/blob/5de82da3/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java index 3164da0..8704440 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java @@ -49,6 +49,7 @@ import org.apache.ambari.view.utils.ambari.AmbariApi; import org.apache.commons.io.input.ReaderInputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.parquet.Strings; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import org.json.simple.JSONObject; @@ -301,7 +302,16 @@ public class UploadService extends BaseService { try { String insertQuery = generateInsertFromQuery(input); LOG.info("insertQuery : {}", insertQuery); - + if( null != input.getGlobalSettings() && !Strings.isNullOrEmpty(input.getGlobalSettings().trim())){ + String globalSettings = input.getGlobalSettings().trim(); + if(!globalSettings.endsWith(";")){ + globalSettings += ";\n"; + }else{ + globalSettings += "\n"; + } + insertQuery = globalSettings + insertQuery; + } + LOG.info("creating job for query : {}", insertQuery); Job job = createJob(insertQuery, input.getFromDatabase(), "Insert from " + input.getFromDatabase() + "." + input.getFromTable() + " to " + input.getToDatabase() + "." + input.getToTable()); http://git-wip-us.apache.org/repos/asf/ambari/blob/5de82da3/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/query/InsertFromQueryInput.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/query/InsertFromQueryInput.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/query/InsertFromQueryInput.java index b74ba9b..02a6038 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/query/InsertFromQueryInput.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/query/InsertFromQueryInput.java @@ -28,6 +28,7 @@ public class InsertFromQueryInput { private String fromTable; private String toDatabase; private String toTable; + private String globalSettings; private List<ColumnInfo> partitionedColumns; private List<ColumnInfo> normalColumns; private Boolean unhexInsert = Boolean.FALSE; @@ -36,13 +37,15 @@ public class InsertFromQueryInput { } public InsertFromQueryInput(String fromDatabase, String fromTable, String toDatabase, String toTable, - List<ColumnInfo> partitionedColumns, List<ColumnInfo> normalColumns, Boolean unhexInsert) { + List<ColumnInfo> partitionedColumns, List<ColumnInfo> normalColumns, + String globalSettings, Boolean unhexInsert) { this.fromDatabase = fromDatabase; this.fromTable = fromTable; this.toDatabase = toDatabase; this.toTable = toTable; this.partitionedColumns = partitionedColumns; this.normalColumns = normalColumns; + this.globalSettings = globalSettings; this.unhexInsert = unhexInsert; } @@ -101,4 +104,12 @@ public class InsertFromQueryInput { public void setToTable(String toTable) { this.toTable = toTable; } + + public String getGlobalSettings() { + return globalSettings; + } + + public void setGlobalSettings(String globalSettings) { + this.globalSettings = globalSettings; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5de82da3/contrib/views/hive20/src/main/resources/ui/app/routes/databases/database/tables/upload-table.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/resources/ui/app/routes/databases/database/tables/upload-table.js b/contrib/views/hive20/src/main/resources/ui/app/routes/databases/database/tables/upload-table.js index f47d820..799a87a 100644 --- a/contrib/views/hive20/src/main/resources/ui/app/routes/databases/database/tables/upload-table.js +++ b/contrib/views/hive20/src/main/resources/ui/app/routes/databases/database/tables/upload-table.js @@ -35,6 +35,16 @@ export default NewTable.extend(UILoggerMixin, { init: function () { this._super(); }, + + afterModel(){ + return this.store.findAll('setting').then((data) => { + let localStr = ''; + data.forEach(x => { + localStr = localStr + 'set '+ x.get('key')+ '='+ x.get('value') + ';\n'; + }); + this.set('globalSettings', localStr); + }); + }, setupController(controller, model) { this._super(controller, model); this.controller.set("showUploadTableModal", false); @@ -421,6 +431,7 @@ export default NewTable.extend(UILoggerMixin, { insertIntoTable : function(tableData){ console.log("insertIntoTable"); this.pushUploadProgressInfos(this.formatMessage('hive.messages.startingToInsertRows')); + let globalSettings = this.get('globalSettings'); let partitionedColumns = tableData.get("tableMeta").columns.filter(function(column){ return column.isPartitioned; @@ -445,6 +456,7 @@ export default NewTable.extend(UILoggerMixin, { "toTable": tableData.get("tableMeta").name, "partitionedColumns": partitionedColumns, "normalColumns": normalColumns, + "globalSettings": globalSettings, "unhexInsert": tableData.fileFormatInfo.containsEndlines }); },