http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala new file mode 100644 index 0000000..af349a8 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet} +import scala.collection.mutable + +import org.apache.spark.Partition +import org.apache.spark.scheduler.TaskLocation + +import org.apache.carbondata.common.logging.LogServiceFactory + +/** + * DataLoadPartitionCoalescer + * Repartitions the partitions of an RDD into fewer partitions, one partition per node. + * example: + * blk_hst host1 host2 host3 host4 host5 + * block1 host1 host2 host3 + * block2 host2 host4 host5 + * block3 host3 host4 host5 + * block4 host1 host2 host4 + * block5 host1 host3 host4 + * block6 host1 host2 host5 + * ------------------------------------------------------- + * 1. sort hosts by number of blocks + * ------------------------------------------------------- + * host3: block1 block3 block5 + * host5: block2 block3 block6 + * host1: block1 block4 block5 block6 + * host2: block1 block2 block4 block6 + * host4: block2 block3 block4 block5 + * ------------------------------------------------------- + * 2. sort the blocks of each host + * new partitions are before old partitions + * ------------------------------------------------------- + * host3: block1 block3 block5 + * host5: block2 block6+block3 + * host1: block4+block1 block5 block6 + * host2: block1 block2 block4 block6 + * host4: block2 block3 block4 block5 + * ------------------------------------------------------- + * 3. assign blocks to hosts + * ------------------------------------------------------- + * step1: host3 chooses block1 and removes it from host1, host2 + * step2: host5 chooses block2 and removes it from host2, host4 + * step3: host1 chooses block4, .....
+ * ------------------------------------------------------- + * result: + * host3: block1 block5 + * host5: block2 + * host1: block4 + * host2: block6 + * host4: block3 + */ +class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val prevPartitions = prev.partitions + var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length)) + // host => partition id list + val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]] + // partition id => host list + val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]] + val noLocalityPartitions = new ArrayBuffer[Int] + var noLocality = true + /** + * assign a task location for a partition + */ + private def getLocation(index: Int): Option[String] = { + if (index < nodeList.length) { + Some(nodeList(index)) + } else { + None + } + } + + /** + * collect partitions to each node + */ + private def groupByNode(): Unit = { + // initialize hostMapPartitionIds + nodeList.foreach { node => + val map = new LinkedHashSet[Int] + hostMapPartitionIds.put(node, map) + } + // collect partitions for each node + val tmpNoLocalityPartitions = new ArrayBuffer[Int] + prevPartitions.foreach { p => + val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p) + if (locs.isEmpty) { + // if a partition has no location, add to noLocalityPartitions + tmpNoLocalityPartitions += p.index + } else { + // add partion to hostMapPartitionIds and partitionIdMapHosts + locs.foreach { loc => + val host = loc.host + hostMapPartitionIds.get(host) match { + // if the location of the partition is not in node list, + // will add this partition to noLocalityPartitions + case None => tmpNoLocalityPartitions += p.index + case Some(ids) => + noLocality = false + ids += p.index + partitionIdMapHosts.get(p.index) match { + case None => + val hosts = new ArrayBuffer[String] + hosts += host + partitionIdMapHosts.put(p.index, hosts) + case Some(hosts) => + hosts += host + } + } + } + } + } + + // remove locality partition + tmpNoLocalityPartitions.distinct.foreach {index => + partitionIdMapHosts.get(index) match { + case None => noLocalityPartitions += index + case Some(_) => + } + } + } + + /** + * sort host and partitions + */ + private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = { + val oldPartitionIdSet = new HashSet[Int] + // sort host by number of partitions + hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc => + // order: newPartitionIds + oldPartitionIds + val sortedPartitionIdSet = new LinkedHashSet[Int] + var newPartitionIds = new ArrayBuffer[Int] + var oldPartitionIds = new ArrayBuffer[Int] + loc._2.foreach { p => + if (oldPartitionIdSet.contains(p)) { + oldPartitionIds += p + } else { + newPartitionIds += p + oldPartitionIdSet.add(p) + } + } + // sort and add new partitions + newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_)) + // sort and add old partitions + oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_)) + // update hostMapPartitionIds + hostMapPartitionIds.put(loc._1, sortedPartitionIdSet) + (loc._1, sortedPartitionIdSet) + }.toArray + } + + /** + * assign locality partition to each host + */ + private def assignPartitonNodeLocality( + noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = { + val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length) + for (i <- 0 until localityResult.length) { + localityResult(i) = new 
ArrayBuffer[Int] + } + val noEmptyHostSet = new HashSet[String] + noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)} + + var hostIndex = 0 + while (noEmptyHostSet.nonEmpty) { + val hostEntry = noEmptyHosts(hostIndex) + if (noEmptyHostSet.contains(hostEntry._1)) { + if (hostEntry._2.nonEmpty) { + var partitionId = hostEntry._2.iterator.next + localityResult(hostIndex) += partitionId + // remove from sortedParts + partitionIdMapHosts.get(partitionId) match { + case Some(locs) => + locs.foreach { loc => + hostMapPartitionIds.get(loc) match { + case Some(parts) => + parts.remove(partitionId) + } + } + } + } else { + noEmptyHostSet.remove(hostEntry._1) + } + } + + hostIndex = hostIndex + 1 + if (hostIndex == noEmptyHosts.length) { + hostIndex = 0 + } + } + localityResult + } + + /** + * assign no locality partitions to each host + */ + private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String], + noEmptyHosts: mutable.Buffer[String], + localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = { + val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length) + LOGGER.info(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}") + val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length) + for (i <- 0 until noLocalityResult.length) { + noLocalityResult(i) = new ArrayBuffer[Int] + } + var noLocalityPartitionIndex = 0 + if (noLocalityPartitions.nonEmpty) { + if (emptyHosts.nonEmpty) { + // at first, assign avg number to empty node + for (i <- 0 until avgNumber) { + noLocalityResult.foreach { partitionIds => + if (noLocalityPartitionIndex < noLocalityPartitions.length) { + partitionIds += noLocalityPartitions(noLocalityPartitionIndex) + noLocalityPartitionIndex = noLocalityPartitionIndex + 1 + } + } + } + } + // still have no locality partitions + // assign to all hosts + if (noLocalityPartitionIndex < noLocalityPartitions.length) { + var partIndex = 0 + for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) { + if (partIndex < localityResult.length) { + localityResult(partIndex) += noLocalityPartitions(i) + } else { + noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i) + } + partIndex = partIndex + 1 + if (partIndex == localityResult.length + noLocalityResult.length) { + partIndex = 0 + } + } + } + } + noLocalityResult + } + + /** + * no locality repartition + */ + private def repartitionNoLocality(): Array[Partition] = { + // no locality repartition + LOGGER.info("no locality partition") + val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts) + for (i <- 0 until numOfParts) { + prevPartIndexs(i) = new ArrayBuffer[Int] + } + for (i <- 0 until prevPartitions.length) { + prevPartIndexs(i % numOfParts) += prevPartitions(i).index + } + prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x => + new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2)) + } + } + + private def repartitionLocality(): Array[Partition] = { + LOGGER.info("locality partition") + val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq + // empty host seq + val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer + // non empty host array + var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty) + + // 1. 
do locality repartition + // sort host and partitions + tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts) + // assign locality partition to non empty hosts + val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts) + // collect non empty hosts and empty hosts + val noEmptyHosts = mutable.Buffer[String]() + val localityResult = mutable.Buffer[ArrayBuffer[Int]]() + for(index <- 0 until templocalityResult.size) { + if (templocalityResult(index).isEmpty) { + emptyHosts += tempNoEmptyHosts(index)._1 + } else { + noEmptyHosts += tempNoEmptyHosts(index)._1 + localityResult += templocalityResult(index) + } + } + // 2. do no locality repartition + // assign no locality partitions to all hosts + val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult) + + // 3. generate CoalescedRDDPartition + (0 until localityResult.length + noLocalityResult.length).map { index => + val ids = if (index < localityResult.length) { + localityResult(index).toArray + } else { + noLocalityResult(index - localityResult.length).toArray + } + val loc = if (index < localityResult.length) { + Some(noEmptyHosts(index)) + } else { + Some(emptyHosts(index - localityResult.length)) + } + LOGGER.info(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ") + new CoalescedRDDPartition(index, prev, ids, loc) + }.filter(_.parentsIndices.nonEmpty).toArray + + } + + def run(): Array[Partition] = { + // 1. group partitions by node + groupByNode() + LOGGER.info(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}") + val partitions = if (noLocality) { + // 2.A no locality partition + repartitionNoLocality() + } else { + // 2.B locality partition + repartitionLocality() + } + DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions) + partitions + } +} + +object DataLoadPartitionCoalescer { + def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = { + prev.context.getPreferredLocs(prev, p.index) + } + + def getParentsIndices(p: Partition): Array[Int] = { + p.asInstanceOf[CoalescedRDDPartition].parentsIndices + } + + def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = { + val prevPartIds = new ArrayBuffer[Int] + parts.foreach{ p => + prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p) + } + // all partitions must be arranged once. + assert(prevPartIds.size == prevParts.size) + val prevPartIdsMap = prevPartIds.map{ id => + (id, id) + }.toMap + prevParts.foreach{ p => + prevPartIdsMap.get(p.index) match { + case None => assert(false, "partition " + p.index + " not found") + case Some(_) => + } + } + } +}
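
For context, a minimal driver-side usage sketch (not part of the patch) of how this coalescer might be invoked; the names DataLoadCoalesceExample, inputRdd and activeNodes are hypothetical placeholders, while the constructor, run() and getParentsIndices calls follow the definitions above.

import org.apache.spark.Partition
import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}

object DataLoadCoalesceExample {
  // "inputRdd" is the parent RDD holding the blocks to load and "activeNodes" the
  // executor hosts that should each receive one coalesced partition (both hypothetical).
  // run() reads preferred locations via the SparkContext, so it must be called on the driver.
  def coalesceForLoad(inputRdd: RDD[_], activeNodes: Array[String]): Array[Partition] = {
    val coalesced = new DataLoadPartitionCoalescer(inputRdd, activeNodes).run()
    // each CoalescedRDDPartition records the parent partition indices assigned to its host
    coalesced.foreach { p =>
      val parents = DataLoadPartitionCoalescer.getParentsIndices(p)
      println(s"coalesced partition ${p.index} -> parent partitions [${parents.mkString(", ")}]")
    }
    coalesced
  }
}

Each returned partition keeps one preferred host, so a load task scheduled on it reads mostly node-local blocks; partitions with no locality information are spread evenly across the remaining hosts, as the code above describes.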
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala new file mode 100644 index 0000000..e23b58d --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.tasks + +import java.io.IOException + +import scala.collection.mutable + +import org.apache.carbondata.common.factory.CarbonCommonFactory +import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.DataTypeUtil +import org.apache.carbondata.core.writer.CarbonDictionaryWriter +import org.apache.carbondata.spark.rdd.DictionaryLoadModel + +/** + * + * @param valuesBuffer + * @param dictionary + * @param model + * @param columnIndex + * @param writer + */ +class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String], + dictionary: Dictionary, + model: DictionaryLoadModel, columnIndex: Int, + var writer: CarbonDictionaryWriter = null) { + + /** + * execute the task + * + * @return distinctValueList and time taken to write + */ + def execute(): java.util.List[String] = { + val values = valuesBuffer.toArray + java.util.Arrays.sort(values, Ordering[String]) + val dictService = CarbonCommonFactory.getDictionaryService + writer = dictService.getDictionaryWriter( + model.table, + model.columnIdentifier(columnIndex), + model.hdfsLocation) + val distinctValues: java.util.List[String] = new java.util.ArrayList() + + try { + if (!model.dictFileExists(columnIndex)) { + writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL) + distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL) + } + + if (values.length >= 1) { + if (model.dictFileExists(columnIndex)) { + for (value <- values) { + val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value, + model.primDimensions(columnIndex)) + if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) == + CarbonCommonConstants.INVALID_SURROGATE_KEY) { + writer.write(parsedValue) + distinctValues.add(parsedValue) + } + } + + } else { + for (value <- values) { + val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value, + model.primDimensions(columnIndex)) + if (null != parsedValue) { + writer.write(parsedValue) + distinctValues.add(parsedValue) + } + } 
+ } + } + } catch { + case ex: IOException => + throw ex + } finally { + if (null != writer) { + writer.close() + } + } + distinctValues + } + + /** + * update dictionary metadata + */ + def updateMetaData() { + if (null != writer) { + writer.commit() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala new file mode 100644 index 0000000..d552331 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.tasks + +import org.apache.carbondata.common.factory.CarbonCommonFactory +import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator} +import org.apache.carbondata.spark.rdd.DictionaryLoadModel + +/** + * This task writes sort index file + * + * @param model + * @param index + * @param dictionary + * @param distinctValues + * @param carbonDictionarySortIndexWriter + */ +class SortIndexWriterTask(model: DictionaryLoadModel, + index: Int, + dictionary: Dictionary, + distinctValues: java.util.List[String], + var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null) { + def execute() { + try { + if (distinctValues.size() > 0) { + val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator + val dictService = CarbonCommonFactory.getDictionaryService + val dictionarySortInfo: CarbonDictionarySortInfo = + preparator.getDictionarySortInfo(distinctValues, dictionary, + model.primDimensions(index).getDataType) + carbonDictionarySortIndexWriter = + dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index), + model.hdfsLocation) + carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex) + carbonDictionarySortIndexWriter + .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted) + } + } finally { + if (null != carbonDictionarySortIndexWriter) { + carbonDictionarySortIndexWriter.close() + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala new file mode 100644 index 0000000..dc63186 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.util + +import java.io.File +import java.text.SimpleDateFormat + +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonDataType} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastorage.store.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties + +object CarbonScalaUtil { + def convertSparkToCarbonDataType( + dataType: org.apache.spark.sql.types.DataType): CarbonDataType = { + dataType match { + case StringType => CarbonDataType.STRING + case ShortType => CarbonDataType.SHORT + case IntegerType => CarbonDataType.INT + case LongType => CarbonDataType.LONG + case DoubleType => CarbonDataType.DOUBLE + case FloatType => CarbonDataType.FLOAT + case DateType => CarbonDataType.DATE + case BooleanType => CarbonDataType.BOOLEAN + case TimestampType => CarbonDataType.TIMESTAMP + case ArrayType(_, _) => CarbonDataType.ARRAY + case StructType(_) => CarbonDataType.STRUCT + case NullType => CarbonDataType.NULL + case _ => CarbonDataType.DECIMAL + } + } + + def convertSparkToCarbonSchemaDataType(dataType: String): String = { + dataType match { + case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING + case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER + case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER + case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT + case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC + case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC + case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC + case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC + case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING + case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING + case CarbonCommonConstants.TIMESTAMP_TYPE => CarbonCommonConstants.TIMESTAMP + case anyType => anyType + } + } + + def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = { + dataType match { + case 
CarbonDataType.STRING => StringType + case CarbonDataType.SHORT => ShortType + case CarbonDataType.INT => IntegerType + case CarbonDataType.LONG => LongType + case CarbonDataType.DOUBLE => DoubleType + case CarbonDataType.BOOLEAN => BooleanType + case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT + case CarbonDataType.TIMESTAMP => TimestampType + } + } + + def updateDataType( + currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = { + currentDataType match { + case decimal: DecimalType => + val scale = currentDataType.asInstanceOf[DecimalType].scale + DecimalType(DecimalType.MAX_PRECISION, scale) + case _ => + currentDataType + } + } + + def getKettleHome(sqlContext: SQLContext): String = { + var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null) + if (null == kettleHomePath) { + kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home") + } + if (null == kettleHomePath) { + val carbonHome = System.getenv("CARBON_HOME") + if (null != carbonHome) { + kettleHomePath = carbonHome + "/processing/carbonplugins" + } + } + if (kettleHomePath != null) { + val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase() + // get spark master, if local, need to correct the kettle home + // e.g: --master local, the executor running in local machine + if (sparkMaster.startsWith("local")) { + val kettleHomeFileType = FileFactory.getFileType(kettleHomePath) + val kettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, kettleHomeFileType) + // check if carbon.kettle.home path is exists + if (!kettleHomeFile.exists()) { + // get the path of this class + // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon- + // xxx.jar!/org/carbondata/spark/rdd/ + var jarFilePath = this.getClass.getResource("").getPath + val endIndex = jarFilePath.indexOf(".jar!") + 4 + // get the jar file path + // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-*.jar + jarFilePath = jarFilePath.substring(0, endIndex) + val jarFileType = FileFactory.getFileType(jarFilePath) + val jarFile = FileFactory.getCarbonFile(jarFilePath, jarFileType) + // get the parent folder of the jar file + // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib + val carbonLibPath = jarFile.getParentFile.getPath + // find the kettle home under the previous folder + // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/cabonplugins + kettleHomePath = carbonLibPath + File.separator + CarbonCommonConstants.KETTLE_HOME_NAME + val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + logger.error(s"carbon.kettle.home path is not exists, reset it as $kettleHomePath") + val newKettleHomeFileType = FileFactory.getFileType(kettleHomePath) + val newKettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, newKettleHomeFileType) + // check if the found kettle home exists + if (!newKettleHomeFile.exists()) { + sys.error("Kettle home not found. 
Failed to reset carbon.kettle.home") + } + } + } + } else { + sys.error("carbon.kettle.home is not set") + } + kettleHomePath + } + + def getString(value: Any, + serializationNullFormat: String, + delimiterLevel1: String, + delimiterLevel2: String, + format: SimpleDateFormat, + level: Int = 1): String = { + if (value == null) { + serializationNullFormat + } else { + value match { + case s: String => s + case d: java.math.BigDecimal => d.toPlainString + case i: java.lang.Integer => i.toString + case d: java.lang.Double => d.toString + case t: java.sql.Timestamp => format format t + case d: java.sql.Date => format format d + case b: java.lang.Boolean => b.toString + case s: java.lang.Short => s.toString + case f: java.lang.Float => f.toString + case bs: Array[Byte] => new String(bs) + case s: scala.collection.Seq[Any] => + val delimiter = if (level == 1) { + delimiterLevel1 + } else { + delimiterLevel2 + } + val builder = new StringBuilder() + s.foreach { x => + builder.append(getString(x, serializationNullFormat, delimiterLevel1, + delimiterLevel2, format, level + 1)).append(delimiter) + } + builder.substring(0, builder.length - 1) + case m: scala.collection.Map[Any, Any] => + throw new Exception("Unsupported data type: Map") + case r: org.apache.spark.sql.Row => + val delimiter = if (level == 1) { + delimiterLevel1 + } else { + delimiterLevel2 + } + val builder = new StringBuilder() + for (i <- 0 until r.length) { + builder.append(getString(r(i), serializationNullFormat, delimiterLevel1, + delimiterLevel2, format, level + 1)).append(delimiter) + } + builder.substring(0, builder.length - 1) + case other => other.toString + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala new file mode 100644 index 0000000..1c9d774 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.util + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable.Map + +import org.apache.spark.sql.execution.command.ColumnProperty +import org.apache.spark.sql.execution.command.Field + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.lcm.status.SegmentStatusManager +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + +object CommonUtil { + def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String], + msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) { + val colGrpCols = colGroup.split(',').map(_.trim) + colGrpCols.foreach { x => + // if column is no dictionary + if (noDictionaryDims.contains(x)) { + throw new MalformedCarbonCommandException( + "Column group is not supported for no dictionary columns:" + x) + } else if (msrs.exists(msr => msr.column.equals(x))) { + // if column is measure + throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x) + } else if (foundIndExistingColGrp(x)) { + throw new MalformedCarbonCommandException("Column is available in other column group:" + x) + } else if (isComplex(x, dims)) { + throw new MalformedCarbonCommandException( + "Column group doesn't support Complex column:" + x) + } else if (isTimeStampColumn(x, dims)) { + throw new MalformedCarbonCommandException( + "Column group doesn't support Timestamp datatype:" + x) + }// if invalid column is + else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) { + // present + throw new MalformedCarbonCommandException( + "column in column group is not a valid column: " + x + ) + } + } + // check if given column is present in other groups + def foundIndExistingColGrp(colName: String): Boolean = { + retrievedColGrps.foreach { colGrp => + if (colGrp.split(",").contains(colName)) { + return true + } + } + false + } + + } + + + def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = { + dims.foreach { dim => + if (dim.column.equalsIgnoreCase(colName)) { + if (dim.dataType.isDefined && null != dim.dataType.get && + "timestamp".equalsIgnoreCase(dim.dataType.get)) { + return true + } + } + } + false + } + + def isComplex(colName: String, dims: Seq[Field]): Boolean = { + dims.foreach { x => + if (x.children.isDefined && null != x.children.get && x.children.get.nonEmpty) { + val children = x.children.get + if (x.column.equals(colName)) { + return true + } else { + children.foreach { child => + val fieldName = x.column + "." + child.column + if (fieldName.equalsIgnoreCase(colName)) { + return true + } + } + } + } + } + false + } + + def getColumnProperties(column: String, + tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = { + val fieldProps = new util.ArrayList[ColumnProperty]() + val columnPropertiesStartKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "." 
+ tableProperties.foreach { + case (key, value) => + if (key.startsWith(columnPropertiesStartKey)) { + fieldProps.add(ColumnProperty(key.substring(columnPropertiesStartKey.length(), + key.length()), value)) + } + } + if (fieldProps.isEmpty) { + None + } else { + Some(fieldProps) + } + } + + def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = { + val itr = tableProperties.keys + var isValid: Boolean = true + tableProperties.foreach { + case (key, value) => + if (!validateFields(key, fields)) { + isValid = false + throw new MalformedCarbonCommandException(s"Invalid table properties ${ key }") + } + } + isValid + } + + def validateFields(key: String, fields: Seq[Field]): Boolean = { + var isValid: Boolean = false + fields.foreach { field => + if (field.children.isDefined && field.children.get != null) { + field.children.foreach(fields => { + fields.foreach(complexfield => { + val column = if ("val" == complexfield.column) { + field.column + } else { + field.column + "." + complexfield.column + } + if (validateColumnProperty(key, column)) { + isValid = true + } + } + ) + } + ) + } else { + if (validateColumnProperty(key, field.column)) { + isValid = true + } + } + + } + isValid + } + + def validateColumnProperty(key: String, column: String): Boolean = { + if (!key.startsWith(CarbonCommonConstants.COLUMN_PROPERTIES)) { + return true + } + val columnPropertyKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "." + if (key.startsWith(columnPropertyKey)) { + true + } else { + false + } + } + + /** + * @param colGrps + * @param dims + * @return columns of column groups in schema order + */ + def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): Seq[String] = { + def sortByIndex(colGrp1: String, colGrp2: String) = { + val firstCol1 = colGrp1.split(",")(0) + val firstCol2 = colGrp2.split(",")(0) + val dimIndex1: Int = getDimIndex(firstCol1, dims) + val dimIndex2: Int = getDimIndex(firstCol2, dims) + dimIndex1 < dimIndex2 + } + val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex) + sortedColGroups + } + + /** + * @param colName + * @param dims + * @return return index for given column in dims + */ + def getDimIndex(colName: String, dims: Seq[Field]): Int = { + var index: Int = -1 + dims.zipWithIndex.foreach { h => + if (h._1.column.equalsIgnoreCase(colName)) { + index = h._2.toInt + } + } + index + } + + /** + * This method will validate the table block size specified by the user + * + * @param tableProperties + */ + def validateTableBlockSize(tableProperties: Map[String, String]): Unit = { + var tableBlockSize: Integer = 0 + if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) { + val blockSizeStr: String = + parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE)) + try { + tableBlockSize = Integer.parseInt(blockSizeStr) + } catch { + case e: NumberFormatException => + throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " + + s"$blockSizeStr, only int value from 1 MB to " + + s"2048 MB is supported.") + } + if (tableBlockSize < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL || + tableBlockSize > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) { + throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " + + s"$blockSizeStr, only int value from 1 MB to " + + s"2048 MB is supported.") + } + tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, blockSizeStr) + } + } + + /** + * This method will parse the configure string from 
'XX MB/M' to 'XX' + * + * @param propertyValueString + */ + def parsePropertyValueStringInMB(propertyValueString: String): String = { + var parsedPropertyValueString: String = propertyValueString + if (propertyValueString.trim.toLowerCase.endsWith("mb")) { + parsedPropertyValueString = propertyValueString.trim.toLowerCase + .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("mb")).trim + } + if (propertyValueString.trim.toLowerCase.endsWith("m")) { + parsedPropertyValueString = propertyValueString.trim.toLowerCase + .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("m")).trim + } + parsedPropertyValueString + } + + def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = { + val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath + val details = SegmentStatusManager.readLoadMetadata(metadataPath) + model.setLoadMetadataDetails(details.toList.asJava) + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala new file mode 100644 index 0000000..5ec96df --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.util + +import org.apache.carbondata.core.carbon.metadata.datatype.DataType + +object DataTypeConverterUtil { + def convertToCarbonType(dataType: String): DataType = { + dataType.toLowerCase match { + case "string" => DataType.STRING + case "int" => DataType.INT + case "integer" => DataType.INT + case "tinyint" => DataType.SHORT + case "short" => DataType.SHORT + case "long" => DataType.LONG + case "bigint" => DataType.LONG + case "numeric" => DataType.DOUBLE + case "double" => DataType.DOUBLE + case "decimal" => DataType.DECIMAL + case "timestamp" => DataType.TIMESTAMP + case "array" => DataType.ARRAY + case "struct" => DataType.STRUCT + case _ => convertToCarbonTypeForSpark2(dataType) + } + } + + def convertToCarbonTypeForSpark2(dataType: String): DataType = { + dataType.toLowerCase match { + case "stringtype" => DataType.STRING + case "inttype" => DataType.INT + case "integertype" => DataType.INT + case "tinyinttype" => DataType.SHORT + case "shorttype" => DataType.SHORT + case "longtype" => DataType.LONG + case "biginttype" => DataType.LONG + case "numerictype" => DataType.DOUBLE + case "doubletype" => DataType.DOUBLE + case "decimaltype" => DataType.DECIMAL + case "timestamptype" => DataType.TIMESTAMP + case "arraytype" => DataType.ARRAY + case "structtype" => DataType.STRUCT + case _ => sys.error(s"Unsupported data type: $dataType") + } + } + + def convertToString(dataType: DataType): String = { + dataType match { + case DataType.STRING => "string" + case DataType.SHORT => "smallint" + case DataType.INT => "int" + case DataType.LONG => "bigint" + case DataType.DOUBLE => "double" + case DataType.DECIMAL => "decimal" + case DataType.TIMESTAMP => "timestamp" + case DataType.ARRAY => "array" + case DataType.STRUCT => "struct" + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala new file mode 100644 index 0000000..e650bfe --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -0,0 +1,843 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.util + +import java.io.{FileNotFoundException, IOException} +import java.nio.charset.Charset +import java.util.regex.Pattern + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.language.implicitConversions +import scala.util.control.Breaks.{break, breakable} + +import org.apache.commons.lang3.{ArrayUtils, StringUtils} +import org.apache.spark.Accumulator +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.util.FileUtils + +import org.apache.carbondata.common.factory.CarbonCommonFactory +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.carbon.metadata.datatype.DataType +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension +import org.apache.carbondata.core.carbon.path.CarbonStorePath +import org.apache.carbondata.core.carbon.CarbonTableIdentifier +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastorage.store.impl.FileFactory +import org.apache.carbondata.core.reader.CarbonDictionaryReader +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.writer.CarbonDictionaryWriter +import org.apache.carbondata.processing.etl.DataLoadingException +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.spark.CarbonSparkFactory +import org.apache.carbondata.spark.load.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd._ + +/** + * A object which provide a method to generate global dictionary from CSV files. + */ +object GlobalDictionaryUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * The default separator to use if none is supplied to the constructor. + */ + val DEFAULT_SEPARATOR: Char = ',' + /** + * The default quote character to use if none is supplied to the + * constructor. + */ + val DEFAULT_QUOTE_CHARACTER: Char = '"' + + /** + * find columns which need to generate global dictionary. 
+ * + * @param dimensions dimension list of schema + * @param headers column headers + * @param columns column list of csv file + */ + def pruneDimensions(dimensions: Array[CarbonDimension], + headers: Array[String], + columns: Array[String]): (Array[CarbonDimension], Array[String]) = { + val dimensionBuffer = new ArrayBuffer[CarbonDimension] + val columnNameBuffer = new ArrayBuffer[String] + val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY, + Encoding.DIRECT_DICTIONARY)) + dimensionsWithDict.foreach { dim => + breakable { + headers.zipWithIndex.foreach { h => + if (dim.getColName.equalsIgnoreCase(h._1)) { + dimensionBuffer += dim + columnNameBuffer += columns(h._2) + break + } + } + } + } + (dimensionBuffer.toArray, columnNameBuffer.toArray) + } + + /** + * use this method to judge whether CarbonDimension use some encoding or not + * + * @param dimension carbonDimension + * @param encoding the coding way of dimension + * @param excludeEncoding the coding way to exclude + */ + def hasEncoding(dimension: CarbonDimension, + encoding: Encoding, + excludeEncoding: Encoding): Boolean = { + if (dimension.isComplex()) { + val children = dimension.getListOfChildDimensions + children.asScala.exists(hasEncoding(_, encoding, excludeEncoding)) + } else { + dimension.hasEncoding(encoding) && + (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding)) + } + } + + def gatherDimensionByEncoding(carbonLoadModel: CarbonLoadModel, + dimension: CarbonDimension, + encoding: Encoding, + excludeEncoding: Encoding, + dimensionsWithEncoding: ArrayBuffer[CarbonDimension], + forPreDefDict: Boolean) { + if (dimension.isComplex) { + val children = dimension.getListOfChildDimensions.asScala + children.foreach { c => + gatherDimensionByEncoding(carbonLoadModel, c, encoding, excludeEncoding, + dimensionsWithEncoding, forPreDefDict) + } + } else { + if (dimension.hasEncoding(encoding) && + (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) { + if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) || + (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) { + dimensionsWithEncoding += dimension + } + } + } + } + + def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel, + dimension: CarbonDimension, + forPreDefDict: Boolean): Array[CarbonDimension] = { + val dimensionsWithDict = new ArrayBuffer[CarbonDimension] + gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY, + Encoding.DIRECT_DICTIONARY, + dimensionsWithDict, forPreDefDict) + dimensionsWithDict.toArray + } + + /** + * invoke CarbonDictionaryWriter to write dictionary to file. 
+ * + * @param model instance of DictionaryLoadModel + * @param columnIndex the index of current column in column list + * @param iter distinct value list of dictionary + */ + def writeGlobalDictionaryToFile(model: DictionaryLoadModel, + columnIndex: Int, + iter: Iterator[String]): Unit = { + val dictService = CarbonCommonFactory.getDictionaryService + val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter( + model.table, + model.columnIdentifier(columnIndex), + model.hdfsLocation + ) + try { + while (iter.hasNext) { + writer.write(iter.next) + } + } finally { + writer.close() + } + } + + /** + * read global dictionary from cache + */ + def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = { + val dictMap = new HashMap[String, Dictionary] + model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m => + val dict = CarbonLoaderUtil.getDictionary(model.table, + m._1.getColumnIdentifier, model.hdfsLocation, + m._1.getDataType + ) + dictMap.put(m._1.getColumnId, dict) + } + dictMap + } + + /** + * invoke CarbonDictionaryReader to read dictionary from files. + * + * @param model carbon dictionary load model + */ + def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = { + val dictMap = new HashMap[String, HashSet[String]] + val dictService = CarbonCommonFactory.getDictionaryService + for (i <- model.primDimensions.indices) { + val set = new HashSet[String] + if (model.dictFileExists(i)) { + val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table, + model.columnIdentifier(i), model.hdfsLocation + ) + val values = reader.read + if (values != null) { + for (j <- 0 until values.size) { + set.add(new String(values.get(j), + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))) + } + } + } + dictMap.put(model.primDimensions(i).getColumnId, set) + } + dictMap + } + + def generateParserForChildrenDimension(dim: CarbonDimension, + format: DataFormat, + mapColumnValuesWithId: + HashMap[String, HashSet[String]], + generic: GenericParser): Unit = { + val children = dim.getListOfChildDimensions.asScala + for (i <- children.indices) { + generateParserForDimension(Some(children(i)), format.cloneAndIncreaseIndex, + mapColumnValuesWithId) match { + case Some(childDim) => + generic.addChild(childDim) + case None => + } + } + } + + def generateParserForDimension(dimension: Option[CarbonDimension], + format: DataFormat, + mapColumnValuesWithId: HashMap[String, HashSet[String]]): Option[GenericParser] = { + dimension match { + case None => + None + case Some(dim) => + dim.getDataType match { + case DataType.ARRAY => + val arrDim = ArrayParser(dim, format) + generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, arrDim) + Some(arrDim) + case DataType.STRUCT => + val stuDim = StructParser(dim, format) + generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, stuDim) + Some(stuDim) + case _ => + Some(PrimitiveParser(dim, mapColumnValuesWithId.get(dim.getColumnId))) + } + } + } + + def createDataFormat(delimiters: Array[String]): DataFormat = { + if (ArrayUtils.isNotEmpty(delimiters)) { + val patterns = delimiters.map { d => + Pattern.compile(if (d == null) { + "" + } else { + d + }) + } + DataFormat(delimiters, 0, patterns) + } else { + null + } + } + + def isHighCardinalityColumn(columnCardinality: Int, + rowCount: Long, + model: DictionaryLoadModel): Boolean = { + (columnCardinality > model.highCardThreshold) && + (rowCount > 0) && + 
(columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage) + } + + /** + * create a instance of DictionaryLoadModel + * + * @param carbonLoadModel carbon load model + * @param table CarbonTableIdentifier + * @param dimensions column list + * @param hdfsLocation store location in HDFS + * @param dictfolderPath path of dictionary folder + */ + def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel, + table: CarbonTableIdentifier, + dimensions: Array[CarbonDimension], + hdfsLocation: String, + dictfolderPath: String, + forPreDefDict: Boolean): DictionaryLoadModel = { + val primDimensionsBuffer = new ArrayBuffer[CarbonDimension] + val isComplexes = new ArrayBuffer[Boolean] + for (i <- dimensions.indices) { + val dims = getPrimDimensionWithDict(carbonLoadModel, dimensions(i), forPreDefDict) + for (j <- dims.indices) { + primDimensionsBuffer += dims(j) + isComplexes += dimensions(i).isComplex + } + } + val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table) + val primDimensions = primDimensionsBuffer.map { x => x }.toArray + val dictDetail = CarbonSparkFactory.getDictionaryDetailService(). + getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation) + val dictFilePaths = dictDetail.dictFilePaths + val dictFileExists = dictDetail.dictFileExists + val columnIdentifier = dictDetail.columnIdentifiers + val hdfsTempLocation = CarbonProperties.getInstance. + getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir")) + val lockType = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS) + val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL) + // load high cardinality identify configure + val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE, + CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean + val highCardThreshold = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD, + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt + val rowCountPercentage = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE, + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble + + val serializationNullFormat = + carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) + // get load count + if (null == carbonLoadModel.getLoadMetadataDetails) { + CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation) + } + new DictionaryLoadModel(table, + dimensions, + hdfsLocation, + dictfolderPath, + dictFilePaths, + dictFileExists, + isComplexes.toArray, + primDimensions, + carbonLoadModel.getDelimiters, + highCardIdentifyEnable, + highCardThreshold, + rowCountPercentage, + columnIdentifier, + carbonLoadModel.getLoadMetadataDetails.size() == 0, + hdfsTempLocation, + lockType, + zookeeperUrl, + serializationNullFormat) + } + + /** + * load CSV files to DataFrame by using datasource "com.databricks.spark.csv" + * + * @param sqlContext SQLContext + * @param carbonLoadModel carbon data load model + */ + def loadDataFrame(sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel): DataFrame = { + val df = sqlContext.read + .format("com.databricks.spark.csv.newapi") + .option("header", { + if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) { + "true" + } else { + 
"false" + } + }) + .option("delimiter", { + if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) { + "" + DEFAULT_SEPARATOR + } else { + carbonLoadModel.getCsvDelimiter + } + }) + .option("parserLib", "univocity") + .option("escape", carbonLoadModel.getEscapeChar) + .option("ignoreLeadingWhiteSpace", "false") + .option("ignoreTrailingWhiteSpace", "false") + .option("codec", "gzip") + .option("quote", { + if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) { + "" + DEFAULT_QUOTE_CHARACTER + } else { + carbonLoadModel.getQuoteChar + } + }) + .option("comment", carbonLoadModel.getCommentChar) + .load(carbonLoadModel.getFactFilePath) + df + } + + // Hack for spark2 integration + var updateTableMetadataFunc: (CarbonLoadModel, SQLContext, DictionaryLoadModel, + Array[CarbonDimension]) => Unit = _ + + /** + * check whether global dictionary have been generated successfully or not + * + * @param status checking whether the generating is successful + */ + private def checkStatus(carbonLoadModel: CarbonLoadModel, + sqlContext: SQLContext, + model: DictionaryLoadModel, + status: Array[(Int, String, Boolean)]) = { + var result = false + val noDictionaryColumns = new ArrayBuffer[CarbonDimension] + val tableName = model.table.getTableName + status.foreach { x => + val columnName = model.primDimensions(x._1).getColName + if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) { + result = true + LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed") + } + if (x._3) { + noDictionaryColumns += model.primDimensions(x._1) + } + } + if (noDictionaryColumns.nonEmpty) { + updateTableMetadataFunc(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray) + } + if (result) { + LOGGER.error("generate global dictionary files failed") + throw new Exception("Failed to generate global dictionary files") + } else { + LOGGER.info("generate global dictionary successfully") + } + } + + /** + * get external columns and whose dictionary file path + * + * @param colDictFilePath external column dict file path + * @param table table identifier + * @param dimensions dimension columns + */ + private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel, + colDictFilePath: String, + table: CarbonTableIdentifier, + dimensions: Array[CarbonDimension]) = { + val colFileMapArray = colDictFilePath.split(",") + for (colPathMap <- colFileMapArray) { + val colPathMapTrim = colPathMap.trim + val colNameWithPath = colPathMapTrim.split(":") + if (colNameWithPath.length == 1) { + LOGGER.error("the format of external column dictionary should be " + + "columnName:columnPath, please check") + throw new DataLoadingException("the format of predefined column dictionary" + + " should be columnName:columnPath, please check") + } + setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0), + FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1))) + } + } + + /** + * set pre defined dictionary for dimension + * + * @param dimensions all the dimensions + * @param table carbon table identifier + * @param colName user specified column name for predefined dict + * @param colDictPath column dictionary file path + * @param parentDimName parent dimenion for complex type + */ + def setPredefineDict(carbonLoadModel: CarbonLoadModel, + dimensions: Array[CarbonDimension], + table: CarbonTableIdentifier, + colName: String, + colDictPath: String, + parentDimName: String = "") { + val middleDimName = colName.split("\\.")(0) + val dimParent = parentDimName + { + 
+  /**
+   * set predefined dictionary for dimension
+   *
+   * @param dimensions all the dimensions
+   * @param table carbon table identifier
+   * @param colName user specified column name for predefined dict
+   * @param colDictPath column dictionary file path
+   * @param parentDimName parent dimension for complex type
+   */
+  def setPredefineDict(carbonLoadModel: CarbonLoadModel,
+      dimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier,
+      colName: String,
+      colDictPath: String,
+      parentDimName: String = "") {
+    val middleDimName = colName.split("\\.")(0)
+    val dimParent = parentDimName + {
+      colName match {
+        case "" => colName
+        case _ =>
+          if (parentDimName.isEmpty) middleDimName else "." + middleDimName
+      }
+    }
+    // check whether the column exists
+    val preDictDimensionOption = dimensions.filter(
+      _.getColName.equalsIgnoreCase(dimParent))
+    if (preDictDimensionOption.length == 0) {
+      LOGGER.error(s"Column $dimParent is not a key column " +
+        s"in ${ table.getDatabaseName }.${ table.getTableName }")
+      throw new DataLoadingException(s"Column $dimParent is not a key column. " +
+        s"Only key column can be part of dictionary " +
+        s"and used in COLUMNDICT option.")
+    }
+    val preDictDimension = preDictDimensionOption(0)
+    if (preDictDimension.isComplex) {
+      val children = preDictDimension.getListOfChildDimensions.asScala.toArray
+      // for Array, the user sets ArrayField: path, while ArrayField has a child Array.val
+      val currentColName = {
+        preDictDimension.getDataType match {
+          case DataType.ARRAY =>
+            if (children(0).isComplex) {
+              "val." + colName.substring(middleDimName.length + 1)
+            } else {
+              "val"
+            }
+          case _ => colName.substring(middleDimName.length + 1)
+        }
+      }
+      setPredefineDict(carbonLoadModel, children, table, currentColName,
+        colDictPath, dimParent)
+    } else {
+      carbonLoadModel.setPredefDictMap(preDictDimension, colDictPath)
+    }
+  }
+
+  /**
+   * use external dimension column to generate global dictionary
+   *
+   * @param colDictFilePath external column dict file path
+   * @param table table identifier
+   * @param dimensions dimension column
+   * @param carbonLoadModel carbon load model
+   * @param sqlContext spark sql context
+   * @param hdfsLocation store location on hdfs
+   * @param dictFolderPath generated global dict file path
+   */
+  private def generatePredefinedColDictionary(colDictFilePath: String,
+      table: CarbonTableIdentifier,
+      dimensions: Array[CarbonDimension],
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      hdfsLocation: String,
+      dictFolderPath: String) = {
+    // set predefined dictionary column
+    setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
+    val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
+      hdfsLocation, dictFolderPath, forPreDefDict = true)
+    // new RDD to achieve distributed column dict generation
+    val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
+      sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
+      .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
+    val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
+    // check result status
+    checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
+  }
+
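The predefined-dictionary path above, like the other dictionary pipelines in this file, shuffles values so that each dictionary column ends up in its own partition (ColumnPartitioner over primDimensions.length) before one task per column writes its dictionary. As a rough, generic illustration of that shuffle shape only (this is not the Carbon implementation; HashPartitioner merely stands in for ColumnPartitioner):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    // values keyed by dictionary-column index (0 until numDictColumns);
    // after partitionBy, all values of one column sit in one partition,
    // so a single task can build and write that column's dictionary
    def groupValuesByColumn(values: RDD[(Int, String)],
        numDictColumns: Int): RDD[(Int, Iterable[String])] = {
      values
        .partitionBy(new HashPartitioner(numDictColumns))
        .groupByKey()
    }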
+  /**
+   * generate dimension parsers
+   *
+   * @param model dictionary load model
+   * @param distinctValuesList distinct values collected for each primitive dimension
+   * @return dimensionParsers
+   */
+  def createDimensionParsers(model: DictionaryLoadModel,
+      distinctValuesList: ArrayBuffer[(Int, HashSet[String])]): Array[GenericParser] = {
+    // local combine set
+    val dimNum = model.dimensions.length
+    val primDimNum = model.primDimensions.length
+    val columnValues = new Array[HashSet[String]](primDimNum)
+    val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
+    for (i <- 0 until primDimNum) {
+      columnValues(i) = new HashSet[String]
+      distinctValuesList += ((i, columnValues(i)))
+      mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
+    }
+    val dimensionParsers = new Array[GenericParser](dimNum)
+    for (j <- 0 until dimNum) {
+      dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
+        Some(model.dimensions(j)),
+        GlobalDictionaryUtil.createDataFormat(model.delimiters),
+        mapColumnValuesWithId).get
+    }
+    dimensionParsers
+  }
+
+  /**
+   * parse records in dictionary file and validate record
+   *
+   * @param x a dictionary file record
+   * @param accum accumulator counting bad records
+   * @param csvFileColumns the csv file header columns
+   */
+  private def parseRecord(x: String, accum: Accumulator[Int],
+      csvFileColumns: Array[String]): (String, String) = {
+    val tokens = x.split("" + DEFAULT_SEPARATOR)
+    var columnName: String = ""
+    var value: String = ""
+    // bad records such as "," or "" are counted via the accumulator
+    if (tokens.isEmpty) {
+      LOGGER.error("Read a bad dictionary record: " + x)
+      accum += 1
+    } else if (tokens.size == 1) {
+      // a single token such as "1" or "jone" is valid only if the raw record contains a comma
+      if (!x.contains(",")) {
+        accum += 1
+      } else {
+        try {
+          columnName = csvFileColumns(tokens(0).toInt)
+        } catch {
+          case ex: Exception =>
+            LOGGER.error("Read a bad dictionary record: " + x)
+            accum += 1
+        }
+      }
+    } else {
+      try {
+        columnName = csvFileColumns(tokens(0).toInt)
+        value = tokens(1)
+      } catch {
+        case ex: Exception =>
+          LOGGER.error("Read a bad dictionary record: " + x)
+          accum += 1
+      }
+    }
+    (columnName, value)
+  }
+
+  /**
+   * read all local dictionary files and prune to the required columns
+   *
+   * @param sqlContext spark sql context
+   * @param csvFileColumns the csv file header columns
+   * @param requireColumns columns that need a global dictionary
+   * @param allDictionaryPath path of the pre-built dictionary files
+   * @param accumulator accumulator counting bad records
+   * @return allDictionaryRdd
+   */
+  private def readAllDictionaryFiles(sqlContext: SQLContext,
+      csvFileColumns: Array[String],
+      requireColumns: Array[String],
+      allDictionaryPath: String,
+      accumulator: Accumulator[Int]) = {
+    var allDictionaryRdd: RDD[(String, Iterable[String])] = null
+    try {
+      // read local dictionary file, and split into (columnIndex, columnValue)
+      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
+        .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
+
+      // group by column name, and filter required columns
+      val requireColumnsList = requireColumns.toList
+      allDictionaryRdd = basicRdd
+        .groupByKey()
+        .filter(x => requireColumnsList.contains(x._1))
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("Read dictionary files failed. Caused by: " + ex.getMessage)
+        throw ex
+    }
+    allDictionaryRdd
+  }
+
+  /**
+   * validate local dictionary files
+   *
+   * @param allDictionaryPath path of the pre-built dictionary files
+   * @return true if at least one non-empty dictionary file exists
+   */
+  private def validateAllDictionaryPath(allDictionaryPath: String): Boolean = {
+    val fileType = FileFactory.getFileType(allDictionaryPath)
+    val filePath = FileFactory.getCarbonFile(allDictionaryPath, fileType)
+    // the file path may contain a wildcard, e.g. "/path/*.dictionary"
+    if (filePath.getName.startsWith("*")) {
+      val dictExt = filePath.getName.substring(1)
+      if (filePath.getParentFile.exists()) {
+        val listFiles = filePath.getParentFile.listFiles()
+        if (listFiles.exists(file =>
+          file.getName.endsWith(dictExt) && file.getSize > 0)) {
+          true
+        } else {
+          LOGGER.warn("No dictionary files found or empty dictionary files! " +
+            "Won't generate new dictionary.")
+          false
+        }
+      } else {
+        throw new FileNotFoundException(
+          "The given dictionary file path is not found!")
+      }
+    } else {
+      if (filePath.exists()) {
+        if (filePath.getSize > 0) {
+          true
+        } else {
+          LOGGER.warn("No dictionary files found or empty dictionary files! " +
+            "Won't generate new dictionary.")
+          false
+        }
+      } else {
+        throw new FileNotFoundException(
+          "The given dictionary file path is not found!")
+      }
+    }
+  }
+
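For reference, each line of such a pre-built dictionary file is "<columnIndex>,<value>", with the index pointing into the CSV header; parseRecord resolves the index to a column name and readAllDictionaryFiles then groups the values per column. A self-contained sketch of that format follows (the header and records below are invented, and plain collections stand in for the RDD):

    object AllDictionaryFormatExample {
      def main(args: Array[String]): Unit = {
        val csvFileColumns = Array("id", "country", "name")
        val dictionaryLines = Seq("1,china", "1,usa", "2,alice", "2,bob")
        // resolve the leading index to a column name, then group values per column
        val grouped: Map[String, Seq[String]] = dictionaryLines
          .map { line =>
            val tokens = line.split(",", 2)
            (csvFileColumns(tokens(0).toInt), tokens(1))
          }
          .groupBy(_._1)
          .mapValues(_.map(_._2))
        grouped.foreach(println)
        // e.g. (country,List(china, usa)) and (name,List(alice, bob))
      }
    }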
" + + "Won't generate new dictionary.") + false + } + } else { + throw new FileNotFoundException( + "The given dictionary file path is not found!") + } + } + } + + /** + * get file headers from fact file + * + * @param carbonLoadModel + * @return headers + */ + private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = { + var headers: Array[String] = null + val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0) + val readLine = CarbonUtil.readHeader(factFile) + + if (null != readLine) { + val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) { + "" + DEFAULT_SEPARATOR + } else { + carbonLoadModel.getCsvDelimiter + } + headers = readLine.toLowerCase().split(delimiter) + } else { + LOGGER.error("Not found file header! Please set fileheader") + throw new IOException("Failed to get file header") + } + headers + } + + /** + * generate global dictionary with SQLContext and CarbonLoadModel + * + * @param sqlContext sql context + * @param carbonLoadModel carbon load model + */ + def generateGlobalDictionary(sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + storePath: String, + dataFrame: Option[DataFrame] = None): Unit = { + try { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + // create dictionary folder if not exists + val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) + val dictfolderPath = carbonTablePath.getMetadataDirectoryPath + // columns which need to generate global dictionary file + val dimensions = carbonTable.getDimensionByTableName( + carbonTable.getFactTableName).asScala.toArray + // generate global dict from pre defined column dict file + carbonLoadModel.initPredefDictMap() + + val allDictionaryPath = carbonLoadModel.getAllDictPath + if (StringUtils.isEmpty(allDictionaryPath)) { + LOGGER.info("Generate global dictionary from source data files!") + // load data by using dataSource com.databricks.spark.csv + var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel)) + var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) { + df.columns + } else { + carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR) + } + headers = headers.map(headerName => headerName.trim) + val colDictFilePath = carbonLoadModel.getColDictFilePath + if (colDictFilePath != null) { + // generate predefined dictionary + generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, + dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath) + } + if (headers.length > df.columns.length) { + val msg = "The number of columns in the file header do not match the " + + "number of columns in the data file; Either delimiter " + + "or fileheader provided is not correct" + LOGGER.error(msg) + throw new DataLoadingException(msg) + } + // use fact file to generate global dict + val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, + headers, df.columns) + if (requireDimension.nonEmpty) { + // select column to push down pruning + df = df.select(requireColumnNames.head, requireColumnNames.tail: _*) + val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier, + requireDimension, storePath, dictfolderPath, false) + // combine distinct value in a block and partition by column + val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model) + .partitionBy(new ColumnPartitioner(model.primDimensions.length)) 
+  /**
+   * generate global dictionary with SQLContext and CarbonLoadModel
+   *
+   * @param sqlContext sql context
+   * @param carbonLoadModel carbon load model
+   * @param storePath carbon store path
+   * @param dataFrame optional dataframe holding the input data
+   */
+  def generateGlobalDictionary(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      dataFrame: Option[DataFrame] = None): Unit = {
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+      // create dictionary folder if not exists
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+      val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
+      // columns which need to generate global dictionary file
+      val dimensions = carbonTable.getDimensionByTableName(
+        carbonTable.getFactTableName).asScala.toArray
+      // generate global dict from predefined column dict file
+      carbonLoadModel.initPredefDictMap()
+
+      val allDictionaryPath = carbonLoadModel.getAllDictPath
+      if (StringUtils.isEmpty(allDictionaryPath)) {
+        LOGGER.info("Generate global dictionary from source data files!")
+        // load data by using dataSource com.databricks.spark.csv
+        var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
+        var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+          df.columns
+        } else {
+          carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
+        }
+        headers = headers.map(headerName => headerName.trim)
+        val colDictFilePath = carbonLoadModel.getColDictFilePath
+        if (colDictFilePath != null) {
+          // generate predefined dictionary
+          generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+            dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
+        }
+        if (headers.length > df.columns.length) {
+          val msg = "The number of columns in the file header does not match the " +
+            "number of columns in the data file; either the delimiter " +
+            "or the fileheader provided is not correct"
+          LOGGER.error(msg)
+          throw new DataLoadingException(msg)
+        }
+        // use fact file to generate global dict
+        val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
+          headers, df.columns)
+        if (requireDimension.nonEmpty) {
+          // select columns to push down pruning
+          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
+          val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+            requireDimension, storePath, dictfolderPath, false)
+          // combine distinct values in a block and partition by column
+          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
+            .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+          // generate global dictionary files
+          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+          // check result status
+          checkStatus(carbonLoadModel, sqlContext, model, statusList)
+        } else {
+          LOGGER.info("No column found for generating global dictionary in source data files")
+        }
+        // generate global dict from dimension file
+        if (carbonLoadModel.getDimFolderPath != null) {
+          val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
+          for (fileMap <- fileMapArray) {
+            val dimTableName = fileMap.split(":")(0)
+            var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
+            val (requireDimensionForDim, requireColumnNamesForDim) =
+              pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
+            if (requireDimensionForDim.length >= 1) {
+              dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
+                requireColumnNamesForDim.tail: _*)
+              val modelforDim = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+                requireDimensionForDim, storePath, dictfolderPath, false)
+              val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
+                dimDataframe.rdd, modelforDim)
+                .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
+              val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
+                inputRDDforDim, modelforDim).collect()
+              checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
+            } else {
+              LOGGER.info(s"No columns in dimension table $dimTableName " +
+                "to generate global dictionary")
+            }
+          }
+        }
+      } else {
+        LOGGER.info("Generate global dictionary from dictionary files!")
+        val isNonempty = validateAllDictionaryPath(allDictionaryPath)
+        if (isNonempty) {
+          var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+            getHeaderFormFactFile(carbonLoadModel)
+          } else {
+            carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
+          }
+          headers = headers.map(headerName => headerName.trim)
+          // prune columns according to the CSV file header, dimension columns
+          val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
+          if (requireDimension.nonEmpty) {
+            val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
+              requireDimension, storePath, dictfolderPath, false)
+            // check if the dictionary files contain bad records
+            val accumulator = sqlContext.sparkContext.accumulator(0)
+            // read local dictionary files, and group by key
+            val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
+              requireColumnNames, allDictionaryPath, accumulator)
+            // read existing dictionary and combine
+            val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
+              .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+            // generate global dictionary files
+            val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+            // check result status
+            checkStatus(carbonLoadModel, sqlContext, model, statusList)
+            // if the dictionary contains wrongly formatted records, throw an exception
+            if (accumulator.value > 0) {
+              throw new DataLoadingException("Data Loading failure, dictionary values are " +
+                "not in correct format!")
+            }
+          } else {
+            LOGGER.info("No columns need to generate global dictionary")
+          }
+        }
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "generate global dictionary failed")
+        throw ex
+    }
+  }
+}
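A side note on the pruning step used in generateGlobalDictionary above: before the distinct-value RDDs run, the input DataFrame is narrowed to just the dictionary columns with a plain select. A rough, generic sketch of that step (not taken from the patch; requiredColumns is a hypothetical input, and the case-insensitive match mirrors pruneDimensions):

    import org.apache.spark.sql.DataFrame

    // keep only the columns that need a global dictionary so the
    // CSV scan underneath can skip the remaining columns
    def pruneToDictionaryColumns(df: DataFrame, requiredColumns: Seq[String]): DataFrame = {
      val wanted = requiredColumns.map(_.toLowerCase).toSet
      val selected = df.columns.filter(c => wanted.contains(c.toLowerCase))
      require(selected.nonEmpty, "no dictionary columns found in the input DataFrame")
      df.select(selected.head, selected.tail: _*)
    }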
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
new file mode 100644
index 0000000..79c0cc8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+
+/**
+ * Implicit functions for [[TableIdentifier]]
+ */
+object CarbonTableIdentifierImplicit {
+  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+
+  implicit def toTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+    tableIdentifier match {
+      case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
+      case Seq(tableName) => TableIdentifier(tableName, None)
+      case _ => throw new IllegalArgumentException("invalid table identifier: " + tableIdentifier)
+    }
+  }
+
+  implicit def toSequence(tableIdentifier: TableIdentifier): Seq[String] = {
+    tableIdentifier.database match {
+      case Some(dbName) => Seq(dbName, tableIdentifier.table)
+      case _ => Seq(tableIdentifier.table)
+    }
+  }
+}
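For illustration, the implicits above let a Seq[String] be used wherever a TableIdentifier is expected, and convert back the other way. A minimal usage sketch (the database and table names are invented; it assumes the object added by this patch is on the classpath):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._

    object TableIdentifierImplicitExample {
      def main(args: Array[String]): Unit = {
        val withDatabase: TableIdentifier = Seq("sales_db", "orders") // TableIdentifier("orders", Some("sales_db"))
        val withoutDatabase: TableIdentifier = Seq("orders")          // TableIdentifier("orders", None)
        val backToSeq: Seq[String] = withDatabase                     // Seq("sales_db", "orders")
        println((withDatabase, withoutDatabase, backToSeq))
      }
    }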