http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
deleted file mode 100644
index 195374a..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-/**
- * Interface for the ResultSetHelperService. Allows the user to define their own ResultSetHelper
- * for use in the CSVWriter.
- */
-public interface ResultSetHelper {
-  /**
-   * Returns the column Names from the ResultSet.
-   *
-   * @param rs - ResultSet
-   * @return - string array containing the column names.
-   * @throws SQLException - thrown by the ResultSet.
-   */
-  String[] getColumnNames(ResultSet rs) throws SQLException;
-
-  /**
-   * Returns the column values from the result set.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @return String Array containing the values.
-   * @throws SQLException - thrown by the ResultSet.
-   * @throws IOException - thrown by the ResultSet.
-   */
-  String[] getColumnValues(ResultSet rs) throws SQLException, IOException;
-
-  /**
-   * Returns the column values from the result set with the values trimmed if desired.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @param trim - values should have white spaces trimmed.
-   * @return String Array containing the values.
-   * @throws SQLException - thrown by the ResultSet.
-   * @throws IOException - thrown by the ResultSet.
-   */
-  String[] getColumnValues(ResultSet rs, boolean trim) throws SQLException, IOException;
-
-  /**
-   * Returns the column values from the result set with the values trimmed if desired.
-   * Also format the date and time columns based on the format strings passed in.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @param trim - values should have white spaces trimmed.
-   * @param dateFormatString - format String for dates.
-   * @param timeFormatString - format String for timestamps.
-   * @return String Array containing the values.
-   * @throws SQLException - thrown by the ResultSet.
-   * @throws IOException - thrown by the ResultSet.
-   */
-  String[] getColumnValues(ResultSet rs, boolean trim, String dateFormatString,
-      String timeFormatString) throws SQLException, IOException;
-}
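[The removed interface boils down to one operation: render the current JDBC row as a String[]. A minimal Scala sketch of that contract, for reference; the helper below is illustrative and not part of this commit. It maps SQL NULL to the empty string, as the deleted service did.]

    import java.sql.{ResultSet, ResultSetMetaData}

    // Illustrative stand-in for the removed contract: read every column of
    // the current row as a string, mapping SQL NULL to "" and trimming on request.
    def rowAsStrings(rs: ResultSet, trim: Boolean = false): Array[String] = {
      val meta: ResultSetMetaData = rs.getMetaData
      (1 to meta.getColumnCount).map { i =>
        val raw = rs.getString(i)
        val value = if (raw == null) "" else raw
        if (trim) value.trim else value
      }.toArray
    }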
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
deleted file mode 100644
index 3d15949..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.io.Reader;
-import java.math.BigDecimal;
-import java.sql.Clob;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-/**
- * helper class for processing JDBC ResultSet objects.
- */
-public class ResultSetHelperService implements ResultSetHelper {
-  public static final int CLOBBUFFERSIZE = 2048;
-
-  // note: we want to maintain compatibility with Java 5 VM's
-  // These types don't exist in Java 5
-  static final int NVARCHAR = -9;
-  static final int NCHAR = -15;
-  static final int LONGNVARCHAR = -16;
-  static final int NCLOB = 2011;
-
-  static final String DEFAULT_DATE_FORMAT = "dd-MMM-yyyy";
-  static final String DEFAULT_TIMESTAMP_FORMAT = "dd-MMM-yyyy HH:mm:ss";
-
-  /**
-   * Default Constructor.
-   */
-  public ResultSetHelperService() {
-  }
-
-  private static String read(Clob c) throws SQLException, IOException {
-    StringBuilder sb = new StringBuilder((int) c.length());
-    Reader r = c.getCharacterStream();
-    try {
-      char[] cbuf = new char[CLOBBUFFERSIZE];
-      int n;
-      while ((n = r.read(cbuf, 0, cbuf.length)) != -1) {
-        sb.append(cbuf, 0, n);
-      }
-    } finally {
-      r.close();
-    }
-    return sb.toString();
-
-  }
-
-  /**
-   * Returns the column names from the result set.
-   *
-   * @param rs - ResultSet
-   * @return - a string array containing the column names.
-   * @throws SQLException - thrown by the result set.
-   */
-  public String[] getColumnNames(ResultSet rs) throws SQLException {
-    List<String> names = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    //CHECKSTYLE:OFF Approval No:Approval-V1R2C10_010
-    ResultSetMetaData metadata = rs.getMetaData();
-    //CHECKSTYLE:ON
-    for (int i = 0; i < metadata.getColumnCount(); i++) {
-      names.add(metadata.getColumnName(i + 1));
-    }
-
-    String[] nameArray = new String[names.size()];
-    return names.toArray(nameArray);
-  }
-
-  /**
-   * Get all the column values from the result set.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @return - String array containing all the column values.
-   * @throws SQLException - thrown by the result set.
-   * @throws IOException - thrown by the result set.
-   */
-  public String[] getColumnValues(ResultSet rs) throws SQLException, IOException {
-    return this.getColumnValues(rs, false, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
-  }
-
-  /**
-   * Get all the column values from the result set.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @param trim - values should have white spaces trimmed.
-   * @return - String array containing all the column values.
-   * @throws SQLException - thrown by the result set.
-   * @throws IOException - thrown by the result set.
-   */
-  public String[] getColumnValues(ResultSet rs, boolean trim) throws SQLException, IOException {
-    return this.getColumnValues(rs, trim, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
-  }
-
-  /**
-   * Get all the column values from the result set.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @param trim - values should have white spaces trimmed.
-   * @param dateFormatString - format String for dates.
-   * @param timeFormatString - format String for timestamps.
-   * @return - String array containing all the column values.
-   * @throws SQLException - thrown by the result set.
-   * @throws IOException - thrown by the result set.
-   */
-  public String[] getColumnValues(ResultSet rs, boolean trim, String dateFormatString,
-      String timeFormatString) throws SQLException, IOException {
-    List<String> values = new ArrayList<>();
-    ResultSetMetaData metadata = rs.getMetaData();
-
-    for (int i = 0; i < metadata.getColumnCount(); i++) {
-      values.add(getColumnValue(rs, metadata.getColumnType(i + 1), i + 1, trim, dateFormatString,
-          timeFormatString));
-    }
-
-    String[] valueArray = new String[values.size()];
-    return values.toArray(valueArray);
-  }
-
-  /**
-   * changes an object to a String.
-   *
-   * @param obj - Object to format.
-   * @return - String value of an object or empty string if the object is null.
-   */
-  protected String handleObject(Object obj) {
-    return obj == null ? "" : String.valueOf(obj);
-  }
-
-  /**
-   * changes a BigDecimal to String.
-   *
-   * @param decimal - BigDecimal to format
-   * @return String representation of a BigDecimal or empty string if null
-   */
-  protected String handleBigDecimal(BigDecimal decimal) {
-    return decimal == null ? "" : decimal.toString();
-  }
-
-  /**
-   * Retrieves the string representation of an Long value from the result set.
-   *
-   * @param rs - Result set containing the data.
-   * @param columnIndex - index to the column of the long.
-   * @return - the string representation of the long
-   * @throws SQLException - thrown by the result set on error.
-   */
-  protected String handleLong(ResultSet rs, int columnIndex) throws SQLException {
-    long lv = rs.getLong(columnIndex);
-    return rs.wasNull() ? "" : Long.toString(lv);
-  }
-
-  /**
-   * Retrieves the string representation of an Integer value from the result set.
-   *
-   * @param rs - Result set containing the data.
-   * @param columnIndex - index to the column of the integer.
-   * @return - string representation of the Integer.
-   * @throws SQLException - returned from the result set on error.
-   */
-  protected String handleInteger(ResultSet rs, int columnIndex) throws SQLException {
-    int i = rs.getInt(columnIndex);
-    return rs.wasNull() ? "" : Integer.toString(i);
-  }
-
-  /**
-   * Retrieves a date from the result set.
-   *
-   * @param rs - Result set containing the data
-   * @param columnIndex - index to the column of the date
-   * @param dateFormatString - format for the date
-   * @return - formatted date.
-   * @throws SQLException - returned from the result set on error.
-   */
-  protected String handleDate(ResultSet rs, int columnIndex, String dateFormatString)
-      throws SQLException {
-    java.sql.Date date = rs.getDate(columnIndex);
-    String value = null;
-    if (date != null) {
-      SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
-      value = dateFormat.format(date);
-    }
-    return value;
-  }
-
-  /**
-   * Return time read from ResultSet.
-   *
-   * @param time time read from ResultSet
-   * @return String version of time or null if time is null.
-   */
-  protected String handleTime(Time time) {
-    return time == null ? null : time.toString();
-  }
-
-  /**
-   * The formatted timestamp.
-   *
-   * @param timestamp - timestamp read from resultset
-   * @param timestampFormatString - format string
-   * @return - formatted time stamp.
-   */
-  protected String handleTimestamp(Timestamp timestamp, String timestampFormatString) {
-    SimpleDateFormat timeFormat = new SimpleDateFormat(timestampFormatString);
-    return timestamp == null ? null : timeFormat.format(timestamp);
-  }
-
-  private String getColumnValue(ResultSet rs, int colType, int colIndex, boolean trim,
-      String dateFormatString, String timestampFormatString) throws SQLException, IOException {
-
-    String value = "";
-
-    switch (colType) {
-      case Types.BIT:
-      case Types.JAVA_OBJECT:
-        value = handleObject(rs.getObject(colIndex));
-        break;
-      case Types.BOOLEAN:
-        boolean b = rs.getBoolean(colIndex);
-        value = Boolean.valueOf(b).toString();
-        break;
-      case NCLOB: // todo : use rs.getNClob
-      case Types.CLOB:
-        Clob c = rs.getClob(colIndex);
-        if (c != null) {
-          value = read(c);
-        }
-        break;
-      case Types.BIGINT:
-        value = handleLong(rs, colIndex);
-        break;
-      case Types.DECIMAL:
-      case Types.DOUBLE:
-      case Types.FLOAT:
-      case Types.REAL:
-      case Types.NUMERIC:
-        value = handleBigDecimal(rs.getBigDecimal(colIndex));
-        break;
-      case Types.INTEGER:
-      case Types.TINYINT:
-      case Types.SMALLINT:
-        value = handleInteger(rs, colIndex);
-        break;
-      case Types.DATE:
-        value = handleDate(rs, colIndex, dateFormatString);
-        break;
-      case Types.TIME:
-        value = handleTime(rs.getTime(colIndex));
-        break;
-      case Types.TIMESTAMP:
-        value = handleTimestamp(rs.getTimestamp(colIndex), timestampFormatString);
-        break;
-      case NVARCHAR: // todo : use rs.getNString
-      case NCHAR: // todo : use rs.getNString
-      case LONGNVARCHAR: // todo : use rs.getNString
-      case Types.LONGVARCHAR:
-      case Types.VARCHAR:
-      case Types.CHAR:
-        value = getColumnValue(rs, colIndex, trim);
-        break;
-      default:
-        value = "";
-    }
-
-    if (value == null) {
-      value = "";
-    }
-
-    return value;
-  }
-
-  /**
-   * @param rs
-   * @param colIndex
-   * @param trim
-   * @return
-   * @throws SQLException
-   */
-  public String getColumnValue(ResultSet rs, int colIndex, boolean trim) throws SQLException {
-    String value;
-    String columnValue = rs.getString(colIndex);
-    if (trim && columnValue != null) {
-      value = columnValue.trim();
-    } else {
-      value = columnValue;
-    }
-    return value;
-  }
-}
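[The deleted service formatted DATE and TIMESTAMP columns through SimpleDateFormat, defaulting to dd-MMM-yyyy and dd-MMM-yyyy HH:mm:ss. A compact Scala sketch of the same null-safe behavior; the names are made up for illustration, and the output is locale-dependent since SimpleDateFormat uses the default locale.]

    import java.sql.{Date, Timestamp}
    import java.text.SimpleDateFormat

    val defaultDateFormat = "dd-MMM-yyyy"                // DEFAULT_DATE_FORMAT above
    val defaultTimestampFormat = "dd-MMM-yyyy HH:mm:ss"  // DEFAULT_TIMESTAMP_FORMAT above

    // Null-safe formatting, as in the removed handleDate/handleTimestamp.
    def formatDate(date: Date, pattern: String = defaultDateFormat): String =
      if (date == null) null else new SimpleDateFormat(pattern).format(date)

    def formatTimestamp(ts: Timestamp, pattern: String = defaultTimestampFormat): String =
      if (ts == null) null else new SimpleDateFormat(pattern).format(ts)

    // e.g. formatDate(Date.valueOf("2016-08-01")) == "01-Aug-2016" in an English locale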
- */ - - -package org.apache.carbondata.spark.rdd - -import scala.collection.JavaConverters._ - -import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.execution.command.Partitioner - -import org.apache.carbondata.common.logging.impl.StandardLogService -import org.apache.carbondata.spark.PartitionResult -import org.apache.carbondata.spark.partition.api.impl.CSVFilePartitioner -import org.apache.carbondata.spark.splits.TableSplit -import org.apache.carbondata.spark.util.CarbonQueryUtil - - -class CarbonSparkRawDataPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit) - extends Partition { - - override val index: Int = idx - val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit) - - override def hashCode(): Int = 41 * (41 + rddId) + idx -} - -/** - * This RDD class is used to create splits the fact csv store to various partitions as per - * configuration and compute each split in the respective node located in the server. - * . - */ -class CarbonDataPartitionRDD[K, V]( - sc: SparkContext, - results: PartitionResult[K, V], - databaseName: String, - tableName: String, - sourcePath: String, - targetFolder: String, - requiredColumns: Array[String], - headers: String, - delimiter: String, - quoteChar: String, - escapeChar: String, - multiLine: Boolean, - partitioner: Partitioner) - extends RDD[(K, V)](sc, Nil) with Logging { - - sc.setLocalProperty("spark.scheduler.pool", "DDL") - - override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil - .getPartitionSplits(sourcePath, partitioner.nodeList, partitioner.partitionCount) - splits.zipWithIndex.map {s => - new CarbonSparkRawDataPartition(id, s._2, s._1) - } - } - - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { - new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[CarbonSparkRawDataPartition] - StandardLogService - .setThreadName(split.serializableHadoopSplit.value.getPartition.getUniqueID, null) - logInfo("Input split: " + split.serializableHadoopSplit.value) - - val csvPart = new CSVFilePartitioner(partitioner.partitionClass, sourcePath) - csvPart.splitFile(databaseName, tableName, - split.serializableHadoopSplit.value.getPartition.getFilesPath, targetFolder, - partitioner.nodeList.toList.asJava, partitioner.partitionCount, partitioner.partitionColumn, - requiredColumns, delimiter, quoteChar, headers, escapeChar, multiLine) - - var finished = false - - override def hasNext: Boolean = { - if (!finished) { - finished = true - finished - } - else { - !finished - } - } - - override def next(): (K, V) = { - results.getKey(partitioner.partitionCount, csvPart.isPartialSuccess) - } - } - } - - override def getPreferredLocations(split: Partition): Seq[String] = { - val theSplit = split.asInstanceOf[CarbonSparkRawDataPartition] - val s = theSplit.serializableHadoopSplit.value.getLocations.asScala - logInfo("Host Name : " + s.head + s.length) - s - } -} - http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 3118d3f..964c955 100644 --- 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3118d3f..964c955 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -62,37 +62,6 @@ object CarbonDataRDDFactory extends Logging {
 
   val logger = LogServiceFactory.getLogService(CarbonDataRDDFactory.getClass.getName)
 
-  // scalastyle:off
-  def partitionCarbonData(sc: SparkContext,
-      databaseName: String,
-      tableName: String,
-      sourcePath: String,
-      targetFolder: String,
-      requiredColumns: Array[String],
-      headers: String,
-      delimiter: String,
-      quoteChar: String,
-      escapeChar: String,
-      multiLine: Boolean,
-      partitioner: Partitioner): String = {
-    // scalastyle:on
-    val status = new
-        CarbonDataPartitionRDD(sc, new PartitionResultImpl(), databaseName, tableName, sourcePath,
-          targetFolder, requiredColumns, headers, delimiter, quoteChar, escapeChar, multiLine,
-          partitioner
-        ).collect
-    CarbonDataProcessorUtil
-      .renameBadRecordsFromInProgressToNormal("partition/" + databaseName + '/' + tableName)
-    var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-    status.foreach {
-      case (key, value) =>
-        if (value) {
-          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-        }
-    }
-    loadStatus
-  }
-
   def mergeCarbonData(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 59db05b..a7d807e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.control.Breaks.{break, breakable}
 
+import au.com.bytecode.opencsv.CSVReader
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -38,9 +39,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
-import org.apache.carbondata.spark.partition.reader.{CSVParser, CSVReader}
+import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
@@ -500,7 +499,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
       inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
         FileFactory.getFileType(preDefDictFilePath))
       csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
-        CSVReader.DEFAULT_SKIP_LINES, new CSVParser(carbonLoadModel.getCsvDelimiter.charAt(0)))
+        carbonLoadModel.getCsvDelimiter.charAt(0))
       // read the column data to list iterator
       colDictData = csvReader.readAll.iterator
     } catch {
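[With the in-tree partition reader gone, the pre-defined dictionary file is parsed by opencsv directly, via the two-argument CSVReader(Reader, char) constructor seen in the hunk above. A self-contained usage sketch; the file path and delimiter are placeholders:]

    import java.io.FileReader
    import scala.collection.JavaConverters._
    import au.com.bytecode.opencsv.CSVReader

    // CSVReader(Reader, char) is the constructor the patched code uses;
    // readAll() returns a java.util.List[Array[String]].
    val reader = new CSVReader(new FileReader("/tmp/predef-dict.csv"), ',')
    try {
      reader.readAll().asScala.foreach(row => println(row.mkString("|")))
    } finally {
      reader.close()
    }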
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 818aa4a..e714520 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -49,7 +49,6 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.CarbonLoadModel
-import org.apache.carbondata.spark.partition.reader.CSVWriter
 import org.apache.carbondata.spark.rdd._
 
 /**
@@ -58,6 +57,16 @@ import org.apache.carbondata.spark.rdd._
 object GlobalDictionaryUtil extends Logging {
 
   /**
+   * The default separator to use if none is supplied to the constructor.
+   */
+  val DEFAULT_SEPARATOR: Char = ','
+  /**
+   * The default quote character to use if none is supplied to the
+   * constructor.
+   */
+  val DEFAULT_QUOTE_CHARACTER: Char = '"'
+
+  /**
    * find columns which need to generate global dictionary.
    *
    * @param dimensions dimension list of schema
@@ -354,7 +363,7 @@ object GlobalDictionaryUtil extends Logging {
       })
       .option("delimiter", {
         if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-          "" + CSVWriter.DEFAULT_SEPARATOR
+          "" + DEFAULT_SEPARATOR
         } else {
           carbonLoadModel.getCsvDelimiter
@@ -367,7 +376,7 @@ object GlobalDictionaryUtil extends Logging {
       .option("codec", "gzip")
       .option("quote", {
         if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
-          "" + CSVWriter.DEFAULT_QUOTE_CHARACTER
+          "" + DEFAULT_QUOTE_CHARACTER
         } else {
           carbonLoadModel.getQuoteChar
@@ -592,7 +601,7 @@ object GlobalDictionaryUtil extends Logging {
   */
  private def parseRecord(x: String, accum: Accumulator[Int], csvFileColumns: Array[String])
  : (String, String) = {
-    val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
+    val tokens = x.split("" + DEFAULT_SEPARATOR)
     var columnName: String = ""
     var value: String = ""
     // such as "," , "", throw ex
@@ -713,7 +722,7 @@ object GlobalDictionaryUtil extends Logging {
       if (null != readLine) {
         val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-          "" + CSVWriter.DEFAULT_SEPARATOR
+          "" + DEFAULT_SEPARATOR
         } else {
           carbonLoadModel.getCsvDelimiter
         }
@@ -756,7 +765,7 @@ object GlobalDictionaryUtil extends Logging {
         df.columns
       } else {
-        carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
+        carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
       }
       headers = headers.map(headerName => headerName.trim)
       val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -820,7 +829,7 @@ object GlobalDictionaryUtil extends Logging {
     var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
       getHeaderFormFactFile(carbonLoadModel)
     } else {
-      carbonLoadModel.getCsvHeader.toLowerCase.split("" + CSVWriter.DEFAULT_SEPARATOR)
+      carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
     }
     headers = headers.map(headerName => headerName.trim)
     // prune columns according to the CSV file header, dimension columns
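[Each hunk above repeats one fallback pattern: use the configured delimiter or quote character when present, otherwise the new local default. In isolation, as a sketch; csvDelimiter stands in for carbonLoadModel.getCsvDelimiter, and the null/empty check mirrors StringUtils.isEmpty:]

    val DEFAULT_SEPARATOR: Char = ','
    val DEFAULT_QUOTE_CHARACTER: Char = '"'

    // Fall back to the default when the configured value is null or empty.
    def delimiterOrDefault(csvDelimiter: String): String =
      if (csvDelimiter == null || csvDelimiter.isEmpty) "" + DEFAULT_SEPARATOR
      else csvDelimiter

    assert(delimiterOrDefault(null) == ",")
    assert(delimiterOrDefault("|") == "|")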
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 972de05..c9fdd6b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -28,13 +28,12 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.ExtractPythonUDFs
-import org.apache.spark.sql.execution.command.PartitionData
 import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, PreWriteCheck}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.optimizer.CarbonOptimizer
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.spark.rdd.CarbonDataFrameRDD
@@ -153,45 +152,6 @@ object CarbonContext {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
 
-  /**
-   * @param databaseName - Database Name
-   * @param tableName - Table Name
-   * @param factPath - Raw CSV data path
-   * @param targetPath - Target path where the file will be split as per partition
-   * @param delimiter - default file delimiter is comma(,)
-   * @param quoteChar - default quote character used in Raw CSV file, Default quote
-   *                    character is double quote(")
-   * @param fileHeader - Header should be passed if not available in Raw CSV File, else pass null,
-   *                     Header will be read from CSV
-   * @param escapeChar - This parameter by default will be null, there wont be any validation if
-   *                     default escape character(\) is found on the RawCSV file
-   * @param multiLine - This parameter will be check for end of quote character if escape character
-   *                    & quote character is set.
-   *                    if set as false, it will check for end of quote character within the line
-   *                    and skips only 1 line if end of quote not found
-   *                    if set as true, By default it will check for 10000 characters in multiple
-   *                    lines for end of quote & skip all lines if end of quote not found.
-   */
-  final def partitionData(
-      databaseName: String = null,
-      tableName: String,
-      factPath: String,
-      targetPath: String,
-      delimiter: String = ",",
-      quoteChar: String = "\"",
-      fileHeader: String = null,
-      escapeChar: String = null,
-      multiLine: Boolean = false)(hiveContext: HiveContext): String = {
-    updateCarbonPorpertiesPath(hiveContext)
-    var databaseNameLocal = databaseName
-    if (databaseNameLocal == null) {
-      databaseNameLocal = "default"
-    }
-    val partitionDataClass = PartitionData(databaseName, tableName, factPath, targetPath, delimiter,
-      quoteChar, fileHeader, escapeChar, multiLine)
-    partitionDataClass.run(hiveContext)
-    partitionDataClass.partitionStatus
-  }
 
   final def updateCarbonPorpertiesPath(hiveContext: HiveContext) {
     val carbonPropertiesFilePath = hiveContext.getConf("carbon.properties.filepath", null)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index e4a79ab..ac48624 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1139,28 +1139,6 @@ private[sql] case class LoadTable(
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
       }
-      else {
-        val fileType = FileFactory.getFileType(partitionLocation)
-        if (FileFactory.isFileExist(partitionLocation, fileType)) {
-          val file = FileFactory.getCarbonFile(partitionLocation, fileType)
-          CarbonUtil.deleteFoldersAndFiles(file)
-        }
-        partitionLocation += System.currentTimeMillis()
-        FileFactory.mkdirs(partitionLocation, fileType)
-        LOGGER.info("Initiating Data Partitioning for the Table : (" +
-                    dbName + "." + tableName + ")")
-        partitionStatus = CarbonContext.partitionData(
-          dbName,
-          tableName,
-          factPath,
-          partitionLocation,
-          delimiter,
-          quoteChar,
-          fileHeader,
-          escapeChar, multiLine)(sqlContext.asInstanceOf[HiveContext])
-        carbonLoadModel.setFactFilePath(FileUtils.getPaths(partitionLocation))
-        carbonLoadModel.setColDictFilePath(columnDict)
-      }
       GlobalDictionaryUtil
         .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath)
       CarbonDataRDDFactory
@@ -1213,34 +1191,6 @@
   }
 }
 
-private[sql] case class PartitionData(databaseName: String, tableName: String, factPath: String,
-    targetPath: String, delimiter: String, quoteChar: String,
-    fileHeader: String, escapeChar: String, multiLine: Boolean)
-  extends RunnableCommand {
-
-  var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val identifier = TableIdentifier(tableName, Option(databaseName))
-    val relation = CarbonEnv.getInstance(sqlContext)
-      .carbonCatalog.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
-    val dimNames = relation.tableMeta.carbonTable
-      .getDimensionByTableName(tableName).asScala.map(_.getColName)
-    val msrNames = relation.tableMeta.carbonTable
-      .getDimensionByTableName(tableName).asScala.map(_.getColName)
-    val targetFolder = targetPath
-    partitionStatus = CarbonDataRDDFactory.partitionCarbonData(
-      sqlContext.sparkContext, databaseName,
-      tableName, factPath, targetFolder, (dimNames ++ msrNames).toArray
-      , fileHeader, delimiter,
-      quoteChar, escapeChar, multiLine, relation.tableMeta.partitioner)
-    if (partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
-      logInfo("Bad Record Found while partitioning data")
-    }
-    Seq.empty
-  }
-}
-
 private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
     tableName: String)
   extends RunnableCommand {
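[For context on what was dropped: the old partition path folded per-split results into one load status, downgrading to partial success as soon as any split reported bad records. That fold in isolation, as a sketch; the status strings are illustrative stand-ins for the CarbonCommonConstants values named in the removed code:]

    // Illustrative stand-ins for the CarbonCommonConstants status strings.
    val StoreLoadStatusSuccess = "SUCCESS"
    val StoreLoadStatusPartialSuccess = "PARTIAL_SUCCESS"

    // One partial-success split is enough to downgrade the whole load,
    // matching the status.foreach in the removed partitionCarbonData.
    def overallStatus(splitResults: Seq[(String, Boolean)]): String =
      if (splitResults.exists(_._2)) StoreLoadStatusPartialSuccess
      else StoreLoadStatusSuccess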