[12/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
new file mode 100644
index 000..836a757
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.lcm.locks.ICarbonLock;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.spark.load.CarbonLoaderUtil;
+
+/**
+ * utility class for load merging.
+ */
+public final class CarbonDataMergerUtil {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
+
+  private CarbonDataMergerUtil() {
+
+  }
+
+  /**
+   * Returns the size of all the carbondata files present in the segment.
+   * @param carbonFile
+   * @return
+   */
+  private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
+    long factSize = 0;
+
+    // carbon data file case.
+    CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile file) {
+        return CarbonTablePath.isCarbonDataFile(file.getName());
+      }
+    });
+
+    for (CarbonFile fact : factFile) {
+      factSize += fact.getSize();
+    }
+
+    return factSize;
+  }
+
+  /**
+   * To check whether the merge property is enabled or not.
+   *
+   * @return
+   */
+
+  public static boolean checkIfAutoLoadMergingRequired() {
+    // load merge is not supported as per new store format
+    // moving the load merge check in early to avoid unnecessary load listing causing IOException
+    // check whether carbon's segment merging operation is enabled or not.
+    // default will be false.
+    String isLoadMergeEnabled = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+            CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+    if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Form the Name of the New Merge Folder
+   *
+   * @param segmentsToBeMergedList
+   * @return
+   */
+  public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
+    String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
+    if (firstSegmentName.contains(".")) {
+      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
+      String afterDecimal = firstSegme
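
[The diff is cut off inside getMergedLoadName. For readers following the compaction logic, here is a minimal, self-contained Scala sketch of the merged-segment naming rule the visible prefix implies (a name like "2" becomes "2.1", "2.1" becomes "2.2"). The increment of the post-decimal part is an assumption, since the diff is truncated before that statement.]

  // Hypothetical reconstruction, not the committed code: derives the merge
  // folder name from the first segment being merged.
  def mergedLoadName(firstSegmentName: String): String = {
    if (firstSegmentName.contains(".")) {
      val beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."))
      val afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1).toInt
      beforeDecimal + "." + (afterDecimal + 1)
    } else {
      firstSegmentName + ".1"
    }
  }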
[04/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
deleted file mode 100644
index c91cec0..000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.io.{DataInputStream, InputStreamReader}
-import java.nio.charset.Charset
-import java.text.SimpleDateFormat
-import java.util.regex.Pattern
-
-import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.control.Breaks.{break, breakable}
-
-import au.com.bytecode.opencsv.CSVReader
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
-
-/**
- * A partitioner that partitions by column.
- *
- * @constructor create a partitioner
- * @param numParts the number of partitions
- */
-class ColumnPartitioner(numParts: Int) extends Partitioner {
-  override def numPartitions: Int = numParts
-
-  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-}
-
-trait GenericParser {
-  val dimension: CarbonDimension
-
-  def addChild(child: GenericParser): Unit
-
-  def parseString(input: String): Unit
-}
-
-case class DictionaryStats(distinctValues: java.util.List[String],
-    dictWriteTime: Long, sortIndexWriteTime: Long)
-
-case class PrimitiveParser(dimension: CarbonDimension,
-    setOpt: Option[HashSet[String]]) extends GenericParser {
-  val (hasDictEncoding, set: HashSet[String]) = setOpt match {
-    case None => (false, new HashSet[String])
-    case Some(x) => (true, x)
-  }
-
-  def addChild(child: GenericParser): Unit = {
-  }
-
-  def parseString(input: String): Unit = {
-    if (hasDictEncoding && input != null) {
-      set.add(input)
-    }
-  }
-}
-
-case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser {
-  var children: GenericParser = _
-
-  def addChild(child: GenericParser): Unit = {
-    children = child
-  }
-
-  def parseString(input: String): Unit = {
-    if (StringUtils.isNotEmpty(input)) {
-      val splits = format.getSplits(input)
-      if (ArrayUtils.isNotEmpty(splits)) {
-        splits.foreach { s =>
-          children.parseString(s)
-        }
-      }
-    }
-  }
-}
-
-case class StructParser(dimension: CarbonDimension,
-    format: DataFormat) extends GenericParser {
-  val children = new ArrayBuffer[GenericParser]
-
-  def addChild(child: GenericParser): Unit = {
-    children += child
-  }
-
-  def parseString(input: String): Unit = {
-    if (StringUtils.isNotEmpty(input)) {
-      val splits = format.getSplits(input)
-      val len = Math.min(children.length, splits.length)
-      for (i <- 0 until len) {
-        chi
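
[The deleted parser hierarchy above (PrimitiveParser / ArrayParser / StructParser) collects distinct values for dictionary generation while walking nested column data. Below is a self-contained Scala sketch of the same recursive pattern, with DataFormat simplified to a single delimiter; the names here are illustrative, not the carbondata API.]

  import scala.collection.mutable

  trait Parser { def parse(input: String): Unit }

  // Leaf parser: collects distinct primitive values, like PrimitiveParser above.
  class Primitive(val distinct: mutable.HashSet[String] = mutable.HashSet.empty[String])
      extends Parser {
    def parse(input: String): Unit = if (input != null) distinct += input
  }

  // Container parser: splits the serialized value and delegates to its child,
  // like ArrayParser above.
  class ArrayOf(child: Parser, delimiter: String) extends Parser {
    def parse(input: String): Unit =
      if (input != null && input.nonEmpty) input.split(delimiter).foreach(child.parse)
  }

  // e.g. an array<string> column serialized as "a$b$a"
  val leaf = new Primitive()
  new ArrayOf(leaf, "\\$").parse("a$b$a")
  assert(leaf.distinct == Set("a", "b"))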
[06/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
deleted file mode 100644
index 61639d3..000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Partition extends Serializable {
-  /**
-   * unique identification for the partition in the cluster.
-   */
-  String getUniqueID();
-
-  /**
-   * File path for the raw data represented by this partition
-   */
-  String getFilePath();
-
-  /**
-   * result
-   *
-   * @return
-   */
-  List<String> getFilesPath();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
deleted file mode 100644
index bc6e54f..000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DataPartitionerProperties {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
-
-  private static DataPartitionerProperties instance;
-
-  private Properties properties;
-
-  private DataPartitionerProperties() {
-    properties = loadProperties();
-  }
-
-  public static DataPartitionerProperties getInstance() {
-    if (instance == null) {
-      instance = new DataPartitionerProperties();
-    }
-    return instance;
-  }
-
-  public String getValue(String key, String defaultVal) {
-    return properties.getProperty(key, defaultVal);
-  }
-
-  public String getValue(String key) {
-    return properties.getProperty(key);
-  }
-
-  /**
-   * Read the properties from the DataPartitioner.properties file
-   */
-  private Properties loadProperties() {
-    Properties props = new Properties();
-
-    File file = new File("DataPartitioner.properties");
-    FileInputStream fis = null;
-    try {
-      if (file.exists()) {
-        fis = new FileInputStream(file);
-
-        props.load(fis);
-      }
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    } finally {
-      if (null != fis) {
-        try {
-          fis.close();
-        }
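
[DataPartitionerProperties above is a lazily initialized, unsynchronized singleton around a java.util.Properties file. A minimal Scala sketch of the same pattern follows; as in the original, getInstance is not thread-safe, so two concurrent first calls may each build an instance.]

  import java.io.{File, FileInputStream}
  import java.util.Properties

  object PartitionerProps {
    private var instance: Properties = _

    def getInstance: Properties = {
      // Lazy, unsynchronized initialization, mirroring the Java code above.
      if (instance == null) instance = load(new File("DataPartitioner.properties"))
      instance
    }

    private def load(file: File): Properties = {
      val props = new Properties()
      if (file.exists()) {
        val fis = new FileInputStream(file)
        try props.load(fis) finally fis.close() // same close-in-finally as the diff
      }
      props
    }
  }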
[01/14] incubator-carbondata git commit: rebase
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 567fa5131 -> d94b99f36

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
deleted file mode 100644
index e5264ca..000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{CarbonContext, CarbonEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.datastore.block.Distributable
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
-object DistributionUtil {
-  @transient
-  val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-
-  /*
-   * This method will return the list of executors in the cluster.
-   * For this we take the memory status of all nodes with getExecutorMemoryStatus
-   * and extract the keys. getExecutorMemoryStatus also returns the driver memory.
-   * In client mode the driver runs on localhost, and an executor can be spawned
-   * on the same driver node, so we remove the first occurrence of localhost when
-   * retrieving the executor list.
-   */
-  def getNodeList(sparkContext: SparkContext): Array[String] = {
-    val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
-      kv._1.split(":")(0)
-    }.toSeq
-    val localhostIPs = getLocalhostIPs
-    val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
-
-    val nodelist: List[String] = withoutDriverIP(arr.toList)(selectedLocalIPList.contains(_))
-    val masterMode = sparkContext.getConf.get("spark.master")
-    if (nodelist.nonEmpty) {
-      // Specific for Yarn Mode
-      if ("yarn-cluster".equals(masterMode) || "yarn-client".equals(masterMode)) {
-        val nodeNames = nodelist.map { x =>
-          val addr = InetAddress.getByName(x)
-          addr.getHostName
-        }
-        nodeNames.toArray
-      } else {
-        // For Standalone cluster, node IPs will be returned.
-        nodelist.toArray
-      }
-    } else {
-      Seq(InetAddress.getLocalHost.getHostName).toArray
-    }
-  }
-
-  private def getLocalhostIPs = {
-    val iface = NetworkInterface.getNetworkInterfaces
-    var addresses: List[InterfaceAddress] = List.empty
-    while (iface.hasMoreElements) {
-      addresses = iface.nextElement().getInterfaceAddresses.asScala.toList ++ addresses
-    }
-    val inets = addresses.map(_.getAddress.getHostAddress)
-    inets
-  }
-
-  /*
-   * This method will remove the first occurrence of any of the IPs mentioned
-   * in the predicate.
-   * Eg: l = List(Master, slave1, Master, slave2, slave3) is the list of nodes
-   * where the first Master is the driver node.
-   * withoutDriverIP(l)(x => x == "Master") will remove that first occurrence of
-   * Master. The resulting list is List(slave1, Master, slave2, slave3).
-   */
-  def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = {
-    xs match {
-      case x :: rest => if (p(x)) {
-        rest
-      } else {
-        x :: withoutDriverIP(rest)(p)
-      }
-      case _ => Nil
-    }
-  }
-
-  /**
-   * Checks whether the number of existing executors is greater than the
-   * configured executors; if yes, returns the configured executors.
-   *
-   * @param blockList
-   * @param sparkContext
-   * @return
-   */
-  def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
-      sparkContext: SparkContext): Seq[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
-    ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
-  }
-
-  def ensureExecutorsByNumberAndGetNodeList(nodesOfData: In
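
[withoutDriverIP is short enough to check in isolation. The standalone snippet below copies it from the diff above and verifies the example in its comment: only the first element matching the predicate is dropped, later matches are kept.]

  // Verbatim copy of withoutDriverIP from the deleted file, plus a check of
  // the worked example in its comment.
  def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = xs match {
    case x :: rest => if (p(x)) rest else x :: withoutDriverIP(rest)(p)
    case _ => Nil
  }

  val nodes = List("Master", "slave1", "Master", "slave2", "slave3")
  assert(withoutDriverIP(nodes)(_ == "Master") ==
    List("slave1", "Master", "slave2", "slave3"))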
[05/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
deleted file mode 100644
index cd629bf..000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv
-
-import java.io.IOException
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import com.databricks.spark.csv.newapi.CarbonTextFile
-import com.databricks.spark.csv.util._
-import com.databricks.spark.sql.readers._
-import org.apache.commons.csv._
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
-import org.apache.spark.sql.types._
-import org.slf4j.LoggerFactory
-
-import org.apache.carbondata.processing.etl.DataLoadingException
-
-case class CarbonCsvRelation protected[spark] (
-    location: String,
-    useHeader: Boolean,
-    delimiter: Char,
-    quote: Char,
-    escape: Character,
-    comment: Character,
-    parseMode: String,
-    parserLib: String,
-    ignoreLeadingWhiteSpace: Boolean,
-    ignoreTrailingWhiteSpace: Boolean,
-    userSchema: StructType = null,
-    charset: String = TextFile.DEFAULT_CHARSET.name(),
-    inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
-  extends BaseRelation with TableScan with InsertableRelation {
-
-  /**
-   * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
-   */
-  private val MAX_COMMENT_LINES_IN_HEADER = 10
-
-  private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
-
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  if ((ignoreLeadingWhiteSpace || ignoreTrailingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) {
-    logger.warn(s"Ignore white space options may not work with Commons parserLib option")
-  }
-
-  private val failFast = ParseModes.isFailFastMode(parseMode)
-  private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  private val permissive = ParseModes.isPermissiveMode(parseMode)
-
-  val schema = inferSchema()
-
-  def tokenRdd(header: Array[String]): RDD[Array[String]] = {
-
-    val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-
-    if (ParserLibs.isUnivocityLib(parserLib)) {
-      univocityParseCSV(baseRDD, header)
-    } else {
-      val csvFormat = CSVFormat.DEFAULT
-        .withDelimiter(delimiter)
-        .withQuote(quote)
-        .withEscape(escape)
-        .withSkipHeaderRecord(false)
-        .withHeader(header: _*)
-        .withCommentMarker(comment)
-
-      // If header is set, make sure firstLine is materialized before sending to executors.
-      val filterLine = if (useHeader) firstLine else null
-
-      baseRDD.mapPartitions { iter =>
-        // When using header, any input line that equals firstLine is assumed to be header
-        val csvIter = if (useHeader) {
-          iter.filter(_ != filterLine)
-        } else {
-          iter
-        }
-        parseCSV(csvIter, csvFormat)
-      }
-    }
-  }
-
-  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
-  def buildScan: RDD[Row] = {
-    val schemaFields = schema.fields
-    tokenRdd(schemaFields.map(_.name)).flatMap { tokens =>
-
-      if (dropMalformed && schemaFields.length != tokens.size) {
-        logger.warn(s"Dropping malformed line: $tokens")
-        None
-      } else if (failFast && schemaFields.length != tokens.size) {
-        throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens")
-      } else {
-        v
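
[buildScan above dispatches on three CSV parse modes when a line's token count disagrees with the schema width. Here is a self-contained Scala sketch of that dispatch; the type names are illustrative, not the spark-csv API.]

  sealed trait ParseMode
  case object Permissive extends ParseMode
  case object DropMalformed extends ParseMode
  case object FailFast extends ParseMode

  // When a line's token count does not match the schema width, DROPMALFORMED
  // skips it, FAILFAST throws, and PERMISSIVE lets it through so the caller
  // can pad or truncate to the schema.
  def filterTokens(mode: ParseMode, schemaWidth: Int,
      lines: Iterator[Array[String]]): Iterator[Array[String]] =
    lines.flatMap { tokens =>
      if (tokens.length == schemaWidth) Some(tokens)
      else mode match {
        case DropMalformed => None // silently drop; a real reader would log a warning
        case FailFast =>
          throw new RuntimeException(s"Malformed line in FAILFAST mode: ${tokens.toSeq}")
        case Permissive => Some(tokens)
      }
    }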
[10/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
new file mode 100644
index 000..1d8d6b2
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.SparkUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
+import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This partition class is used to split by TableSplit
+ *
+ */
+class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit,
+    val blocksDetails: Array[BlockDetails])
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+  val partitionBlocksDetail = blocksDetails
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * This partition class is used to split by Host
+ *
+ */
+class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
+    val blocksDetails: Array[BlockDetails])
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = host
+  val nodeBlocksDetail = blocksDetails
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+class SparkPartitionLoader(model: CarbonLoadModel,
+    splitIndex: Int,
+    storePath: String,
+    kettleHomePath: String,
+    loadCount: Int,
+    loadMetadataDetails: LoadMetadataDetails) {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  var storeLocation: String = ""
+
+  def initialize(): Unit = {
+    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+    if (null == carbonPropertiesFilePath) {
+      System.setProperty("carbon.properties.filepath",
+        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+    CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
+    CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
+    CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
+    CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
+    CarbonProperties.getInstance().addProperty("aggregate.columnar.
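
[Both partition classes above pair a partition index with its placement payload and hash on (rddId, idx). A minimal standalone sketch of the same pattern, assuming only Spark's Partition interface on the classpath:]

  import org.apache.spark.Partition

  // The partition carries its payload (here, a host name) and hashes on
  // (rddId, idx) exactly as CarbonNodePartition does, so partitions of
  // different RDDs with the same index do not collide.
  class NodePartition(rddId: Int, val idx: Int, val host: String) extends Partition {
    override val index: Int = idx
    override def hashCode(): Int = 41 * (41 + rddId) + idx
  }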
[07/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
deleted file mode 100644
index f2a1f9f..000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ /dev/null
@@ -1,976 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.CarbonUtilException;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.csvload.DataGraphExecuter;
-import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
-import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
-import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
-import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.spark.merger.NodeBlockRelation;
-import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
-
-import com.google.gson.Gson;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.Utils;
-
-
-public final
[08/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 000..9360ad8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.SQLContext
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.CarbonSparkFactory
+import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
+
+case class TableModel(
+    ifNotExistsSet: Boolean,
+    var databaseName: String,
+    databaseNameOp: Option[String],
+    tableName: String,
+    tableProperties: Map[String, String],
+    dimCols: Seq[Field],
+    msrCols: Seq[Field],
+    highcardinalitydims: Option[Seq[String]],
+    noInvertedIdxCols: Option[Seq[String]],
+    columnGroups: Seq[String],
+    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
+
+case class Field(column: String, var dataType: Option[String], name: Option[String],
+    children: Option[List[Field]], parent: String = null,
+    storeType: Option[String] = Some("columnar"),
+    var precision: Int = 0, var scale: Int = 0)
+
+case class ColumnProperty(key: String, value: String)
+
+case class ComplexField(complexType: String, primitiveField: Option[Field],
+    complexField: Option[ComplexField])
+
+case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
+    nodeList: Array[String])
+
+case class PartitionerField(partitionColumn: String, dataType: Option[String],
+    columnComment: String)
+
+case class DataLoadTableFileMapping(table: String, loadPath: String)
+
+case class CarbonMergerMapping(storeLocation: String,
+    storePath: String,
+    metadataFilePath: String,
+    mergedLoadName: String,
+    kettleHomePath: String,
+    tableCreationTime: Long,
+    databaseName: String,
+    factTableName: String,
+    validSegments: Array[String],
+    tableId: String,
+    // maxSegmentColCardinality is the cardinality of the last segment of compaction
+    var maxSegmentColCardinality: Array[Int],
+    // maxSegmentColumnSchemaList is the list of column schemas of the last segment of compaction
+    var maxSegmentColumnSchemaList: List[ColumnSchema])
+
+case class NodeInfo(TaskId: String, noOfBlocks: Int)
+
+case class AlterTableModel(dbName: Option[String], tableName: String,
+    compactionType: String, alterSql: String)
+
+case class CompactionModel(compactionSize: Long,
+    compactionType: CompactionType,
+    carbonTable: CarbonTable,
+    tableCreationTime: Long,
+    isDDLTrigger: Boolean)
+
+case class CompactionCallableModel(storePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    storeLocation: String,
+    carbonTable: CarbonTable,
+    kettleHomePath: String,
+    cubeCreationTime: Long,
+    loadsToMerge: util.List[LoadMetadataDetails],
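
[The diff is cut off part-way through CompactionCallableModel. For orientation, this is how an ALTER TABLE compaction statement would populate the AlterTableModel case class declared above; the literal values are hypothetical.]

  // Illustrative only: maps "ALTER TABLE db.t1 COMPACT 'minor'" onto the model.
  val alterModel = AlterTableModel(
    dbName = Some("db"),
    tableName = "t1",
    compactionType = "minor",
    alterSql = "alter table db.t1 compact 'minor'")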
[02/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b7673db..9353a92 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -19,426 +19,37 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 import java.text.SimpleDateFormat
-import java.util
-import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.language.implicitConversions
-import scala.util.Random
 
-import org.apache.spark.SparkEnv
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
-import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, HiveContext}
+import org.apache.spark.sql.hive.CarbonMetastore
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
-import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
-import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
-ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.integration.spark.merger.CompactionType
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
-GlobalDictionaryUtil}
-
-case class tableModel(
-    ifNotExistsSet: Boolean,
-    var databaseName: String,
-    databaseNameOp: Option[String],
-    tableName: String,
-    tableProperties: Map[String, String],
-    dimCols: Seq[Field],
-    msrCols: Seq[Field],
-    highcardinalitydims: Option[Seq[String]],
-    noInvertedIdxCols: Option[Seq[String]],
-    partitioner: Option[Partitioner],
-    columnGroups: Seq[String],
-    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
-
-case class Field(column: String, var dataType: Option[String], name: Option[String],
-    children: Option[List[Field]], parent: String = null,
-    storeType: Option[String] = Some("columnar"),
-    var precision: Int = 0, var scale: Int = 0)
-
-case class ColumnProperty(key: String, value: String)
-
-case class ComplexField(complexType: String, primitiveField: Option[Field],
-    complexField: Option[ComplexField])
-
-case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
-    nodeList: Array[String])
-
-case class PartitionerField(partitionColumn: String, dataType: Option[String],
-    columnComment: String)
-
-case class DataLoadTableFileMapping(table: String, loadPath: String)
-
-case class CarbonMergerMapping(storeLocation: String,
-    storePath: String,
-    metadataFilePath: String,
-    mergedLoadName: String,
-    kettleHomePath: String,
-    tableCreationTime: Long,
-    databaseName: String,
-
[14/14] incubator-carbondata git commit: [CARBONDATA-463] Extract code to spark-common. This closes #365
[CARBONDATA-463] Extract code to spark-common. This closes #365

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d94b99f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d94b99f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d94b99f3

Branch: refs/heads/master
Commit: d94b99f366465b2426bbcd8277f80651f2408770
Parents: 567fa51 66ccd30
Author: ravipesala
Authored: Wed Nov 30 13:21:06 2016 +0530
Committer: ravipesala
Committed: Wed Nov 30 13:21:06 2016 +0530

----------------------------------------------------------------------
 .../examples/GenerateDictionaryExample.scala | 2 +-
 integration/spark-common/pom.xml | 2 +-
 .../MalformedCarbonCommandException.java | 83 ++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 973 ++
 .../spark/load/DeleteLoadFolders.java | 259 +
 .../spark/load/DeletedLoadMetadata.java | 53 +
 .../spark/merger/CarbonCompactionExecutor.java | 233 +
 .../spark/merger/CarbonCompactionUtil.java | 283 ++
 .../spark/merger/CarbonDataMergerUtil.java | 695 +
 .../spark/merger/CompactionCallable.java | 44 +
 .../carbondata/spark/merger/CompactionType.java | 28 +
 .../spark/merger/NodeBlockRelation.java | 60 ++
 .../spark/merger/NodeMultiBlockRelation.java | 59 ++
 .../spark/merger/RowResultMerger.java | 336 +++
 .../carbondata/spark/merger/TableMeta.java | 42 +
 .../spark/merger/TupleConversionAdapter.java | 85 ++
 .../spark/partition/api/DataPartitioner.java | 54 +
 .../spark/partition/api/Partition.java | 42 +
 .../api/impl/DataPartitionerProperties.java | 87 ++
 .../partition/api/impl/DefaultLoadBalancer.java | 69 ++
 .../spark/partition/api/impl/PartitionImpl.java | 54 +
 .../api/impl/PartitionMultiFileImpl.java | 51 +
 .../api/impl/QueryPartitionHelper.java | 77 ++
 .../api/impl/SampleDataPartitionerImpl.java | 151 +++
 .../readsupport/SparkRowReadSupportImpl.java | 69 ++
 .../carbondata/spark/splits/TableSplit.java | 129 +++
 .../carbondata/spark/util/CarbonQueryUtil.java | 142 +++
 .../carbondata/spark/util/LoadMetadataUtil.java | 61 ++
 .../spark/CarbonAliasDecoderRelation.scala | 43 +
 .../spark/CarbonColumnValidator.scala | 36 +
 .../apache/carbondata/spark/CarbonFilters.scala | 391
 .../apache/carbondata/spark/CarbonOption.scala | 48 +
 .../carbondata/spark/CarbonSparkFactory.scala | 59 ++
 .../spark/DictionaryDetailHelper.scala | 63 ++
 .../org/apache/carbondata/spark/KeyVal.scala | 89 ++
 .../carbondata/spark/csv/CarbonCsvReader.scala | 182
 .../spark/csv/CarbonCsvRelation.scala | 249 +
 .../carbondata/spark/csv/CarbonTextFile.scala | 91 ++
 .../carbondata/spark/csv/DefaultSource.scala | 183
 .../spark/rdd/CarbonCleanFilesRDD.scala | 82 ++
 .../spark/rdd/CarbonDataLoadRDD.scala | 598
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 91 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala | 84 ++
 .../spark/rdd/CarbonDropTableRDD.scala | 71 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 557 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala | 342 +++
 .../spark/rdd/CarbonSparkPartition.scala | 35 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 130 +++
 .../spark/rdd/DataLoadCoalescedRDD.scala | 68 ++
 .../spark/rdd/DataLoadPartitionCoalescer.scala | 363 +++
 .../spark/tasks/DictionaryWriterTask.scala | 106 ++
 .../spark/tasks/SortIndexWriterTask.scala | 59 ++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 195
 .../carbondata/spark/util/CommonUtil.scala | 259 +
 .../spark/util/DataTypeConverterUtil.scala | 74 ++
 .../spark/util/GlobalDictionaryUtil.scala | 843
 .../CarbonTableIdentifierImplicit.scala | 42 +
 .../execution/command/carbonTableSchema.scala | 359 +++
 .../spark/sql/hive/DistributionUtil.scala | 167
 .../CarbonDecoderOptimizerHelper.scala | 149 +++
 .../scala/org/apache/spark/util/FileUtils.scala | 94 ++
 .../apache/spark/util/ScalaCompilerUtil.scala | 35 +
 .../scala/org/apache/spark/util/SparkUtil.scala | 73 ++
 .../spark/merger/CarbonCompactionExecutor.java | 233 -
 .../spark/merger/CarbonCompactionUtil.java | 284 --
 .../spark/merger/CompactionCallable.java | 44 -
 .../spark/merger/CompactionType.java | 28 -
 .../spark/merger/RowResultMerger.java | 336 ---
 .../spark/merger/TupleConversionAdapter.java | 85 --
 .../MalformedCarbonCommandException.java | 83 --
 .../carbondata/spark/load/CarbonLoaderUtil.java | 976 ---
 .../spark/load/DeleteLoadFolders.java | 259 -
 .../spa
[09/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
new file mode 100644
index 000..af349a8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet}
+import scala.collection.mutable
+
+import org.apache.spark.Partition
+import org.apache.spark.scheduler.TaskLocation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * DataLoadPartitionCoalescer
+ * Repartition the partitions of rdd to few partitions, one partition per node.
+ * example:
+ * blk_hst  host1 host2 host3 host4 host5
+ * block1   host1 host2 host3
+ * block2   host2 host4 host5
+ * block3   host3 host4 host5
+ * block4   host1 host2 host4
+ * block5   host1 host3 host4
+ * block6   host1 host2 host5
+ * ---
+ * 1. sort hosts by number of blocks
+ * ---
+ * host3: block1 block3 block5
+ * host5: block2 block3 block6
+ * host1: block1 block4 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * ---
+ * 2. sort the blocks of each host
+ *    new partitions are before old partitions
+ * ---
+ * host3: block1 block3 block5
+ * host5: block2 block6+block3
+ * host1: block4+block1 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * ---
+ * 3. assign blocks to hosts
+ * ---
+ * step1: host3 choose block1, remove from host1, host2
+ * step2: host5 choose block2, remove from host2, host4
+ * step3: host1 choose block4, ...
+ * ---
+ * result:
+ * host3: block1 block5
+ * host5: block2
+ * host1: block4
+ * host2: block6
+ * host4: block3
+ */
+class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val prevPartitions = prev.partitions
+  var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+  // host => partition id list
+  val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
+  // partition id => host list
+  val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
+  val noLocalityPartitions = new ArrayBuffer[Int]
+  var noLocality = true
+
+  /**
+   * assign a task location for a partition
+   */
+  private def getLocation(index: Int): Option[String] = {
+    if (index < nodeList.length) {
+      Some(nodeList(index))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * collect partitions to each node
+   */
+  private def groupByNode(): Unit = {
+    // initialize hostMapPartitionIds
+    nodeList.foreach { node =>
+      val map = new LinkedHashSet[Int]
+      hostMapPartitionIds.put(node, map)
+    }
+    // collect partitions for each node
+    val tmpNoLocalityPartitions = new ArrayBuffer[Int]
+    prevPartitions.foreach { p =>
+      val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
+      if (locs.isEmpty) {
+        // if a partition has no location, add to noLocalityPartitions
+        tmpNoLocalityPartitions += p.index
+      } else {
+        // add partition to hostMapPartitionIds and partitionIdMapHosts
+        locs.foreach { loc =>
+          val host = loc
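
[The assignment strategy described in the class comment above can be exercised standalone. The Scala sketch below is a deliberate simplification (the real coalescer also rebalances hosts and handles no-locality partitions): hosts are visited from fewest hosted blocks to most, each greedily claiming its next still-unassigned block. It runs the six-block example from the comment.]

  import scala.collection.mutable

  def assignBlocks(blockHosts: Map[String, Seq[String]]): Map[String, String] = {
    // host -> blocks replicated on it
    val byHost = mutable.LinkedHashMap.empty[String, mutable.ListBuffer[String]]
    for ((block, hosts) <- blockHosts.toSeq.sortBy(_._1); h <- hosts) {
      byHost.getOrElseUpdate(h, mutable.ListBuffer.empty[String]) += block
    }
    // step 1: visit scarce hosts first
    val order = byHost.toSeq.sortBy(_._2.size).map(_._1)
    val assigned = mutable.LinkedHashMap.empty[String, String] // block -> host
    while (assigned.size < blockHosts.size) {
      for (host <- order) {
        // each host claims its first still-unassigned block, round-robin
        byHost(host).find(b => !assigned.contains(b)).foreach(b => assigned(b) = host)
      }
    }
    assigned.toMap
  }

  val demo = Map(
    "block1" -> Seq("host1", "host2", "host3"),
    "block2" -> Seq("host2", "host4", "host5"),
    "block3" -> Seq("host3", "host4", "host5"),
    "block4" -> Seq("host1", "host2", "host4"),
    "block5" -> Seq("host1", "host3", "host4"),
    "block6" -> Seq("host1", "host2", "host5"))
  println(assignBlocks(demo)) // e.g. block1 -> host3, block2 -> host5, block4 -> host1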
[11/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
new file mode 100644
index 000..d2e716f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.Partition;
+import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
+import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
+import org.apache.carbondata.spark.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility parses the Carbon query plan into the actual query model object.
+ */
+public final class CarbonQueryUtil {
+
+  private CarbonQueryUtil() {
+
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+      CarbonQueryPlan queryPlan) throws IOException {
+
+    // Just create splits depending on locations of region servers
+    List<Partition> allPartitions = null;
+    if (queryPlan == null) {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+    } else {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+    }
+    TableSplit[] splits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < splits.length; i++) {
+      splits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = QueryPartitionHelper.getInstance()
+          .getLocation(partition, databaseName, tableName);
+      locations.add(location);
+      splits[i].setPartition(partition);
+      splits[i].setLocations(locations);
+    }
+
+    return splits;
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
+
+    // Just create splits depending on locations of region servers
+    DefaultLoadBalancer loadBalancer = null;
+    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < tblSplits.length; i++) {
+      tblSplits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = loadBalancer.getNodeForPartitions(partition);
+      locations.add(location);
+      tblSplits[i].setPartition(partition);
+      tblSplits[i].setLocations(locations);
+    }
+    return tblSplits;
+  }
+
+  /**
+   * split sourcePath by comma
+   */
+  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+      String separator) {
+    if (StringUtils.isNotEmpty(sourcePath)) {
+      String[] files = sourcePath.split(separator);
+      for (String file : files) {
+        partitionsFiles.add(file);
+      }
+    }
+  }
+
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
+    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    splitFilePath(sourcePath, files,
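
[splitFilePath above is the entry point for comma-separated multi-file LOAD paths. A short Scala equivalent with its expected behavior; the blank-entry filtering is a small defensive addition, not in the original.]

  // Breaks a comma-separated source path into individual file paths.
  def splitFilePath(sourcePath: String, separator: String = ","): Seq[String] =
    if (sourcePath == null || sourcePath.isEmpty) Seq.empty
    else sourcePath.split(separator).toSeq.filter(_.nonEmpty)

  assert(splitFilePath("/data/a.csv,/data/b.csv") == Seq("/data/a.csv", "/data/b.csv"))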
[13/14] incubator-carbondata git commit: rebase
rebase

rebase
rename package
rebase
change package name
fix style
fix spark2

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/66ccd308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/66ccd308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/66ccd308

Branch: refs/heads/master
Commit: 66ccd308536d5e64f05fe57aa3a7a9003b4adf5a
Parents: 567fa51
Author: jackylk
Authored: Tue Nov 29 17:42:06 2016 +0800
Committer: ravipesala
Committed: Wed Nov 30 13:19:54 2016 +0530

----------------------------------------------------------------------
 .../examples/GenerateDictionaryExample.scala | 2 +-
 integration/spark-common/pom.xml | 2 +-
 .../MalformedCarbonCommandException.java | 83 ++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 973 ++
 .../spark/load/DeleteLoadFolders.java | 259 +
 .../spark/load/DeletedLoadMetadata.java | 53 +
 .../spark/merger/CarbonCompactionExecutor.java | 233 +
 .../spark/merger/CarbonCompactionUtil.java | 283 ++
 .../spark/merger/CarbonDataMergerUtil.java | 695 +
 .../spark/merger/CompactionCallable.java | 44 +
 .../carbondata/spark/merger/CompactionType.java | 28 +
 .../spark/merger/NodeBlockRelation.java | 60 ++
 .../spark/merger/NodeMultiBlockRelation.java | 59 ++
 .../spark/merger/RowResultMerger.java | 336 +++
 .../carbondata/spark/merger/TableMeta.java | 42 +
 .../spark/merger/TupleConversionAdapter.java | 85 ++
 .../spark/partition/api/DataPartitioner.java | 54 +
 .../spark/partition/api/Partition.java | 42 +
 .../api/impl/DataPartitionerProperties.java | 87 ++
 .../partition/api/impl/DefaultLoadBalancer.java | 69 ++
 .../spark/partition/api/impl/PartitionImpl.java | 54 +
 .../api/impl/PartitionMultiFileImpl.java | 51 +
 .../api/impl/QueryPartitionHelper.java | 77 ++
 .../api/impl/SampleDataPartitionerImpl.java | 151 +++
 .../readsupport/SparkRowReadSupportImpl.java | 69 ++
 .../carbondata/spark/splits/TableSplit.java | 129 +++
 .../carbondata/spark/util/CarbonQueryUtil.java | 142 +++
 .../carbondata/spark/util/LoadMetadataUtil.java | 61 ++
 .../spark/CarbonAliasDecoderRelation.scala | 43 +
 .../spark/CarbonColumnValidator.scala | 36 +
 .../apache/carbondata/spark/CarbonFilters.scala | 391
 .../apache/carbondata/spark/CarbonOption.scala | 48 +
 .../carbondata/spark/CarbonSparkFactory.scala | 59 ++
 .../spark/DictionaryDetailHelper.scala | 63 ++
 .../org/apache/carbondata/spark/KeyVal.scala | 89 ++
 .../carbondata/spark/csv/CarbonCsvReader.scala | 182
 .../spark/csv/CarbonCsvRelation.scala | 249 +
 .../carbondata/spark/csv/CarbonTextFile.scala | 91 ++
 .../carbondata/spark/csv/DefaultSource.scala | 183
 .../spark/rdd/CarbonCleanFilesRDD.scala | 82 ++
 .../spark/rdd/CarbonDataLoadRDD.scala | 598
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 91 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala | 84 ++
 .../spark/rdd/CarbonDropTableRDD.scala | 71 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 557 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala | 342 +++
 .../spark/rdd/CarbonSparkPartition.scala | 35 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 130 +++
 .../spark/rdd/DataLoadCoalescedRDD.scala | 68 ++
 .../spark/rdd/DataLoadPartitionCoalescer.scala | 363 +++
 .../spark/tasks/DictionaryWriterTask.scala | 106 ++
 .../spark/tasks/SortIndexWriterTask.scala | 59 ++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 195
 .../carbondata/spark/util/CommonUtil.scala | 259 +
 .../spark/util/DataTypeConverterUtil.scala | 74 ++
 .../spark/util/GlobalDictionaryUtil.scala | 843
 .../CarbonTableIdentifierImplicit.scala | 42 +
 .../execution/command/carbonTableSchema.scala | 359 +++
 .../spark/sql/hive/DistributionUtil.scala | 167
 .../CarbonDecoderOptimizerHelper.scala | 149 +++
 .../scala/org/apache/spark/util/FileUtils.scala | 94 ++
 .../apache/spark/util/ScalaCompilerUtil.scala | 35 +
 .../scala/org/apache/spark/util/SparkUtil.scala | 73 ++
 .../spark/merger/CarbonCompactionExecutor.java | 233 -
 .../spark/merger/CarbonCompactionUtil.java | 284 --
 .../spark/merger/CompactionCallable.java | 44 -
 .../spark/merger/CompactionType.java | 28 -
 .../spark/merger/RowResultMerger.java | 336 ---
 .../spark/merger/TupleConversionAdapter.java | 85 --
 .../MalformedCarbonCommandException.java | 83 --
 .../carbondata/spark/load/CarbonLoaderUtil.java | 976 ---
 .../spark/load/DeleteLoadFolders.java | 259 -
[03/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
deleted file mode 100644
index 9a4e209..000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.io.{FileNotFoundException, IOException}
-import java.nio.charset.Charset
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.language.implicitConversions
-import scala.util.control.Breaks.{break, breakable}
-
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark.Accumulator
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.util.FileUtils
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.CarbonStorePath
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.reader.CarbonDictionaryReader
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.rdd._
-
-/**
- * A object which provide a method to generate global dictionary from CSV files.
- */
-object GlobalDictionaryUtil {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * The default separator to use if none is supplied to the constructor.
-   */
-  val DEFAULT_SEPARATOR: Char = ','
-  /**
-   * The default quote character to use if none is supplied to the
-   * constructor.
-   */
-  val DEFAULT_QUOTE_CHARACTER: Char = '"'
-
-  /**
-   * find columns which need to generate global dictionary.
-   *
-   * @param dimensions dimension list of schema
-   * @param headers    column headers
-   * @param columns    column list of csv file
-   */
-  def pruneDimensions(dimensions: Array[CarbonDimension],
-      headers: Array[String],
-      columns: Array[String]): (Array[CarbonDimension], Array[String]) = {
-    val dimensionBuffer = new ArrayBuffer[CarbonDimension]
-    val columnNameBuffer = new ArrayBuffer[String]
-    val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY,
-      Encoding.DIRECT_DICTIONARY))
-    dimensionsWithDict.foreach { dim =>
-      breakable {
-        headers.zipWithIndex.foreach { h =>
-          if (dim.getColName.equalsIgnoreCase(h._1)) {
-            dimensionBuffer += dim
-            columnNameBuffer += columns(h._2)
-            break
-          }
-        }
-      }
-    }
-    (dimensionBuffer.toArray, columnNameBuffer.toArray)
-  }
-
-  /**
-   * use this method to judge whether CarbonDimension use some encoding or not
-   *
-   * @param dimension carbonDimension
-   * @param encoding  the coding way of dimension
-   * @param excludeEncoding the coding way to exclude
-   */
-  def hasE
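The pruneDimensions method in the deleted file above pairs each dictionary-encoded dimension with the CSV column whose header matches the dimension name, stopping at the first case-insensitive match. Below is a minimal, self-contained sketch of just that matching logic; Dimension and its hasDictionaryEncoding flag are hypothetical stand-ins for CarbonDimension and the hasEncoding(_, Encoding.DICTIONARY, Encoding.DIRECT_DICTIONARY) filter, not the Carbon API.

object PruneDimensionsSketch {
  // Hypothetical stand-in for CarbonDimension; hasDictionaryEncoding models
  // the hasEncoding check used in the real method.
  case class Dimension(colName: String, hasDictionaryEncoding: Boolean)

  // For each dictionary-encoded dimension, keep the first header matching its
  // name (case-insensitively) and the CSV column at that header's index.
  def pruneDimensions(dimensions: Array[Dimension],
      headers: Array[String],
      columns: Array[String]): (Array[Dimension], Array[String]) = {
    val matched = dimensions.filter(_.hasDictionaryEncoding).flatMap { dim =>
      headers.zipWithIndex
        .find { case (header, _) => dim.colName.equalsIgnoreCase(header) }
        .map { case (_, idx) => (dim, columns(idx)) }
    }
    (matched.map(_._1), matched.map(_._2))
  }

  def main(args: Array[String]): Unit = {
    val dims = Array(Dimension("country", true), Dimension("id", false))
    val (prunedDims, prunedCols) =
      pruneDimensions(dims, Array("ID", "Country"), Array("c0", "c1"))
    // Prints: country -> c1 ("id" is dropped: no dictionary encoding)
    prunedDims.zip(prunedCols).foreach { case (d, c) => println(s"${d.colName} -> $c") }
  }
}

Using find instead of a loop mirrors the breakable/break construct in the original, which also keeps only the first header that matches each dimension.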
[2/2] incubator-carbondata git commit: [CARBONDATA-368] Improve performance of dataframe loading. This closes #278
[CARBONDATA-368] Improve performance of dataframe loading. This closes #278

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/567fa513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/567fa513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/567fa513

Branch: refs/heads/master
Commit: 567fa5131628b70c8c4829368fda6d48cb013af3
Parents: 879bfe7 f8a0c87
Author: jackylk
Authored: Tue Nov 29 17:15:20 2016 +0800
Committer: jackylk
Committed: Tue Nov 29 17:15:20 2016 +0800

--
 .../spark/rdd/CarbonDataLoadRDD.scala           | 96 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala        | 88 +++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 11 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 51 +++
 .../spark/util/GlobalDictionaryUtil.scala       | 11 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala | 68
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++
 .../spark/sql/hive/DistributionUtil.scala       | 19 +-
 .../org/apache/spark/util/TaskContextUtil.scala | 29 ++
 .../TestDataLoadPartitionCoalescer.scala        | 170 +
 .../spark/util/AllDictionaryTestCase.scala      | 9 +-
 .../util/ExternalColumnDictionaryTestCase.scala | 14 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala | 23 +-
 .../util/GlobalDictionaryUtilTestCase.scala     | 10 +-
 .../processing/csvreaderstep/CsvInput.java      | 73 +++-
 .../csvreaderstep/JavaRddIterator.java          | 32 ++
 .../processing/csvreaderstep/RddInputUtils.java | 11 +-
 17 files changed, 921 insertions(+), 157 deletions(-)
--
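For context on this change: DataLoadPartitionCoalescer and DataLoadCoalescedRDD (see the diffstat) coalesce the dataframe's parent partitions so that load tasks read mostly node-local blocks, which is what speeds up dataframe loading. The sketch below illustrates only that locality-based bucketing idea, under the assumption that this is the coalescer's core job; ParentPartition, coalesceByHost, and the host names are invented for illustration and are not the project's actual implementation.

object PartitionCoalescerSketch {
  // Hypothetical model of a parent RDD partition with its locality hint.
  case class ParentPartition(index: Int, preferredHost: Option[String])

  // Bucket partitions by preferred host; partitions without a usable locality
  // hint are spread round-robin so every node still gets a share of the work.
  def coalesceByHost(parents: Seq[ParentPartition],
      hosts: Seq[String]): Map[String, Seq[ParentPartition]] = {
    require(hosts.nonEmpty, "need at least one host")
    val (local, stray) = parents.partition(_.preferredHost.exists(hosts.contains))
    val byHost = local.groupBy(_.preferredHost.get)
    stray.zipWithIndex.foldLeft(byHost) { case (acc, (p, i)) =>
      val host = hosts(i % hosts.size)
      acc.updated(host, acc.getOrElse(host, Seq.empty) :+ p)
    }
  }

  def main(args: Array[String]): Unit = {
    val parents = Seq(
      ParentPartition(0, Some("node1")),
      ParentPartition(1, Some("node2")),
      ParentPartition(2, None),
      ParentPartition(3, Some("node1")))
    // Prints: node1 -> 0,3,2 and node2 -> 1
    coalesceByHost(parents, Seq("node1", "node2")).foreach {
      case (host, ps) => println(s"$host -> ${ps.map(_.index).mkString(",")}")
    }
  }
}

Judging from the diffstat, the coalesced task then feeds its rows to CsvInput through the new JavaRddIterator under processing/csvreaderstep rather than re-reading source files, but the sketch makes no claim about that code.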
[1/2] incubator-carbondata git commit: DataLoadCoalescedRDD
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 879bfe742 -> 567fa5131

DataLoadCoalescedRDD

DataLoadPartitionCoalescer
concurrently read dataframe
add test case
fix comments
fix comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f8a0c876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f8a0c876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f8a0c876

Branch: refs/heads/master
Commit: f8a0c876158be256119219bde4cce0e074acf03a
Parents: 879bfe7
Author: QiangCai
Authored: Mon Oct 24 10:54:20 2016 +0800
Committer: jackylk
Committed: Tue Nov 29 17:06:23 2016 +0800

--
 .../spark/rdd/CarbonDataLoadRDD.scala           | 96 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala        | 88 +++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 11 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 51 +++
 .../spark/util/GlobalDictionaryUtil.scala       | 11 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala | 68
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++
 .../spark/sql/hive/DistributionUtil.scala       | 19 +-
 .../org/apache/spark/util/TaskContextUtil.scala | 29 ++
 .../TestDataLoadPartitionCoalescer.scala        | 170 +
 .../spark/util/AllDictionaryTestCase.scala      | 9 +-
 .../util/ExternalColumnDictionaryTestCase.scala | 14 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala | 23 +-
 .../util/GlobalDictionaryUtilTestCase.scala     | 10 +-
 .../processing/csvreaderstep/CsvInput.java      | 73 +++-
 .../csvreaderstep/JavaRddIterator.java          | 32 ++
 .../processing/csvreaderstep/RddInputUtils.java | 11 +-
 17 files changed, 921 insertions(+), 157 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 87b5673..e306a89 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -28,9 +28,12 @@ import scala.util.Random
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.DataLoadCoalescedRDD
+import org.apache.spark.rdd.DataLoadPartitionWrap
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 import org.apache.spark.sql.Row
+import org.apache.spark.util.TaskContextUtil

 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
@@ -38,6 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
 import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
@@ -46,6 +50,7 @@ import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil

 /**
  * This partition class use to split by TableSplit
@@ -125,6 +130,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     try {
       CarbonLoaderUtil.executeGraph(model, storeLocation, storePath, kettleHomePath)
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
     } catch {
       case e: DataLoadingException => if (e.getErrorCode ==
           DataProcessorConstants.BAD_REC_FOUND) {
@@ -235,14 +241,11 @@ class DataFileLoaderRDD[K, V](
         theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         setModelAndBlocksInfo()
         val loader = new SparkPartitionLoade