Adapt data with header for all-dictionary loading; use DEFAULT_CHARSET when reading the CSV header
remove listFiles Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/51e4c11e Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/51e4c11e Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/51e4c11e Branch: refs/heads/branch-0.1 Commit: 51e4c11e40611494a456a806a86d57d9348b5b4f Parents: 854b75e Author: foryou2030 <foryou2...@126.com> Authored: Mon Aug 22 18:00:00 2016 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Sep 22 09:29:22 2016 +0530 ---------------------------------------------------------------------- .../apache/carbondata/core/util/CarbonUtil.java | 30 ++++++++++++ .../src/main/resources/datawithoutheader.csv | 10 ---- .../examples/AllDictionaryExample.scala | 12 ++--- .../examples/util/AllDictionaryUtil.scala | 2 +- .../spark/util/GlobalDictionaryUtil.scala | 50 +++++++++++++------- .../processing/csvload/GraphExecutionUtil.java | 29 +----------- 6 files changed, 71 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 2460f6e..df538e0 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -20,14 +20,17 @@ package org.apache.carbondata.core.util; +import java.io.BufferedReader; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import 
java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; @@ -1385,5 +1388,32 @@ public final class CarbonUtil { } return dictionaryOneChunkSize; } + + /** + * @param csvFilePath + * @return + */ + public static String readHeader(String csvFilePath) { + + DataInputStream fileReader = null; + BufferedReader bufferedReader = null; + String readLine = null; + + try { + fileReader = + FileFactory.getDataInputStream(csvFilePath, FileFactory.getFileType(csvFilePath)); + bufferedReader = new BufferedReader(new InputStreamReader(fileReader, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + readLine = bufferedReader.readLine(); + + } catch (FileNotFoundException e) { + LOGGER.error(e, "CSV Input File not found " + e.getMessage()); + } catch (IOException e) { + LOGGER.error(e, "Not able to read CSV input File " + e.getMessage()); + } finally { + CarbonUtil.closeStreams(fileReader, bufferedReader); + } + return readLine; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/examples/src/main/resources/datawithoutheader.csv ---------------------------------------------------------------------- diff --git a/examples/src/main/resources/datawithoutheader.csv b/examples/src/main/resources/datawithoutheader.csv deleted file mode 100644 index df2b945..0000000 --- a/examples/src/main/resources/datawithoutheader.csv +++ /dev/null @@ -1,10 +0,0 @@ -1,2015/7/23,china,aaa1,phone197,ASD69643,15000 -2,2015/7/24,china,aaa2,phone756,ASD42892,15001 -3,2015/7/25,china,aaa3,phone1904,ASD37014,15002 -4,2015/7/26,china,aaa4,phone2435,ASD66902,15003 -5,2015/7/27,china,aaa5,phone2441,ASD90633,15004 -6,2015/7/28,china,aaa6,phone294,ASD59961,15005 -7,2015/7/29,china,aaa7,phone610,ASD14875,15006 -8,2015/7/30,china,aaa8,phone1848,ASD57308,15007 -9,2015/7/18,china,aaa9,phone706,ASD86717,15008 
-10,2015/7/19,usa,aaa10,phone685,ASD30505,15009 http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala index 195c7a6..a2b72e3 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala @@ -23,11 +23,10 @@ import org.apache.carbondata.examples.util.{AllDictionaryUtil, InitForExamples} object AllDictionaryExample { def main(args: Array[String]) { val cc = InitForExamples.createCarbonContext("CarbonExample") - val testData = InitForExamples.currentPath + "/src/main/resources/datawithoutheader.csv" - val csvHeader = "id,date,country,name,phonetype,serialname,salary" + val testData = InitForExamples.currentPath + "/src/main/resources/data.csv" + val csvHeader = "ID,date,country,name,phonetype,serialname,salary" val dictCol = "|date|country|name|phonetype|serialname|" - val allDictFile = InitForExamples.currentPath + - "/src/main/resources/datawithoutheader.dictionary" + val allDictFile = InitForExamples.currentPath + "/src/main/resources/data.dictionary" // extract all dictionary files from source data AllDictionaryUtil.extractDictionary(cc.sparkContext, testData, allDictFile, csvHeader, dictCol) @@ -41,13 +40,12 @@ object AllDictionaryExample { CREATE TABLE IF NOT EXISTS t3 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Int) - STORED BY 'org.apache.carbondata.format' + STORED BY 'carbondata' """) cc.sql(s""" LOAD DATA LOCAL INPATH '$testData' into table t3 - options('FILEHEADER'='id,date,country,name,phonetype,serialname,salary', - 
'ALL_DICTIONARY_PATH'='$allDictFile') + options('ALL_DICTIONARY_PATH'='$allDictFile') """) cc.sql(""" http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala index 3e8df71..bd625f3 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala @@ -35,7 +35,7 @@ object AllDictionaryUtil extends Logging{ val fileHeaderArr = fileHeader.split(",") val isDictCol = new Array[Boolean](fileHeaderArr.length) for (i <- 0 until fileHeaderArr.length) { - if (dictCol.contains("|" + fileHeaderArr(i) + "|")) { + if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) { isDictCol(i) = true } else { isDictCol(i) = false http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index ddb596b..b96a826 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -43,7 +43,7 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastorage.store.impl.FileFactory import 
org.apache.carbondata.core.reader.CarbonDictionaryReader -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.writer.CarbonDictionaryWriter import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.spark.CarbonSparkFactory @@ -583,8 +583,8 @@ object GlobalDictionaryUtil extends Logging { * @return allDictionaryRdd */ private def readAllDictionaryFiles(sqlContext: SQLContext, - csvFileColumns: Array[String], - requireColumns: Array[String], + csvFileColumns: Array[String], + requireColumns: Array[String], allDictionaryPath: String) = { var allDictionaryRdd: RDD[(String, Iterable[String])] = null try { @@ -651,6 +651,31 @@ object GlobalDictionaryUtil extends Logging { } /** + * get file headers from fact file + * + * @param carbonLoadModel + * @return headers + */ + private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = { + var headers: Array[String] = null + val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0) + val readLine = CarbonUtil.readHeader(factFile) + + if (null != readLine) { + val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) { + "" + CSVWriter.DEFAULT_SEPARATOR + } else { + carbonLoadModel.getCsvDelimiter + } + headers = readLine.toLowerCase().split(delimiter); + } else { + logError("Not found file header! 
Please set fileheader") + throw new IOException("Failed to get file header") + } + headers + } + + /** * generate global dictionary with SQLContext and CarbonLoadModel * * @param sqlContext sql context @@ -736,27 +761,20 @@ object GlobalDictionaryUtil extends Logging { logInfo("Generate global dictionary from all dictionary files!") val isNonempty = validateAllDictionaryPath(allDictionaryPath) if(isNonempty) { - // fill the map[columnIndex -> columnName] - var fileHeaders : Array[String] = null - if(!StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) { - val splitColumns = carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR) - val fileHeadersArr = new ArrayBuffer[String]() - for(i <- 0 until splitColumns.length) { - fileHeadersArr += splitColumns(i).trim.toLowerCase() - } - fileHeaders = fileHeadersArr.toArray + var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) { + getHeaderFormFactFile(carbonLoadModel) } else { - logError("Not found file header! Please set fileheader") - throw new IOException("Failed to get file header") + carbonLoadModel.getCsvHeader.toLowerCase.split("" + CSVWriter.DEFAULT_SEPARATOR) } + headers = headers.map(headerName => headerName.trim) // prune columns according to the CSV file header, dimension columns val (requireDimension, requireColumnNames) = - pruneDimensions(dimensions, fileHeaders, fileHeaders) + pruneDimensions(dimensions, headers, headers) if (requireDimension.nonEmpty) { val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension, hdfsLocation, dictfolderPath, false) // read local dictionary file, and group by key - val allDictionaryRdd = readAllDictionaryFiles(sqlContext, fileHeaders, + val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers, requireColumnNames, allDictionaryPath) // read exist dictionary and combine val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model) 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java index 2a35002..6d82bcd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java @@ -254,40 +254,13 @@ public final class GraphExecutionUtil { /** * @param csvFilePath - * @return - */ - private static String readCSVFile(String csvFilePath) { - - DataInputStream fileReader = null; - BufferedReader bufferedReader = null; - String readLine = null; - - try { - fileReader = - FileFactory.getDataInputStream(csvFilePath, FileFactory.getFileType(csvFilePath)); - bufferedReader = - new BufferedReader(new InputStreamReader(fileReader, Charset.defaultCharset())); - readLine = bufferedReader.readLine(); - - } catch (FileNotFoundException e) { - LOGGER.error(e, "CSV Input File not found " + e.getMessage()); - } catch (IOException e) { - LOGGER.error(e, "Not able to read CSV input File " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(fileReader, bufferedReader); - } - return readLine; - } - - /** - * @param csvFilePath * @param columnNames * @return */ public static boolean checkCSVAndRequestedTableColumns(String csvFilePath, String[] columnNames, String delimiter) { - String readLine = readCSVFile(csvFilePath); + String readLine = CarbonUtil.readHeader(csvFilePath); if (null != readLine) { delimiter = CarbonUtil.delimiterConverter(delimiter);