[12/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
--
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
new file mode 100644
index 0000000..836a757
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.lcm.locks.ICarbonLock;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.spark.load.CarbonLoaderUtil;
+
+/**
+ * utility class for load merging.
+ */
+public final class CarbonDataMergerUtil {
+  private static final LogService LOGGER =
+  LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
+
+  private CarbonDataMergerUtil() {
+
+  }
+
+  /**
+   * Returns the size of all the carbondata files present in the segment.
+   * @param carbonFile
+   * @return
+   */
+  private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
+long factSize = 0;
+
+// carbon data file case.
+CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
+
+  @Override public boolean accept(CarbonFile file) {
+return CarbonTablePath.isCarbonDataFile(file.getName());
+  }
+});
+
+for (CarbonFile fact : factFile) {
+  factSize += fact.getSize();
+}
+
+return factSize;
+  }
+
+  /**
+   * To check whether the merge property is enabled or not.
+   *
+   * @return
+   */
+
+  public static boolean checkIfAutoLoadMergingRequired() {
+// load merge is not supported as per new store format
+// moving the load merge check earlier to avoid unnecessary load listing causing IOException
+// check whether carbon's segment merging operation is enabled or not.
+// default will be false.
+String isLoadMergeEnabled = CarbonProperties.getInstance()
+.getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
+  return false;
+}
+return true;
+  }
+
+  /**
+   * Form the Name of the New Merge Folder
+   *
+   * @param segmentsToBeMergedList
+   * @return
+   */
+  public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
+String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
+if (firstSegmentName.contains(".")) {
+  String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
+  String afterDecimal = firstSegme

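As an aside on the snippet above: checkIfAutoLoadMergingRequired gates auto compaction on a single boolean property. A standalone Scala sketch of the same decision, assuming a plain Map in place of CarbonProperties (the key and default strings below are assumed stand-ins for CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE and its default):

object AutoMergeCheck {
  // Assumed values; the real constants live in CarbonCommonConstants.
  val EnableAutoLoadMerge = "carbon.enable.auto.load.merge"
  val DefaultEnableAutoLoadMerge = "false"

  def checkIfAutoLoadMergingRequired(props: Map[String, String]): Boolean = {
    val isLoadMergeEnabled =
      props.getOrElse(EnableAutoLoadMerge, DefaultEnableAutoLoadMerge)
    // Anything other than "false" (case-insensitive) enables merging.
    !isLoadMergeEnabled.equalsIgnoreCase("false")
  }
}

// AutoMergeCheck.checkIfAutoLoadMergingRequired(Map.empty)                 => false
// AutoMergeCheck.checkIfAutoLoadMergingRequired(Map(
//   "carbon.enable.auto.load.merge" -> "TRUE"))                            => true
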
[04/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
deleted file mode 100644
index c91cec0..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.io.{DataInputStream, InputStreamReader}
-import java.nio.charset.Charset
-import java.text.SimpleDateFormat
-import java.util.regex.Pattern
-
-import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.control.Breaks.{break, breakable}
-
-import au.com.bytecode.opencsv.CSVReader
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
-
-/**
- * A partitioner that partitions by column.
- *
- * @constructor create a partitioner
- * @param numParts the number of partitions
- */
-class ColumnPartitioner(numParts: Int) extends Partitioner {
-  override def numPartitions: Int = numParts
-
-  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-}
-
-trait GenericParser {
-  val dimension: CarbonDimension
-
-  def addChild(child: GenericParser): Unit
-
-  def parseString(input: String): Unit
-}
-
-case class DictionaryStats(distinctValues: java.util.List[String],
-dictWriteTime: Long, sortIndexWriteTime: Long)
-
-case class PrimitiveParser(dimension: CarbonDimension,
-setOpt: Option[HashSet[String]]) extends GenericParser {
-  val (hasDictEncoding, set: HashSet[String]) = setOpt match {
-case None => (false, new HashSet[String])
-case Some(x) => (true, x)
-  }
-
-  def addChild(child: GenericParser): Unit = {
-  }
-
-  def parseString(input: String): Unit = {
-if (hasDictEncoding && input != null) {
-  set.add(input)
-}
-  }
-}
-
-case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser {
-  var children: GenericParser = _
-
-  def addChild(child: GenericParser): Unit = {
-children = child
-  }
-
-  def parseString(input: String): Unit = {
-if (StringUtils.isNotEmpty(input)) {
-  val splits = format.getSplits(input)
-  if (ArrayUtils.isNotEmpty(splits)) {
-splits.foreach { s =>
-  children.parseString(s)
-}
-  }
-}
-  }
-}
-
-case class StructParser(dimension: CarbonDimension,
-format: DataFormat) extends GenericParser {
-  val children = new ArrayBuffer[GenericParser]
-
-  def addChild(child: GenericParser): Unit = {
-children += child
-  }
-
-  def parseString(input: String): Unit = {
-if (StringUtils.isNotEmpty(input)) {
-  val splits = format.getSplits(input)
-  val len = Math.min(children.length, splits.length)
-  for (i <- 0 until len) {
-chi

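The parser hierarchy above collects the distinct values of dictionary-encoded dimensions while walking delimited input. A minimal self-contained sketch of the PrimitiveParser behaviour (CarbonDimension omitted, since only the value set matters here):

import scala.collection.mutable.HashSet

// Stand-in for the PrimitiveParser above, keeping only the set logic.
case class PrimitiveParserSketch(setOpt: Option[HashSet[String]]) {
  val (hasDictEncoding, set: HashSet[String]) = setOpt match {
    case None => (false, new HashSet[String])
    case Some(x) => (true, x)
  }
  def parseString(input: String): Unit =
    if (hasDictEncoding && input != null) set.add(input)
}

val distinct = new HashSet[String]
val parser = PrimitiveParserSketch(Some(distinct))   // Some(...) switches dictionary collection on
Seq("red", "green", "red", null).foreach(parser.parseString)
// distinct == HashSet("red", "green"): duplicates collapse, nulls are skipped.
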
[06/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
--
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
deleted file mode 100644
index 61639d3..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Partition extends Serializable {
-  /**
-   * unique identification for the partition in the cluster.
-   */
-  String getUniqueID();
-
-  /**
-   * File path for the raw data represented by this partition
-   */
-  String getFilePath();
-
-  /**
-   * result
-   *
-   * @return
-   */
-  List<String> getFilesPath();
-}
\ No newline at end of file

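For illustration, a hypothetical Scala implementation of the Partition interface above; the patch's real implementations are PartitionImpl and PartitionMultiFileImpl, listed in the commit summary:

import java.util.{List => JList}
import scala.collection.JavaConverters._

// Hypothetical single-file partition; assumes the Partition interface above is on the classpath.
class SingleFilePartition(id: String, path: String) extends Partition {
  override def getUniqueID: String = id
  override def getFilePath: String = path
  override def getFilesPath: JList[String] = List(path).asJava
}
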
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
--
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
deleted file mode 100644
index bc6e54f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DataPartitionerProperties {
-  private static final LogService LOGGER =
-  LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
-
-  private static DataPartitionerProperties instance;
-
-  private Properties properties;
-
-  private DataPartitionerProperties() {
-properties = loadProperties();
-  }
-
-  public static DataPartitionerProperties getInstance() {
-if (instance == null) {
-  instance = new DataPartitionerProperties();
-}
-return instance;
-  }
-
-  public String getValue(String key, String defaultVal) {
-return properties.getProperty(key, defaultVal);
-  }
-
-  public String getValue(String key) {
-return properties.getProperty(key);
-  }
-
-  /**
-   * Read the properties from DataPartitioner.properties
-   */
-  private Properties loadProperties() {
-Properties props = new Properties();
-
-File file = new File("DataPartitioner.properties");
-FileInputStream fis = null;
-try {
-  if (file.exists()) {
-fis = new FileInputStream(file);
-
-props.load(fis);
-  }
-} catch (Exception e) {
-  LOGGER.error(e, e.getMessage());
-} finally {
-  if (null != fis) {
-try {
-  fis.close();
-} 

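DataPartitionerProperties above is a lazily created singleton reading DataPartitioner.properties from the working directory. The same pattern in Scala, where lazy val also supplies the thread safety the unsynchronized getInstance lacks:

import java.io.{File, FileInputStream}
import java.util.Properties

object PartitionerProps {
  lazy val properties: Properties = {
    val props = new Properties
    val file = new File("DataPartitioner.properties")
    if (file.exists()) {
      val fis = new FileInputStream(file)
      try props.load(fis) finally fis.close()   // always release the stream
    }
    props
  }

  def value(key: String, default: String): String =
    properties.getProperty(key, default)
}
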
[01/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 567fa5131 -> d94b99f36


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
deleted file mode 100644
index e5264ca..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{CarbonContext, CarbonEnv}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.datastore.block.Distributable
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
-object DistributionUtil {
-  @transient
-  val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-
-  /*
-   * This method will return the list of executors in the cluster.
-   * For this we take the memory status of all nodes with getExecutorMemoryStatus
-   * and extract the keys. getExecutorMemoryStatus also returns the driver memory.
-   * In client mode the driver will run on the localhost.
-   * There can be executors spawned on the same driver node, so we remove the first
-   * occurrence of localhost when retrieving the executor list.
-   */
-  def getNodeList(sparkContext: SparkContext): Array[String] = {
-val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
-  kv._1.split(":")(0)
-}.toSeq
-val localhostIPs = getLocalhostIPs
-val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
-
-val nodelist: List[String] = withoutDriverIP(arr.toList)(selectedLocalIPList.contains(_))
-val masterMode = sparkContext.getConf.get("spark.master")
-if (nodelist.nonEmpty) {
-  // Specific for Yarn Mode
-  if ("yarn-cluster".equals(masterMode) || 
"yarn-client".equals(masterMode)) {
-val nodeNames = nodelist.map { x =>
-  val addr = InetAddress.getByName(x)
-  addr.getHostName
-}
-nodeNames.toArray
-  } else {
-// For Standalone cluster, node IPs will be returned.
-nodelist.toArray
-  }
-} else {
-  Seq(InetAddress.getLocalHost.getHostName).toArray
-}
-  }
-
-  private def getLocalhostIPs = {
-val iface = NetworkInterface.getNetworkInterfaces
-var addresses: List[InterfaceAddress] = List.empty
-while (iface.hasMoreElements) {
-  addresses = iface.nextElement().getInterfaceAddresses.asScala.toList ++ addresses
-}
-val inets = addresses.map(_.getAddress.getHostAddress)
-inets
-  }
-
-  /*
-   * This method will remove the first occurrence of any of the IPs mentioned in the predicate.
-   * Eg: l = List(Master,slave1,Master,slave2,slave3) is the list of nodes where the first Master
-   * is the Driver node.
-   * This method withoutDriverIP(l)(x => x == 'Master') will remove the first occurrence of Master.
-   * The resulting list contains List(slave1,Master,slave2,slave3)
-   */
-  def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = {
-xs match {
-  case x :: rest => if (p(x)) {
-rest
-  } else {
-x :: withoutDriverIP(rest)(p)
-  }
-  case _ => Nil
-}
-  }
-
-  /**
-   * Checks whether the number of existing executors is greater than the configured executors;
-   * if yes, returns the configured executors.
-   *
-   * @param blockList
-   * @param sparkContext
-   * @return
-   */
-  def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
-  sparkContext: SparkContext): Seq[String] = {
-val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
-ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
-  }
-
-  def ensureExecutorsByNumberAndGetNodeList(nodesOfData: In

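withoutDriverIP above removes only the first element matching the predicate, so executors co-located with the driver survive. Reproducing the doc comment's own example as a runnable sketch:

// Same logic as DistributionUtil.withoutDriverIP, inlined for demonstration.
def withoutDriverIP[A](xs: List[A])(p: A => Boolean): List[A] = xs match {
  case x :: rest => if (p(x)) rest else x :: withoutDriverIP(rest)(p)
  case _ => Nil
}

val nodes = List("Master", "slave1", "Master", "slave2", "slave3")
withoutDriverIP(nodes)(_ == "Master")
// => List(slave1, Master, slave2, slave3): only the first Master is removed.
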
[05/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
deleted file mode 100644
index cd629bf..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv
-
-import java.io.IOException
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import com.databricks.spark.csv.newapi.CarbonTextFile
-import com.databricks.spark.csv.util._
-import com.databricks.spark.sql.readers._
-import org.apache.commons.csv._
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
-import org.apache.spark.sql.types._
-import org.slf4j.LoggerFactory
-
-import org.apache.carbondata.processing.etl.DataLoadingException
-
-case class CarbonCsvRelation protected[spark] (
-location: String,
-useHeader: Boolean,
-delimiter: Char,
-quote: Char,
-escape: Character,
-comment: Character,
-parseMode: String,
-parserLib: String,
-ignoreLeadingWhiteSpace: Boolean,
-ignoreTrailingWhiteSpace: Boolean,
-userSchema: StructType = null,
-charset: String = TextFile.DEFAULT_CHARSET.name(),
-inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
-  extends BaseRelation with TableScan with InsertableRelation {
-
-  /**
-   * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
-   */
-  private val MAX_COMMENT_LINES_IN_HEADER = 10
-
-  private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
-
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-logger.warn(s"$parseMode is not a valid parse mode. Using 
${ParseModes.DEFAULT}.")
-  }
-
-  if ((ignoreLeadingWhiteSpace || ignoreTrailingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) {
-logger.warn(s"Ignore white space options may not work with Commons parserLib option")
-  }
-
-  private val failFast = ParseModes.isFailFastMode(parseMode)
-  private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  private val permissive = ParseModes.isPermissiveMode(parseMode)
-
-  val schema = inferSchema()
-
-  def tokenRdd(header: Array[String]): RDD[Array[String]] = {
-
-val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-
-if(ParserLibs.isUnivocityLib(parserLib)) {
-  univocityParseCSV(baseRDD, header)
-} else {
-  val csvFormat = CSVFormat.DEFAULT
-.withDelimiter(delimiter)
-.withQuote(quote)
-.withEscape(escape)
-.withSkipHeaderRecord(false)
-.withHeader(header: _*)
-.withCommentMarker(comment)
-
-  // If header is set, make sure firstLine is materialized before sending to executors.
-  val filterLine = if (useHeader) firstLine else null
-
-  baseRDD.mapPartitions { iter =>
-// When using header, any input line that equals firstLine is assumed to be header
-val csvIter = if (useHeader) {
-  iter.filter(_ != filterLine)
-} else {
-  iter
-}
-parseCSV(csvIter, csvFormat)
-  }
-}
-  }
-
-  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
-  def buildScan: RDD[Row] = {
-val schemaFields = schema.fields
-tokenRdd(schemaFields.map(_.name)).flatMap{ tokens =>
-
-  if (dropMalformed && schemaFields.length != tokens.size) {
-logger.warn(s"Dropping malformed line: $tokens")
-None
-  } else if (failFast && schemaFields.length != tokens.size) {
-throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens")
-  } else {
-v

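The header handling in tokenRdd above uses a simple trick: materialize the first line on the driver, then filter any identical line out of every partition so the header never reaches the CSV parser, wherever the input splits fall. In miniature:

val firstLine = "id,name,age"
val lines = Iterator("id,name,age", "1,alice,30", "2,bob,25")
val useHeader = true
// Drop every line equal to the header line, exactly as in tokenRdd.
val csvIter = if (useHeader) lines.filter(_ != firstLine) else lines
csvIter.toList  // => List("1,alice,30", "2,bob,25")
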
[10/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
--
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
new file mode 100644
index 0000000..1d8d6b2
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.SparkUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
+import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This partition class is used to split by TableSplit
+ *
+ */
+class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit,
+val blocksDetails: Array[BlockDetails])
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+  val partitionBlocksDetail = blocksDetails
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * This partition class is used to split by Host
+ *
+ */
+class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
+val blocksDetails: Array[BlockDetails])
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = host
+  val nodeBlocksDetail = blocksDetails
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+class SparkPartitionLoader(model: CarbonLoadModel,
+splitIndex: Int,
+storePath: String,
+kettleHomePath: String,
+loadCount: Int,
+loadMetadataDetails: LoadMetadataDetails) {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  var storeLocation: String = ""
+
+  def initialize(): Unit = {
+val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+if (null == carbonPropertiesFilePath) {
+  System.setProperty("carbon.properties.filepath",
+System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+}
+CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
+CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
+CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
+CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
+CarbonProperties.getInstance().addProperty("aggregate.columnar.

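The initialize method above first resolves where carbon.properties lives: an explicit carbon.properties.filepath system property wins, otherwise <user.dir>/conf/carbon.properties is assumed. That resolution step in isolation:

// Same fallback as SparkPartitionLoader.initialize, condensed to one expression.
val resolvedPath = Option(System.getProperty("carbon.properties.filepath"))
  .getOrElse(System.getProperty("user.dir") + "/conf/carbon.properties")
// e.g. /opt/app/conf/carbon.properties when the property is unset.
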
[07/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
--
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
deleted file mode 100644
index f2a1f9f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ /dev/null
@@ -1,976 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.CarbonUtilException;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.csvload.DataGraphExecuter;
-import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
-import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
-import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
-import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.spark.merger.NodeBlockRelation;
-import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
-
-import com.google.gson.Gson;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.Utils;
-
-
-public final 

[08/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
--
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 0000000..9360ad8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.SQLContext
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.CarbonSparkFactory
+import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
+
+case class TableModel(
+ifNotExistsSet: Boolean,
+var databaseName: String,
+databaseNameOp: Option[String],
+tableName: String,
+tableProperties: Map[String, String],
+dimCols: Seq[Field],
+msrCols: Seq[Field],
+highcardinalitydims: Option[Seq[String]],
+noInvertedIdxCols: Option[Seq[String]],
+columnGroups: Seq[String],
+colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
+
+case class Field(column: String, var dataType: Option[String], name: Option[String],
+children: Option[List[Field]], parent: String = null,
+storeType: Option[String] = Some("columnar"),
+var precision: Int = 0, var scale: Int = 0)
+
+case class ColumnProperty(key: String, value: String)
+
+case class ComplexField(complexType: String, primitiveField: Option[Field],
+complexField: Option[ComplexField])
+
+case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
+nodeList: Array[String])
+
+case class PartitionerField(partitionColumn: String, dataType: Option[String],
+columnComment: String)
+
+case class DataLoadTableFileMapping(table: String, loadPath: String)
+
+case class CarbonMergerMapping(storeLocation: String,
+storePath: String,
+metadataFilePath: String,
+mergedLoadName: String,
+kettleHomePath: String,
+tableCreationTime: Long,
+databaseName: String,
+factTableName: String,
+validSegments: Array[String],
+tableId: String,
+// maxSegmentColCardinality is Cardinality of last segment of compaction
+var maxSegmentColCardinality: Array[Int],
+// maxSegmentColumnSchemaList is list of column schema of last segment of compaction
+var maxSegmentColumnSchemaList: List[ColumnSchema])
+
+case class NodeInfo(TaskId: String, noOfBlocks: Int)
+
+case class AlterTableModel(dbName: Option[String], tableName: String,
+compactionType: String, alterSql: String)
+
+case class CompactionModel(compactionSize: Long,
+compactionType: CompactionType,
+carbonTable: CarbonTable,
+tableCreationTime: Long,
+isDDLTrigger: Boolean)
+
+case class CompactionCallableModel(storePath: String,
+carbonLoadModel: CarbonLoadModel,
+storeLocation: String,
+carbonTable: CarbonTable,
+kettleHomePath: String,
+cubeCreationTime: Long,
+loadsToMerge: util.List[LoadMetadataDetails],
+ 

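The case classes above are plain carriers for DDL and compaction metadata. As an example of the Field model, a hypothetical decimal column relying on the defaults for parent and storeType:

// Hypothetical column "price" stored as decimal(10,2), using the Field case class above.
val priceField = Field(
  column = "price",
  dataType = Some("decimal"),
  name = Some("price"),
  children = None,
  precision = 10,
  scale = 2)
// parent defaults to null and storeType to Some("columnar").
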
[02/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b7673db..9353a92 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -19,426 +19,37 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 import java.text.SimpleDateFormat
-import java.util
-import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.language.implicitConversions
-import scala.util.Random
 
-import org.apache.spark.SparkEnv
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
-import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, HiveContext}
+import org.apache.spark.sql.hive.CarbonMetastore
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
-import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
-import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
-ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.integration.spark.merger.CompactionType
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
-GlobalDictionaryUtil}
-
-case class tableModel(
-ifNotExistsSet: Boolean,
-var databaseName: String,
-databaseNameOp: Option[String],
-tableName: String,
-tableProperties: Map[String, String],
-dimCols: Seq[Field],
-msrCols: Seq[Field],
-highcardinalitydims: Option[Seq[String]],
-noInvertedIdxCols: Option[Seq[String]],
-partitioner: Option[Partitioner],
-columnGroups: Seq[String],
-colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
-
-case class Field(column: String, var dataType: Option[String], name: Option[String],
-children: Option[List[Field]], parent: String = null,
-storeType: Option[String] = Some("columnar"),
-var precision: Int = 0, var scale: Int = 0)
-
-case class ColumnProperty(key: String, value: String)
-
-case class ComplexField(complexType: String, primitiveField: Option[Field],
-complexField: Option[ComplexField])
-
-case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
-nodeList: Array[String])
-
-case class PartitionerField(partitionColumn: String, dataType: Option[String],
-columnComment: String)
-
-case class DataLoadTableFileMapping(table: String, loadPath: String)
-
-case class CarbonMergerMapping(storeLocation: String,
-storePath: String,
-metadataFilePath: String,
-mergedLoadName: String,
-kettleHomePath: String,
-tableCreationTime: Long,
-databaseName: String,
-  

[14/14] incubator-carbondata git commit: [CARBONDATA-463] Extract code to spark-common. This closes #365

2016-11-29 Thread ravipesala
[CARBONDATA-463] Extract code to spark-common. This closes #365


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d94b99f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d94b99f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d94b99f3

Branch: refs/heads/master
Commit: d94b99f366465b2426bbcd8277f80651f2408770
Parents: 567fa51 66ccd30
Author: ravipesala 
Authored: Wed Nov 30 13:21:06 2016 +0530
Committer: ravipesala 
Committed: Wed Nov 30 13:21:06 2016 +0530

--
 .../examples/GenerateDictionaryExample.scala|   2 +-
 integration/spark-common/pom.xml|   2 +-
 .../MalformedCarbonCommandException.java|  83 ++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 973 ++
 .../spark/load/DeleteLoadFolders.java   | 259 +
 .../spark/load/DeletedLoadMetadata.java |  53 +
 .../spark/merger/CarbonCompactionExecutor.java  | 233 +
 .../spark/merger/CarbonCompactionUtil.java  | 283 ++
 .../spark/merger/CarbonDataMergerUtil.java  | 695 +
 .../spark/merger/CompactionCallable.java|  44 +
 .../carbondata/spark/merger/CompactionType.java |  28 +
 .../spark/merger/NodeBlockRelation.java |  60 ++
 .../spark/merger/NodeMultiBlockRelation.java|  59 ++
 .../spark/merger/RowResultMerger.java   | 336 +++
 .../carbondata/spark/merger/TableMeta.java  |  42 +
 .../spark/merger/TupleConversionAdapter.java|  85 ++
 .../spark/partition/api/DataPartitioner.java|  54 +
 .../spark/partition/api/Partition.java  |  42 +
 .../api/impl/DataPartitionerProperties.java |  87 ++
 .../partition/api/impl/DefaultLoadBalancer.java |  69 ++
 .../spark/partition/api/impl/PartitionImpl.java |  54 +
 .../api/impl/PartitionMultiFileImpl.java|  51 +
 .../api/impl/QueryPartitionHelper.java  |  77 ++
 .../api/impl/SampleDataPartitionerImpl.java | 151 +++
 .../readsupport/SparkRowReadSupportImpl.java|  69 ++
 .../carbondata/spark/splits/TableSplit.java | 129 +++
 .../carbondata/spark/util/CarbonQueryUtil.java  | 142 +++
 .../carbondata/spark/util/LoadMetadataUtil.java |  61 ++
 .../spark/CarbonAliasDecoderRelation.scala  |  43 +
 .../spark/CarbonColumnValidator.scala   |  36 +
 .../apache/carbondata/spark/CarbonFilters.scala | 391 
 .../apache/carbondata/spark/CarbonOption.scala  |  48 +
 .../carbondata/spark/CarbonSparkFactory.scala   |  59 ++
 .../spark/DictionaryDetailHelper.scala  |  63 ++
 .../org/apache/carbondata/spark/KeyVal.scala|  89 ++
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 
 .../spark/csv/CarbonCsvRelation.scala   | 249 +
 .../carbondata/spark/csv/CarbonTextFile.scala   |  91 ++
 .../carbondata/spark/csv/DefaultSource.scala| 183 
 .../spark/rdd/CarbonCleanFilesRDD.scala |  82 ++
 .../spark/rdd/CarbonDataLoadRDD.scala   | 598 
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  91 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala |  84 ++
 .../spark/rdd/CarbonDropTableRDD.scala  |  71 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 557 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 342 +++
 .../spark/rdd/CarbonSparkPartition.scala|  35 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 130 +++
 .../spark/rdd/DataLoadCoalescedRDD.scala|  68 ++
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++
 .../spark/tasks/DictionaryWriterTask.scala  | 106 ++
 .../spark/tasks/SortIndexWriterTask.scala   |  59 ++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 195 
 .../carbondata/spark/util/CommonUtil.scala  | 259 +
 .../spark/util/DataTypeConverterUtil.scala  |  74 ++
 .../spark/util/GlobalDictionaryUtil.scala   | 843 
 .../CarbonTableIdentifierImplicit.scala |  42 +
 .../execution/command/carbonTableSchema.scala   | 359 +++
 .../spark/sql/hive/DistributionUtil.scala   | 167 
 .../CarbonDecoderOptimizerHelper.scala  | 149 +++
 .../scala/org/apache/spark/util/FileUtils.scala |  94 ++
 .../apache/spark/util/ScalaCompilerUtil.scala   |  35 +
 .../scala/org/apache/spark/util/SparkUtil.scala |  73 ++
 .../spark/merger/CarbonCompactionExecutor.java  | 233 -
 .../spark/merger/CarbonCompactionUtil.java  | 284 --
 .../spark/merger/CompactionCallable.java|  44 -
 .../spark/merger/CompactionType.java|  28 -
 .../spark/merger/RowResultMerger.java   | 336 ---
 .../spark/merger/TupleConversionAdapter.java|  85 --
 .../MalformedCarbonCommandException.java|  83 --
 .../carbondata/spark/load/CarbonLoaderUtil.java | 976 ---
 .../spark/load/DeleteLoadFolders.java   | 259 -
 .../spa

[09/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
--
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..af349a8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet}
+import scala.collection.mutable
+
+import org.apache.spark.Partition
+import org.apache.spark.scheduler.TaskLocation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * DataLoadPartitionCoalescer
+ * Repartition the partitions of rdd to few partitions, one partition per node.
+ * example:
+ * blk_hst  host1 host2 host3 host4 host5
+ * block1   host1 host2 host3
+ * block2 host2   host4 host5
+ * block3   host3 host4 host5
+ * block4   host1 host2   host4
+ * block5   host1   host3 host4
+ * block6   host1 host2 host5
+ * ---
+ * 1. sort host by number of blocks
+ * ---
+ * host3: block1 block3 block5
+ * host5: block2 block3 block6
+ * host1: block1 block4 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * ---
+ * 2. sort blocks of each host
+ * new partitions are before old partitions
+ * ---
+ * host3:  block1 block3 block5
+ * host5:block2 block6+block3
+ * host1: block4+block1 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * ---
+ * 3. assign blocks to host
+ * ---
+ * step1: host3 choose block1, remove from host1, host2
+ * step2: host5 choose block2, remove from host2, host4
+ * step3: host1 choose block4, .
+ * ---
+ * result:
+ * host3:  block1   block5
+ * host5:block2
+ * host1: block4
+ * host2:  block6
+ * host4:block3
+ */
+class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val prevPartitions = prev.partitions
+  var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+  // host => partition id list
+  val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
+  // partition id => host list
+  val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
+  val noLocalityPartitions = new ArrayBuffer[Int]
+  var noLocality = true
+  /**
+   * assign a task location for a partition
+   */
+  private def getLocation(index: Int): Option[String] = {
+if (index < nodeList.length) {
+  Some(nodeList(index))
+} else {
+  None
+}
+  }
+
+  /**
+   * collect partitions to each node
+   */
+  private def groupByNode(): Unit = {
+// initialize hostMapPartitionIds
+nodeList.foreach { node =>
+  val map = new LinkedHashSet[Int]
+  hostMapPartitionIds.put(node, map)
+}
+// collect partitions for each node
+val tmpNoLocalityPartitions = new ArrayBuffer[Int]
+prevPartitions.foreach { p =>
+  val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
+  if (locs.isEmpty) {
+// if a partition has no location, add to noLocalityPartitions
+tmpNoLocalityPartitions += p.index
+  } else {
+// add partition to hostMapPartitionIds and partitionIdMapHosts
+locs.foreach { loc =>
+  val host = loc

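The assignment walk-through in the class comment above boils down to: sort hosts by how few blocks they can serve locally, then let each host claim one still-unassigned local block per round. A simplified standalone sketch of that idea (an illustration, not the exact implementation):

def assignBlocks(hostBlocks: Map[String, Seq[Int]]): Map[String, Seq[Int]] = {
  val taken = scala.collection.mutable.Set[Int]()
  val out = scala.collection.mutable.Map[String, Seq[Int]]().withDefaultValue(Seq.empty)
  // Hosts with the fewest local blocks pick first, one block per round.
  val order = hostBlocks.toSeq.sortBy(_._2.size)
  var progress = true
  while (progress) {
    progress = false
    for ((host, blocks) <- order) {
      blocks.find(b => !taken(b)).foreach { b =>
        taken += b
        out(host) = out(host) :+ b
        progress = true
      }
    }
  }
  out.toMap
}

// Using the six blocks from the comment above:
// assignBlocks(Map("host1" -> Seq(1, 4, 5, 6), "host2" -> Seq(1, 2, 4, 6),
//   "host3" -> Seq(1, 3, 5), "host4" -> Seq(2, 3, 4, 5), "host5" -> Seq(2, 3, 6)))
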
[11/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
--
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
new file mode 100644
index 0000000..d2e716f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.Partition;
+import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
+import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
+import org.apache.carbondata.spark.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility parses the Carbon query plan to actual query model object.
+ */
+public final class CarbonQueryUtil {
+
+  private CarbonQueryUtil() {
+
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+  CarbonQueryPlan queryPlan) throws IOException {
+
+//Just create splits depends on locations of region servers
+List<Partition> allPartitions = null;
+if (queryPlan == null) {
+  allPartitions =
+  QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+} else {
+  allPartitions =
+  QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+}
+TableSplit[] splits = new TableSplit[allPartitions.size()];
+for (int i = 0; i < splits.length; i++) {
+  splits[i] = new TableSplit();
+  List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+  Partition partition = allPartitions.get(i);
+  String location = QueryPartitionHelper.getInstance()
+  .getLocation(partition, databaseName, tableName);
+  locations.add(location);
+  splits[i].setPartition(partition);
+  splits[i].setLocations(locations);
+}
+
+return splits;
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
+
+//Just create splits depends on locations of region servers
+DefaultLoadBalancer loadBalancer = null;
+List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+for (int i = 0; i < tblSplits.length; i++) {
+  tblSplits[i] = new TableSplit();
+  List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+  Partition partition = allPartitions.get(i);
+  String location = loadBalancer.getNodeForPartitions(partition);
+  locations.add(location);
+  tblSplits[i].setPartition(partition);
+  tblSplits[i].setLocations(locations);
+}
+return tblSplits;
+  }
+
+  /**
+   * Splits sourcePath on the given separator (typically a comma) and adds each
+   * resulting path to partitionsFiles.
+   */
+  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+      String separator) {
+    if (StringUtils.isNotEmpty(sourcePath)) {
+      String[] files = sourcePath.split(separator);
+      for (String file : files) {
+        partitionsFiles.add(file);
+      }
+    }
+  }
+
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
+    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    splitFilePath(sourcePath, files,

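Both split builders above follow the same pattern: build one TableSplit per Partition and attach a single preferred location to it. Below is a minimal, self-contained Java sketch of that loop; the Partition and TableSplit classes here are simplified stand-ins for the real CarbonData types, and the locate function stands in for the QueryPartitionHelper/DefaultLoadBalancer lookups.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SplitSketch {

  // Simplified stand-ins for the real Partition/TableSplit classes.
  static class Partition {
    final String id;
    Partition(String id) { this.id = id; }
  }

  static class TableSplit {
    Partition partition;
    List<String> locations;
    void setPartition(Partition p) { this.partition = p; }
    void setLocations(List<String> l) { this.locations = l; }
  }

  // One split per partition, each carrying the single node chosen for it;
  // 'locate' stands in for QueryPartitionHelper/DefaultLoadBalancer lookups.
  static TableSplit[] buildSplits(List<Partition> partitions, Function<Partition, String> locate) {
    TableSplit[] splits = new TableSplit[partitions.size()];
    for (int i = 0; i < splits.length; i++) {
      splits[i] = new TableSplit();
      Partition partition = partitions.get(i);
      List<String> locations = new ArrayList<>();
      locations.add(locate.apply(partition));
      splits[i].setPartition(partition);
      splits[i].setLocations(locations);
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Partition> parts = List.of(new Partition("p0"), new Partition("p1"));
    TableSplit[] splits = buildSplits(parts, p -> "node-for-" + p.id);
    System.out.println(splits.length + " splits created");  // prints: 2 splits created
  }
}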
[13/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
rebase

rebase

rename package

rebase

change package name

fix style

fix spark2


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/66ccd308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/66ccd308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/66ccd308

Branch: refs/heads/master
Commit: 66ccd308536d5e64f05fe57aa3a7a9003b4adf5a
Parents: 567fa51
Author: jackylk 
Authored: Tue Nov 29 17:42:06 2016 +0800
Committer: ravipesala 
Committed: Wed Nov 30 13:19:54 2016 +0530

--
 .../examples/GenerateDictionaryExample.scala|   2 +-
 integration/spark-common/pom.xml|   2 +-
 .../MalformedCarbonCommandException.java|  83 ++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 973 ++
 .../spark/load/DeleteLoadFolders.java   | 259 +
 .../spark/load/DeletedLoadMetadata.java |  53 +
 .../spark/merger/CarbonCompactionExecutor.java  | 233 +
 .../spark/merger/CarbonCompactionUtil.java  | 283 ++
 .../spark/merger/CarbonDataMergerUtil.java  | 695 +
 .../spark/merger/CompactionCallable.java|  44 +
 .../carbondata/spark/merger/CompactionType.java |  28 +
 .../spark/merger/NodeBlockRelation.java |  60 ++
 .../spark/merger/NodeMultiBlockRelation.java|  59 ++
 .../spark/merger/RowResultMerger.java   | 336 +++
 .../carbondata/spark/merger/TableMeta.java  |  42 +
 .../spark/merger/TupleConversionAdapter.java|  85 ++
 .../spark/partition/api/DataPartitioner.java|  54 +
 .../spark/partition/api/Partition.java  |  42 +
 .../api/impl/DataPartitionerProperties.java |  87 ++
 .../partition/api/impl/DefaultLoadBalancer.java |  69 ++
 .../spark/partition/api/impl/PartitionImpl.java |  54 +
 .../api/impl/PartitionMultiFileImpl.java|  51 +
 .../api/impl/QueryPartitionHelper.java  |  77 ++
 .../api/impl/SampleDataPartitionerImpl.java | 151 +++
 .../readsupport/SparkRowReadSupportImpl.java|  69 ++
 .../carbondata/spark/splits/TableSplit.java | 129 +++
 .../carbondata/spark/util/CarbonQueryUtil.java  | 142 +++
 .../carbondata/spark/util/LoadMetadataUtil.java |  61 ++
 .../spark/CarbonAliasDecoderRelation.scala  |  43 +
 .../spark/CarbonColumnValidator.scala   |  36 +
 .../apache/carbondata/spark/CarbonFilters.scala | 391 
 .../apache/carbondata/spark/CarbonOption.scala  |  48 +
 .../carbondata/spark/CarbonSparkFactory.scala   |  59 ++
 .../spark/DictionaryDetailHelper.scala  |  63 ++
 .../org/apache/carbondata/spark/KeyVal.scala|  89 ++
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 
 .../spark/csv/CarbonCsvRelation.scala   | 249 +
 .../carbondata/spark/csv/CarbonTextFile.scala   |  91 ++
 .../carbondata/spark/csv/DefaultSource.scala| 183 
 .../spark/rdd/CarbonCleanFilesRDD.scala |  82 ++
 .../spark/rdd/CarbonDataLoadRDD.scala   | 598 
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  91 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala |  84 ++
 .../spark/rdd/CarbonDropTableRDD.scala  |  71 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 557 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 342 +++
 .../spark/rdd/CarbonSparkPartition.scala|  35 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 130 +++
 .../spark/rdd/DataLoadCoalescedRDD.scala|  68 ++
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++
 .../spark/tasks/DictionaryWriterTask.scala  | 106 ++
 .../spark/tasks/SortIndexWriterTask.scala   |  59 ++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 195 
 .../carbondata/spark/util/CommonUtil.scala  | 259 +
 .../spark/util/DataTypeConverterUtil.scala  |  74 ++
 .../spark/util/GlobalDictionaryUtil.scala   | 843 
 .../CarbonTableIdentifierImplicit.scala |  42 +
 .../execution/command/carbonTableSchema.scala   | 359 +++
 .../spark/sql/hive/DistributionUtil.scala   | 167 
 .../CarbonDecoderOptimizerHelper.scala  | 149 +++
 .../scala/org/apache/spark/util/FileUtils.scala |  94 ++
 .../apache/spark/util/ScalaCompilerUtil.scala   |  35 +
 .../scala/org/apache/spark/util/SparkUtil.scala |  73 ++
 .../spark/merger/CarbonCompactionExecutor.java  | 233 -
 .../spark/merger/CarbonCompactionUtil.java  | 284 --
 .../spark/merger/CompactionCallable.java|  44 -
 .../spark/merger/CompactionType.java|  28 -
 .../spark/merger/RowResultMerger.java   | 336 ---
 .../spark/merger/TupleConversionAdapter.java|  85 --
 .../MalformedCarbonCommandException.java|  83 --
 .../carbondata/spark/load/CarbonLoaderUtil.java | 976 ---
 .../spark/load/DeleteLoadFolders.java   | 259 -

[03/14] incubator-carbondata git commit: rebase

2016-11-29 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
deleted file mode 100644
index 9a4e209..000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.io.{FileNotFoundException, IOException}
-import java.nio.charset.Charset
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.language.implicitConversions
-import scala.util.control.Breaks.{break, breakable}
-
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark.Accumulator
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.util.FileUtils
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.CarbonStorePath
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.reader.CarbonDictionaryReader
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.rdd._
-
-/**
- * An object which provides a method to generate a global dictionary from CSV files.
- */
-object GlobalDictionaryUtil {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * The default separator to use if none is supplied to the constructor.
-   */
-  val DEFAULT_SEPARATOR: Char = ','
-  /**
-   * The default quote character to use if none is supplied to the
-   * constructor.
-   */
-  val DEFAULT_QUOTE_CHARACTER: Char = '"'
-
-  /**
-   * find columns which need to generate a global dictionary.
-   *
-   * @param dimensions dimension list of schema
-   * @param headers    column headers
-   * @param columns    column list of csv file
-   */
-  def pruneDimensions(dimensions: Array[CarbonDimension],
-      headers: Array[String],
-      columns: Array[String]): (Array[CarbonDimension], Array[String]) = {
-    val dimensionBuffer = new ArrayBuffer[CarbonDimension]
-    val columnNameBuffer = new ArrayBuffer[String]
-    val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY,
-      Encoding.DIRECT_DICTIONARY))
-    dimensionsWithDict.foreach { dim =>
-      breakable {
-        headers.zipWithIndex.foreach { h =>
-          if (dim.getColName.equalsIgnoreCase(h._1)) {
-            dimensionBuffer += dim
-            columnNameBuffer += columns(h._2)
-            break
-          }
-        }
-      }
-    }
-    (dimensionBuffer.toArray, columnNameBuffer.toArray)
-  }
-
-  /**
-   * use this method to judge whether a CarbonDimension uses some encoding or not
-   *
-   * @param dimension       carbonDimension
-   * @param encoding        the coding way of dimension
-   * @param excludeEncoding the coding way to exclude
-   */
-  def hasE

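The deleted pruneDimensions above pairs each dictionary-encoded dimension with its CSV column via a case-insensitive header match, taking only the first matching header per dimension. A rough standalone Java sketch of that matching logic, with plain strings standing in for the real CarbonDimension objects:

import java.util.ArrayList;
import java.util.List;

public class PruneSketch {

  // Pairs each dictionary-encoded dimension name with the CSV column at the
  // index of its case-insensitively matching header; first match wins, like
  // the breakable/break in the Scala original.
  static List<String[]> pruneDimensions(List<String> dictDimensions,
      String[] headers, String[] csvColumns) {
    List<String[]> pairs = new ArrayList<>();
    for (String dim : dictDimensions) {
      for (int i = 0; i < headers.length; i++) {
        if (dim.equalsIgnoreCase(headers[i])) {
          pairs.add(new String[] { dim, csvColumns[i] });
          break;
        }
      }
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<String[]> pairs = pruneDimensions(
        List.of("country", "name"),
        new String[] { "ID", "Name", "Country" },
        new String[] { "id_col", "name_col", "country_col" });
    for (String[] p : pairs) {
      System.out.println(p[0] + " -> " + p[1]);  // country -> country_col, name -> name_col
    }
  }
}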
[2/2] incubator-carbondata git commit: [CARBONDATA-368] Improve performance of dataframe loading This closes #278

2016-11-29 Thread jackylk
[CARBONDATA-368] Improve performance of dataframe loading This closes #278


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/567fa513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/567fa513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/567fa513

Branch: refs/heads/master
Commit: 567fa5131628b70c8c4829368fda6d48cb013af3
Parents: 879bfe7 f8a0c87
Author: jackylk 
Authored: Tue Nov 29 17:15:20 2016 +0800
Committer: jackylk 
Committed: Tue Nov 29 17:15:20 2016 +0800

--
 .../spark/rdd/CarbonDataLoadRDD.scala   |  96 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala|  88 +++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  11 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  51 +++
 .../spark/util/GlobalDictionaryUtil.scala   |  11 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  68 
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++
 .../spark/sql/hive/DistributionUtil.scala   |  19 +-
 .../org/apache/spark/util/TaskContextUtil.scala |  29 ++
 .../TestDataLoadPartitionCoalescer.scala| 170 +
 .../spark/util/AllDictionaryTestCase.scala  |   9 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  14 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |  23 +-
 .../util/GlobalDictionaryUtilTestCase.scala |  10 +-
 .../processing/csvreaderstep/CsvInput.java  |  73 +++-
 .../csvreaderstep/JavaRddIterator.java  |  32 ++
 .../processing/csvreaderstep/RddInputUtils.java |  11 +-
 17 files changed, 921 insertions(+), 157 deletions(-)
--




[1/2] incubator-carbondata git commit: DataLoadCoalescedRDD

2016-11-29 Thread jackylk
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 879bfe742 -> 567fa5131


DataLoadCoalescedRDD

DataLoadPartitionCoalescer

concurrently read dataframe

add test case

fix comments

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f8a0c876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f8a0c876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f8a0c876

Branch: refs/heads/master
Commit: f8a0c876158be256119219bde4cce0e074acf03a
Parents: 879bfe7
Author: QiangCai 
Authored: Mon Oct 24 10:54:20 2016 +0800
Committer: jackylk 
Committed: Tue Nov 29 17:06:23 2016 +0800

--
 .../spark/rdd/CarbonDataLoadRDD.scala   |  96 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala|  88 +++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  11 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  51 +++
 .../spark/util/GlobalDictionaryUtil.scala   |  11 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  68 
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++
 .../spark/sql/hive/DistributionUtil.scala   |  19 +-
 .../org/apache/spark/util/TaskContextUtil.scala |  29 ++
 .../TestDataLoadPartitionCoalescer.scala| 170 +
 .../spark/util/AllDictionaryTestCase.scala  |   9 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  14 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |  23 +-
 .../util/GlobalDictionaryUtilTestCase.scala |  10 +-
 .../processing/csvreaderstep/CsvInput.java  |  73 +++-
 .../csvreaderstep/JavaRddIterator.java  |  32 ++
 .../processing/csvreaderstep/RddInputUtils.java |  11 +-
 17 files changed, 921 insertions(+), 157 deletions(-)
--
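The commit message only names the new classes. As a rough illustration of the locality-aware coalescing idea behind DataLoadPartitionCoalescer, the sketch below groups parent partitions by their preferred host, so each node ends up with one group of partitions local to it. This is a simplified model under the assumption of one preferred host per partition; the real coalescer also has to distribute partitions that carry no locality information across the nodes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoalesceSketch {

  // Groups partition indices by preferred host; each group would become one
  // coalesced partition whose task runs on that host and reads only local data.
  static Map<String, List<Integer>> coalesceByHost(String[] preferredHosts) {
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    for (int partition = 0; partition < preferredHosts.length; partition++) {
      groups.computeIfAbsent(preferredHosts[partition], h -> new ArrayList<>()).add(partition);
    }
    return groups;
  }

  public static void main(String[] args) {
    // partitions 0..4 and the host holding most of each partition's blocks
    String[] hosts = { "node1", "node2", "node1", "node3", "node2" };
    System.out.println(coalesceByHost(hosts));
    // prints: {node1=[0, 2], node2=[1, 4], node3=[3]}
  }
}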


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
--
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 87b5673..e306a89 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -28,9 +28,12 @@ import scala.util.Random
 
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.DataLoadCoalescedRDD
+import org.apache.spark.rdd.DataLoadPartitionWrap
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 import org.apache.spark.sql.Row
+import org.apache.spark.util.TaskContextUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
@@ -38,6 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
 import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
@@ -46,6 +50,7 @@ import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * This partition class use to split by TableSplit
@@ -125,6 +130,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     try {
       CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
         kettleHomePath)
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
     } catch {
       case e: DataLoadingException => if (e.getErrorCode ==
           DataProcessorConstants.BAD_REC_FOUND) {
@@ -235,14 +241,11 @@ class DataFileLoaderRDD[K, V](
theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         setModelAndBlocksInfo()
 val loader = new SparkPartitionLoade