http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java new file mode 100644 index 0000000..fbb93b6 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.model; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.Maps; +import org.apache.carbondata.common.Strings; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.constants.LoggerAction; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.sort.SortScopeOptions; +import org.apache.carbondata.processing.util.TableOptionConstant; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * Builder for {@link CarbonLoadModel} + */ +@InterfaceAudience.Developer +public class CarbonLoadModelBuilder { + + private CarbonTable table; + + public CarbonLoadModelBuilder(CarbonTable table) { + this.table = table; + } + + /** + * build CarbonLoadModel for data loading + * @param options Load options from user input + * @return a new CarbonLoadModel instance + */ + public CarbonLoadModel build( + Map<String, String> options) throws InvalidLoadOptionException, IOException { + Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options); + optionsFinal.put("sort_scope", "no_sort"); + if (!options.containsKey("fileheader")) { + List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName()); + String[] columns = new String[csvHeader.size()]; + for (int i = 0; i < columns.length; i++) { + columns[i] = csvHeader.get(i).getColName(); + } + 
optionsFinal.put("fileheader", Strings.mkString(columns, ",")); + } + CarbonLoadModel model = new CarbonLoadModel(); + + // we have provided 'fileheader', so it hadoopConf can be null + build(options, optionsFinal, model, null); + + // set default values + model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT); + model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT); + model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false"))); + model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null)); + try { + model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1"))); + } catch (NumberFormatException e) { + throw new InvalidLoadOptionException(e.getMessage()); + } + return model; + } + + /** + * build CarbonLoadModel for data loading + * @param options Load options from user input + * @param optionsFinal Load options that populated with default values for optional options + * @param carbonLoadModel The output load model + * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in + * user provided load options + */ + public void build( + Map<String, String> options, + Map<String, String> optionsFinal, + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf) throws InvalidLoadOptionException, IOException { + carbonLoadModel.setTableName(table.getTableName()); + carbonLoadModel.setDatabaseName(table.getDatabaseName()); + carbonLoadModel.setTablePath(table.getTablePath()); + carbonLoadModel.setTableName(table.getTableName()); + CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table); + // Need to fill dimension relation + carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema); + String sort_scope = optionsFinal.get("sort_scope"); + String single_pass = optionsFinal.get("single_pass"); + String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable"); + String bad_records_action = optionsFinal.get("bad_records_action"); + String bad_record_path = optionsFinal.get("bad_record_path"); + String global_sort_partitions = optionsFinal.get("global_sort_partitions"); + String timestampformat = optionsFinal.get("timestampformat"); + String dateFormat = optionsFinal.get("dateformat"); + String delimeter = optionsFinal.get("delimiter"); + String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1"); + String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2"); + String all_dictionary_path = optionsFinal.get("all_dictionary_path"); + String column_dict = optionsFinal.get("columndict"); + validateDateTimeFormat(timestampformat, "TimestampFormat"); + validateDateTimeFormat(dateFormat, "DateFormat"); + validateSortScope(sort_scope); + + if (Boolean.parseBoolean(bad_records_logger_enable) || + LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) { + bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path); + if (!CarbonUtil.isValidBadStorePath(bad_record_path)) { + throw new InvalidLoadOptionException("Invalid bad records location."); + } + } + carbonLoadModel.setBadRecordsLocation(bad_record_path); + + validateGlobalSortPartitions(global_sort_partitions); + carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\")); + carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\"")); + carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#")); + + // if there isn't file header in csv 
file and the load SQL doesn't provide the FILEHEADER option, + // we should use the table schema to generate the file header. + String fileHeader = optionsFinal.get("fileheader"); + String headerOption = options.get("header"); + if (headerOption != null) { + if (!headerOption.equalsIgnoreCase("true") && + !headerOption.equalsIgnoreCase("false")) { + throw new InvalidLoadOptionException( + "'header' option should be either 'true' or 'false'."); + } + // whether the csv file has a file header; the default value is true + if (Boolean.valueOf(headerOption)) { + if (!StringUtils.isEmpty(fileHeader)) { + throw new InvalidLoadOptionException( + "When 'header' option is true, 'fileheader' option is not required."); + } + } else { + if (StringUtils.isEmpty(fileHeader)) { + List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName()); + String[] columnNames = new String[columns.size()]; + for (int i = 0; i < columnNames.length; i++) { + columnNames[i] = columns.get(i).getColName(); + } + fileHeader = Strings.mkString(columnNames, ","); + } + } + } + + carbonLoadModel.setTimestampformat(timestampformat); + carbonLoadModel.setDateFormat(dateFormat); + carbonLoadModel.setDefaultTimestampFormat( + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)); + + carbonLoadModel.setDefaultDateFormat( + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)); + + carbonLoadModel.setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + + optionsFinal.get("serialization_null_format")); + + carbonLoadModel.setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable); + + carbonLoadModel.setBadRecordsAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase()); + + carbonLoadModel.setIsEmptyDataBadRecord( + DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + + optionsFinal.get("is_empty_data_bad_record")); + + carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line")); + + carbonLoadModel.setSortScope(sort_scope); + carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb")); + carbonLoadModel.setGlobalSortPartitions(global_sort_partitions); + carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass)); + + if (delimeter.equalsIgnoreCase(complex_delimeter_level1) || + complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) || + delimeter.equalsIgnoreCase(complex_delimeter_level2)) { + throw new InvalidLoadOptionException("Field delimiter and complex types delimiter are the same"); + } else { + carbonLoadModel.setComplexDelimiterLevel1( + CarbonUtil.delimiterConverter(complex_delimeter_level1)); + carbonLoadModel.setComplexDelimiterLevel2( + CarbonUtil.delimiterConverter(complex_delimeter_level2)); + } + // set local dictionary path, and dictionary file extension + carbonLoadModel.setAllDictPath(all_dictionary_path); + carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)); + carbonLoadModel.setCsvHeader(fileHeader); + carbonLoadModel.setColDictFilePath(column_dict); + carbonLoadModel.setCsvHeaderColumns( + LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf)); + + int validatedMaxColumns = validateMaxColumns( + carbonLoadModel.getCsvHeaderColumns(), + optionsFinal.get("maxcolumns")); + 
carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns)); + carbonLoadModel.readAndSetLoadMetadataDetails(); + } + + private int validateMaxColumns(String[] csvHeaders, String maxColumns) + throws InvalidLoadOptionException { + /* + User configures both csvheadercolumns and maxcolumns: + if csvheadercolumns >= maxcolumns, give error + if maxcolumns > threshold, give error + User configures csvheadercolumns + if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1 + if csvheadercolumns >= threshold, give error + User configures nothing + if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1 + if csvheadercolumns >= threshold, give error + */ + int columnCountInSchema = csvHeaders.length; + int maxNumberOfColumnsForParsing = 0; + Integer maxColumnsInt = getMaxColumnValue(maxColumns); + if (maxColumnsInt != null) { + if (columnCountInSchema >= maxColumnsInt) { + throw new InvalidLoadOptionException( + "csv headers should be less than the max columns " + maxColumnsInt); + } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { + throw new InvalidLoadOptionException( + "max columns cannot be greater than the threshold value: " + + CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING); + } else { + maxNumberOfColumnsForParsing = maxColumnsInt; + } + } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { + throw new InvalidLoadOptionException( + "csv header columns should be less than max threshold: " + + CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING); + } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { + maxNumberOfColumnsForParsing = columnCountInSchema + 1; + } else { + maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING; + } + return maxNumberOfColumnsForParsing; + } + + private Integer getMaxColumnValue(String maxColumn) { + return (maxColumn == null) ? null : Integer.parseInt(maxColumn); + } + + /** + * validates both timestamp and date formats for illegal values + */ + private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption) + throws InvalidLoadOptionException { + // allowing empty value to be configured for dateformat option. + if (dateTimeLoadFormat != null && !dateTimeLoadFormat.trim().equalsIgnoreCase("")) { + try { + new SimpleDateFormat(dateTimeLoadFormat); + } catch (IllegalArgumentException e) { + throw new InvalidLoadOptionException( + "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option " + + dateTimeLoadOption); + } + } + } + + private void validateSortScope(String sortScope) throws InvalidLoadOptionException { + if (sortScope != null) { + // Global sort is not supported on a partitioned table. 
+ if (table.getPartitionInfo(table.getTableName()) != null && + sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) { + throw new InvalidLoadOptionException("Global sort is not supported on a partitioned table."); + } + } + } + + private void validateGlobalSortPartitions(String globalSortPartitions) + throws InvalidLoadOptionException { + if (globalSortPartitions != null) { + try { + int num = Integer.parseInt(globalSortPartitions); + if (num <= 0) { + throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0"); + } + } catch (NumberFormatException e) { + throw new InvalidLoadOptionException(e.getMessage()); + } + } + } + + /** + * return the default value if the given value is empty + */ + private String checkDefaultValue(String value, String defaultValue) { + if (StringUtils.isEmpty(value)) { + return defaultValue; + } else { + return value; + } + } +}
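For context, a minimal usage sketch of the new builder API follows (not part of the patch). It assumes an already-constructed CarbonTable instance named table; the option keys shown are the ones consumed by build() and by LoadOption.fillOptionWithDefaultValue() below, and any options left unset fall back to their defaults:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;

    // Hedged sketch: `table` is assumed to be an existing CarbonTable handle.
    Map<String, String> options = new HashMap<>();
    options.put("delimiter", ",");
    options.put("quotechar", "\"");
    options.put("bad_records_action", "FORCE");
    CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
    // build() fills the remaining options with defaults and validates them,
    // throwing InvalidLoadOptionException on an invalid option value.
    CarbonLoadModel model = builder.build(options);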
http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java new file mode 100644 index 0000000..8ec93a9 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.model; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.common.Maps; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * Provide utilities to populate loading options + */ +@InterfaceAudience.Developer +public class LoadOption { + + private static LogService LOG = LogServiceFactory.getLogService(LoadOption.class.getName()); + + /** + * Based on the input options, fill and return data loading options with default value + */ + public static Map<String, String> fillOptionWithDefaultValue( + Map<String, String> options) throws InvalidLoadOptionException { + Map<String, String> optionsFinal = new HashMap<>(); + optionsFinal.put("delimiter", Maps.getOrDefault(options, "delimiter", ",")); + optionsFinal.put("quotechar", Maps.getOrDefault(options, "quotechar", "\"")); + optionsFinal.put("fileheader", Maps.getOrDefault(options, "fileheader", "")); + optionsFinal.put("commentchar", Maps.getOrDefault(options, "commentchar", "#")); + optionsFinal.put("columndict", Maps.getOrDefault(options, "columndict", null)); + + optionsFinal.put( + "escapechar", + CarbonLoaderUtil.getEscapeChar(Maps.getOrDefault(options,"escapechar", "\\"))); + + optionsFinal.put( + "serialization_null_format", + Maps.getOrDefault(options, "serialization_null_format", 
"\\N")); + + optionsFinal.put( + "bad_records_logger_enable", + Maps.getOrDefault( + options, + "bad_records_logger_enable", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))); + + String badRecordActionValue = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + + optionsFinal.put( + "bad_records_action", + Maps.getOrDefault( + options, + "bad_records_action", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + badRecordActionValue))); + + optionsFinal.put( + "is_empty_data_bad_record", + Maps.getOrDefault( + options, + "is_empty_data_bad_record", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))); + + optionsFinal.put( + "skip_empty_line", + Maps.getOrDefault( + options, + "skip_empty_line", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))); + + optionsFinal.put( + "all_dictionary_path", + Maps.getOrDefault(options, "all_dictionary_path", "")); + + optionsFinal.put( + "complex_delimiter_level_1", + Maps.getOrDefault(options,"complex_delimiter_level_1", "\\$")); + + optionsFinal.put( + "complex_delimiter_level_2", + Maps.getOrDefault(options, "complex_delimiter_level_2", "\\:")); + + optionsFinal.put( + "dateformat", + Maps.getOrDefault( + options, + "dateformat", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))); + + optionsFinal.put( + "timestampformat", + Maps.getOrDefault( + options, + "timestampformat", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT))); + + optionsFinal.put( + "global_sort_partitions", + Maps.getOrDefault( + options, + "global_sort_partitions", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, + null))); + + optionsFinal.put("maxcolumns", Maps.getOrDefault(options, "maxcolumns", null)); + + optionsFinal.put( + "batch_sort_size_inmb", + Maps.getOrDefault( + options, + "batch_sort_size_inmb", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))); + + optionsFinal.put( + "bad_record_path", + Maps.getOrDefault( + options, + "bad_record_path", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))); + + String useOnePass = Maps.getOrDefault( + options, + "single_pass", + CarbonProperties.getInstance().getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim().toLowerCase(); + + boolean singlePass; + + if (useOnePass.equalsIgnoreCase("true")) { + singlePass = true; + } else { + // 
when single_pass = false, if either all_dictionary_path + // or columndict is configured, do not allow the load + if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path")) || + StringUtils.isNotEmpty(optionsFinal.get("columndict"))) { + throw new InvalidLoadOptionException( + "Can not use all_dictionary_path or columndict without single_pass."); + } else { + singlePass = false; + } + } + + optionsFinal.put("single_pass", String.valueOf(singlePass)); + return optionsFinal; + } + + /** + * Return CSV header field names + */ + public static String[] getCsvHeaderColumns( + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf) throws IOException { + String delimiter; + if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) { + delimiter = CarbonCommonConstants.COMMA; + } else { + delimiter = CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter()); + } + String csvFile = null; + String csvHeader = carbonLoadModel.getCsvHeader(); + String[] csvColumns; + if (StringUtils.isBlank(csvHeader)) { + // read header from csv file + csvFile = carbonLoadModel.getFactFilePath().split(",")[0]; + csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf); + if (StringUtils.isBlank(csvHeader)) { + throw new CarbonDataLoadingException("First line of the csv is not valid."); + } + String[] headers = csvHeader.toLowerCase().split(delimiter); + csvColumns = new String[headers.length]; + for (int i = 0; i < csvColumns.length; i++) { + csvColumns[i] = headers[i].replaceAll("\"", "").trim(); + } + } else { + String[] headers = csvHeader.toLowerCase().split(CarbonCommonConstants.COMMA); + csvColumns = new String[headers.length]; + for (int i = 0; i < csvColumns.length; i++) { + csvColumns[i] = headers[i].trim(); + } + } + + if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns, + carbonLoadModel.getCarbonDataLoadSchema())) { + if (csvFile == null) { + LOG.error("CSV header in DDL is not proper." + + " Column names in schema and CSV header are not the same."); + throw new CarbonDataLoadingException( + "CSV header in DDL is not proper. Column names in schema and CSV header are " + + "not the same."); + } else { + LOG.error( + "CSV header in input file is not proper. Column names in schema and csv header are not " + + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile)); + throw new CarbonDataLoadingException( + "CSV header in input file is not proper. Column names in schema and csv header are not " + + "the same. 
Input file : " + CarbonUtil.removeAKSK(csvFile)); + } + } + return csvColumns; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index e9bd3b8..6876355 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -16,14 +16,9 @@ */ package org.apache.carbondata.processing.util; -import java.io.BufferedWriter; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; import java.net.InetAddress; import java.net.UnknownHostException; -import java.nio.charset.Charset; -import java.text.SimpleDateFormat; import java.util.*; import org.apache.carbondata.common.logging.LogService; @@ -40,9 +35,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.FileFactory.FileType; -import org.apache.carbondata.core.fileoperations.AtomicFileOperations; -import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; -import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -58,9 +50,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.merger.NodeBlockRelation; import org.apache.carbondata.processing.merger.NodeMultiBlockRelation; -import static org.apache.carbondata.core.enums.EscapeSequences.*; -import com.google.gson.Gson; +import static org.apache.carbondata.core.enums.EscapeSequences.*; public final class CarbonLoaderUtil { @@ -344,48 +335,6 @@ public final class CarbonLoaderUtil { loadMetadataDetails.setLoadStartTime(loadStartTime); } - public static void writeLoadMetadata(AbsoluteTableIdentifier identifier, - List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { - String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); - - DataOutputStream dataOutputStream; - Gson gsonObjectToWrite = new Gson(); - BufferedWriter brWriter = null; - - AtomicFileOperations writeOperation = - new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation)); - - try { - dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE); - brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); - - String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); - brWriter.write(metadataInstance); - } finally { - try { - if (null != brWriter) { - brWriter.flush(); - } - } catch (Exception e) { - LOGGER.error("error in flushing "); - - } - CarbonUtil.closeStreams(brWriter); - writeOperation.close(); - } - - } - - public static String readCurrentTime() { - SimpleDateFormat sdf = new 
SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); - String date = null; - - date = sdf.format(new Date()); - - return date; - } - public static boolean isValidEscapeSequence(String escapeChar) { return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) || escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) || @@ -514,17 +463,6 @@ public final class CarbonLoaderUtil { } /** - * This method will divide the blocks among the nodes as per the data locality - * - * @param blockInfos - * @return - */ - public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) { - // -1 if number of nodes has to be decided based on block location information - return nodeBlockMapping(blockInfos, -1); - } - - /** * the method returns the number of required executors * * @param blockInfos @@ -899,25 +837,6 @@ public final class CarbonLoaderUtil { CarbonUtil.checkAndCreateFolder(segmentFolder); } - /** - * This will update the old table status details before clean files to the latest table status. - * @param oldList - * @param newList - * @return - */ - public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew( - LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) { - - List<LoadMetadataDetails> newListMetadata = - new ArrayList<LoadMetadataDetails>(Arrays.asList(newList)); - for (LoadMetadataDetails oldSegment : oldList) { - if ("false".equalsIgnoreCase(oldSegment.getVisibility())) { - newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false"); - } - } - return newListMetadata; - } - /* * This method will add data size and index size into tablestatus for each segment. And also * returns the size of the segment. http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java deleted file mode 100644 index c00cc86..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.processing.util; - -import java.io.IOException; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.locks.CarbonLockFactory; -import org.apache.carbondata.core.locks.ICarbonLock; -import org.apache.carbondata.core.locks.LockUsage; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.SegmentFileStore; -import org.apache.carbondata.core.mutate.CarbonUpdateUtil; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatus; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -public final class DeleteLoadFolders { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(DeleteLoadFolders.class.getName()); - - private DeleteLoadFolders() { - - } - - /** - * returns segment path - * - * @param identifier - * @param oneLoad - * @return - */ - private static String getSegmentPath(AbsoluteTableIdentifier identifier, - LoadMetadataDetails oneLoad) { - String segmentId = oneLoad.getLoadName(); - return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); - } - - public static void physicalFactAndMeasureMetadataDeletion( - AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete, - List<PartitionSpec> specs) { - LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); - for (LoadMetadataDetails oneLoad : currentDetails) { - if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { - try { - if (oneLoad.getSegmentFile() != null) { - SegmentFileStore - .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(), - specs); - } else { - String path = getSegmentPath(absoluteTableIdentifier, oneLoad); - boolean status = false; - if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { - CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); - CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile file) { - return (CarbonTablePath.isCarbonDataFile(file.getName()) || - CarbonTablePath.isCarbonIndexFile(file.getName())); - } - }); - - //if there are no fact and msr metadata files present then no need to keep - //entry in metadata. - if (filesToBeDeleted.length == 0) { - status = true; - } else { - - for (CarbonFile eachFile : filesToBeDeleted) { - if (!eachFile.delete()) { - LOGGER.warn("Unable to delete the file as per delete command " + eachFile - .getAbsolutePath()); - status = false; - } else { - status = true; - } - } - } - // need to delete the complete folder. 
- if (status) { - if (!file.delete()) { - LOGGER.warn("Unable to delete the folder as per delete command " + file - .getAbsolutePath()); - } - } - - } else { - LOGGER.warn("Files are not found in segment " + path - + " it seems, files are already being deleted"); - } - - } - } catch (IOException e) { - LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName()); - } - } - } - } - - private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad, - boolean isForceDelete) { - if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() || - SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || - SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() || - SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus()) - && oneLoad.getVisibility().equalsIgnoreCase("true")) { - if (isForceDelete) { - return true; - } - long deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); - - return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); - - } - - return false; - } - - private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad, - boolean isForceDelete) { - if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() || - SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) { - if (isForceDelete) { - return true; - } - long deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); - - return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); - - } - - return false; - } - - private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId, - String metadataPath) { - LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); - for (LoadMetadataDetails oneLoad : currentDetails) { - if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) { - return oneLoad; - } - } - return null; - } - - public static boolean deleteLoadFoldersFromFileSystem( - AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, - LoadMetadataDetails[] details, String metadataPath) { - boolean isDeleted = false; - if (details != null && details.length != 0) { - for (LoadMetadataDetails oneLoad : details) { - if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) { - ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); - try { - if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS - || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) { - if (segmentLock.lockWithRetries(1, 5)) { - LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName()); - LoadMetadataDetails currentDetails = - getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath); - if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails, - isForceDelete)) { - oneLoad.setVisibility("false"); - isDeleted = true; - LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); - } - } else { - LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName()); - return isDeleted; - } - } else { - oneLoad.setVisibility("false"); - isDeleted = true; - LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); - } - } finally { - segmentLock.unlock(); - LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released"); - } - } - } - } - return isDeleted; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/store/sdk/pom.xml 
---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index 6663683..51d2cf9 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -21,7 +21,7 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark-common</artifactId> + <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 51ca09c..e06200a 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -28,6 +28,7 @@ import java.util.Objects; import org.apache.carbondata.common.Strings; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.converter.SchemaConverter; @@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.spark.util.DataLoadingUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; /** * Biulder for {@link CarbonWriter} @@ -94,9 +95,9 @@ public class CarbonWriterBuilder { } /** - * Build a {@link CSVCarbonWriter}, which accepts row in CSV format + * Build a {@link CarbonWriter}, which accepts rows in CSV format */ - public CarbonWriter buildWriterForCSVInput() throws IOException { + public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException { Objects.requireNonNull(schema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); @@ -113,7 +114,7 @@ public class CarbonWriterBuilder { } /** - * Build a {@link AvroCarbonWriter}, which accepts Avro object + * Build a {@link CarbonWriter}, which accepts Avro objects * @return * @throws IOException */ @@ -184,11 +185,13 @@ public class CarbonWriterBuilder { /** * Build a {@link CarbonLoadModel} */ - private CarbonLoadModel buildLoadModel(CarbonTable table) { + private CarbonLoadModel buildLoadModel(CarbonTable table) + throws InvalidLoadOptionException, IOException { Map<String, String> options = new HashMap<>(); if (sortColumns != null) { options.put("sort_columns", Strings.mkString(sortColumns, ",")); } - return DataLoadingUtil.buildCarbonLoadModelJava(table, options); + CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table); + return builder.build(options); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java ---------------------------------------------------------------------- diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java index 531ec7c..aca2b2d 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java @@ -77,7 +77,7 @@ public class CSVCarbonWriterSuite { writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)}); } writer.close(); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/66df989a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 6316d84..bc7b042 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -34,10 +34,9 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.spark.util.DataLoadingUtil import org.apache.carbondata.streaming.segment.StreamSegment /** @@ -209,17 +208,15 @@ object StreamSinkFactory { segmentId: String): CarbonLoadModel = { val carbonProperty: CarbonProperties = CarbonProperties.getInstance() carbonProperty.addProperty("zookeeper.enable.lock", "false") - val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters) + val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava) optionsFinal.put("sort_scope", "no_sort") if (parameters.get("fileheader").isEmpty) { optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName) .asScala.map(_.getColName).mkString(",")) } val carbonLoadModel = new CarbonLoadModel() - DataLoadingUtil.buildCarbonLoadModel( - carbonTable, - carbonProperty, - parameters, + new CarbonLoadModelBuilder(carbonTable).build( + parameters.asJava, optionsFinal, carbonLoadModel, hadoopConf)
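For reference, a hedged end-to-end sketch of the SDK write path that now drives CarbonLoadModelBuilder (not part of the patch). The withSchema and outputPath method names are assumptions inferred from the schema and path fields of CarbonWriterBuilder, and the output path is hypothetical; the broad Exception catch mirrors the CSVCarbonWriterSuite change above, since buildWriterForCSVInput now also declares InvalidLoadOptionException:

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);
    try {
      CarbonWriter writer = CarbonWriter.builder()
          .withSchema(new Schema(fields))  // assumed builder method
          .outputPath("./testWriteFiles")  // hypothetical path, assumed method
          .buildWriterForCSVInput();       // may throw InvalidLoadOptionException
      writer.write(new String[]{"robot0", "10"});
      writer.close();
    } catch (Exception e) {                // widened catch, as in the test change
      e.printStackTrace();
    }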