Repository: incubator-carbondata Updated Branches: refs/heads/master 91068c56c -> 20186ca1e
change INPUT_DIR to tablePath instead of storePath and add example Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fad5e1f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fad5e1f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fad5e1f8 Branch: refs/heads/master Commit: fad5e1f840f66ff1d618f7a89b963918130a0698 Parents: 91068c5 Author: jackylk <jacky.li...@huawei.com> Authored: Tue Oct 4 12:30:28 2016 +0800 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Oct 4 12:30:28 2016 +0800 ---------------------------------------------------------------------- .../core/carbon/AbsoluteTableIdentifier.java | 6 ++ .../core/carbon/path/CarbonStorePath.java | 6 ++ .../examples/AllDictionaryExample.scala | 8 +- .../carbondata/examples/CarbonExample.scala | 6 +- .../examples/ComplexTypeExample.scala | 6 +- .../examples/DataFrameAPIExample.scala | 31 ++------ .../carbondata/examples/DatasourceExample.scala | 25 ++----- .../examples/GenerateDictionaryExample.scala | 8 +- .../carbondata/examples/HadoopFileExample.scala | 40 ++++++++++ .../apache/carbondata/examples/PerfTest.scala | 6 +- .../carbondata/examples/util/ExampleUitls.scala | 79 ++++++++++++++++++++ .../examples/util/InitForExamples.scala | 55 -------------- .../carbondata/hadoop/CarbonInputFormat.java | 69 +++-------------- .../impl/DictionaryDecodedReadSupportImpl.java | 1 + .../carbondata/hadoop/util/SchemaReader.java | 6 +- .../hadoop/ft/CarbonInputFormat_FT.java | 8 +- .../hadoop/ft/CarbonInputMapperTest.java | 3 +- .../carbondata/spark/util/QueryPlanUtil.scala | 7 +- .../sql/CarbonDatasourceHadoopRelation.scala | 12 +-- .../execution/command/carbonTableSchema.scala | 3 +- 20 files changed, 186 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java index 0e1481d..c935e79 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java @@ -18,6 +18,7 @@ */ package org.apache.carbondata.core.carbon; +import java.io.File; import java.io.Serializable; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; @@ -78,6 +79,11 @@ public class AbsoluteTableIdentifier implements Serializable { return new AbsoluteTableIdentifier(storePath, identifier); } + public String getTablePath() { + return getStorePath() + File.separator + getCarbonTableIdentifier().getDatabaseName() + + File.separator + getCarbonTableIdentifier().getTableName(); + } + /** * to get the hash code */ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java index 567602b..214c633 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java @@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.path; import java.io.File; +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.hadoop.fs.Path; @@ -48,6 +49,11 @@ public class CarbonStorePath extends Path { return carbonTablePath; } + public static CarbonTablePath getCarbonTablePath(AbsoluteTableIdentifier identifier) { + CarbonTableIdentifier id = identifier.getCarbonTableIdentifier(); + return new CarbonTablePath(id, identifier.getTablePath()); + } + /** * gets CarbonTablePath object to manage table paths */ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala index a2b72e3..dfba300 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala @@ -18,15 +18,15 @@ package org.apache.carbondata.examples import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.examples.util.{AllDictionaryUtil, InitForExamples} +import org.apache.carbondata.examples.util.{AllDictionaryUtil, ExampleUitls} object AllDictionaryExample { def main(args: Array[String]) { - val cc = InitForExamples.createCarbonContext("CarbonExample") - val testData = InitForExamples.currentPath + "/src/main/resources/data.csv" + val cc = ExampleUitls.createCarbonContext("CarbonExample") + val testData = ExampleUitls.currentPath + "/src/main/resources/data.csv" val csvHeader = "ID,date,country,name,phonetype,serialname,salary" val dictCol = "|date|country|name|phonetype|serialname|" - val allDictFile = InitForExamples.currentPath + "/src/main/resources/data.dictionary" + val allDictFile = ExampleUitls.currentPath + "/src/main/resources/data.dictionary" // extract all dictionary files from source data AllDictionaryUtil.extractDictionary(cc.sparkContext, testData, allDictFile, csvHeader, dictCol) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala index 52407a5..444bc0d 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala @@ -19,12 +19,12 @@ package org.apache.carbondata.examples import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.examples.util.InitForExamples +import org.apache.carbondata.examples.util.ExampleUitls object CarbonExample { def main(args: Array[String]) { - val cc = InitForExamples.createCarbonContext("CarbonExample") - val testData = InitForExamples.currentPath + "/src/main/resources/data.csv" + val cc = ExampleUitls.createCarbonContext("CarbonExample") + val testData = ExampleUitls.currentPath + "/src/main/resources/data.csv" // Specify timestamp format based on raw data CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/ComplexTypeExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/ComplexTypeExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/ComplexTypeExample.scala index fc7d83c..bbdae68 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/ComplexTypeExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/ComplexTypeExample.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.examples -import org.apache.carbondata.examples.util.InitForExamples +import org.apache.carbondata.examples.util.ExampleUitls /** * Carbon supports the complex types ARRAY and STRUCT. @@ -26,8 +26,8 @@ import org.apache.carbondata.examples.util.InitForExamples object ComplexTypeExample { def main(args: Array[String]) { - val cc = InitForExamples.createCarbonContext("ComplexTypeExample") - val dataPath = InitForExamples.currentPath + "/src/main/resources/complexdata.csv" + val cc = ExampleUitls.createCarbonContext("ComplexTypeExample") + val dataPath = ExampleUitls.currentPath + "/src/main/resources/complexdata.csv" val tableName = "complexTypeTable" cc.sql(s"DROP TABLE IF EXISTS $tableName") http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala index 31750ac..f3fcadc 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala @@ -19,29 +19,14 @@ package org.apache.carbondata.examples import org.apache.spark.sql.SaveMode -import org.apache.carbondata.examples.util.InitForExamples +import org.apache.carbondata.examples.util.ExampleUitls // scalastyle:off println object DataFrameAPIExample { def main(args: Array[String]) { - val cc = InitForExamples.createCarbonContext("DataFrameAPIExample") - val sc = cc.sc - - import cc.implicits._ - - // create a dataframe, it can be from parquet or hive table - val df = sc.parallelize(1 to 1000) - .map(x => ("a", "b", x)) - .toDF("c1", "c2", "c3") - - // save dataframe to carbon file - df.write - .format("carbondata") - .option("tableName", "carbon1") - .option("compress", "true") - .mode(SaveMode.Overwrite) - .save() + val cc = ExampleUitls.createCarbonContext("DataFrameAPIExample") + ExampleUitls.writeSampleCarbonFile(cc, "carbon1") // use datasource api to read val in = cc.read @@ -49,19 +34,13 @@ object DataFrameAPIExample { .option("tableName", "carbon1") .load() + import cc.implicits._ val count = in.where($"c3" > 500).select($"*").count() - println(s"count using dataframe.read: $count") + println(s"count using dataframe: $count") // use SQL to read cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show cc.sql("DROP TABLE IF EXISTS carbon1") - - // also support a implicit function for easier access - import org.apache.carbondata.spark._ - df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true")) - - cc.sql("SELECT count(*) FROM carbon2 WHERE c3 > 100").show - cc.sql("DROP TABLE IF EXISTS carbon2") } } // scalastyle:on println http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala index f19dc13..112c274 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala @@ -19,36 +19,23 @@ package org.apache.carbondata.examples import org.apache.spark.sql.{SaveMode, SQLContext} -import org.apache.carbondata.examples.util.InitForExamples +import org.apache.carbondata.examples.util.ExampleUitls object DatasourceExample { def main(args: Array[String]) { // use CarbonContext to write CarbonData files - val cc = InitForExamples.createCarbonContext("DatasourceExample") - import cc.implicits._ - val sc = cc.sparkContext - // create a dataframe, it can be from parquet or hive table - val df = sc.parallelize(1 to 1000) - .map(x => ("a", "b", x)) - .toDF("c1", "c2", "c3") - - // save dataframe to CarbonData files - df.write - .format("carbondata") - .option("tableName", "table1") - .mode(SaveMode.Overwrite) - .save() + val cc = ExampleUitls.createCarbonContext("DatasourceExample") + ExampleUitls.writeSampleCarbonFile(cc, "table1") // Use SQLContext to read CarbonData files - val sqlContext = new SQLContext(sc) + val sqlContext = new SQLContext(cc.sparkContext) sqlContext.sql( - """ + s""" | CREATE TEMPORARY TABLE source | USING org.apache.spark.sql.CarbonSource - | OPTIONS (path './examples/target/store/default/table1') + | OPTIONS (path '${cc.storePath}/default/table1') """.stripMargin) sqlContext.sql("SELECT c1, c2, count(*) FROM source WHERE c3 > 100 GROUP BY c1, c2").show - } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala index 1d21601..20ebc8c 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala @@ -23,7 +23,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier} import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.carbon.path.CarbonStorePath -import org.apache.carbondata.examples.util.InitForExamples +import org.apache.carbondata.examples.util.ExampleUitls import org.apache.carbondata.spark.load.CarbonLoaderUtil /** @@ -34,9 +34,9 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil object GenerateDictionaryExample { def main(args: Array[String]) { - val cc = InitForExamples.createCarbonContext("GenerateDictionaryExample") - val factFilePath = InitForExamples.currentPath + "/src/main/resources/factSample.csv" - val carbonTablePath = CarbonStorePath.getCarbonTablePath(InitForExamples.storeLocation, + val cc = ExampleUitls.createCarbonContext("GenerateDictionaryExample") + val factFilePath = ExampleUitls.currentPath + "/src/main/resources/factSample.csv" + val carbonTablePath = CarbonStorePath.getCarbonTablePath(ExampleUitls.storeLocation, new CarbonTableIdentifier("default", "dictSample", "1")) val dictFolderPath = carbonTablePath.getMetadataDirectoryPath http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala new file mode 100644 index 0000000..2099906 --- /dev/null +++ b/examples/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import org.apache.carbondata.examples.util.ExampleUitls +import org.apache.carbondata.hadoop.CarbonInputFormat + +// scalastyle:off println +object HadoopFileExample { + + def main(args: Array[String]): Unit = { + val cc = ExampleUitls.createCarbonContext("DataFrameAPIExample") + ExampleUitls.writeSampleCarbonFile(cc, "carbon1") + + val sc = cc.sparkContext + val input = sc.newAPIHadoopFile(s"${cc.storePath}/default/carbon1", + classOf[CarbonInputFormat[Array[Object]]], + classOf[Void], + classOf[Array[Object]]) + val result = input.map(x => x._2.toList).collect + result.foreach(x => println(x.mkString(", "))) + } +} +// scalastyle:on println + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala b/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala index cfc5814..945a75e 100644 --- a/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala +++ b/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext import org.apache.spark.sql.types.{DataTypes, StructType} import org.apache.carbondata.examples.PerfTest._ -import org.apache.carbondata.examples.util.InitForExamples +import org.apache.carbondata.examples.util.ExampleUitls // scalastyle:off println @@ -268,7 +268,7 @@ object PerfTest { ) def main(args: Array[String]) { - val cc = InitForExamples.createCarbonContext("PerfTest") + val cc = ExampleUitls.createCarbonContext("PerfTest") // prepare performance queries var workload = Seq[Query]() @@ -318,7 +318,7 @@ object PerfTest { } def savePath(datasource: String): String = - s"${InitForExamples.currentPath}/target/perftest/${datasource}" + s"${ExampleUitls.currentPath}/target/perftest/${datasource}" def withTime(body: => Unit): Long = { val start = System.nanoTime() http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUitls.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUitls.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUitls.scala new file mode 100644 index 0000000..f2fe8d6 --- /dev/null +++ b/examples/src/main/scala/org/apache/carbondata/examples/util/ExampleUitls.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples.util + +import java.io.File + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{CarbonContext, SaveMode} + +import org.apache.carbondata.core.util.CarbonProperties + +// scalastyle:off println + +object ExampleUitls { + + def currentPath: String = new File(this.getClass.getResource("/").getPath + "/../../") + .getCanonicalPath + val storeLocation = currentPath + "/target/store" + val kettleHome = new File(currentPath + "/../processing/carbonplugins").getCanonicalPath + + def createCarbonContext(appName: String): CarbonContext = { + val sc = new SparkContext(new SparkConf() + .setAppName(appName) + .setMaster("local[2]")) + sc.setLogLevel("ERROR") + + println(s"Starting $appName using spark version ${sc.version}") + + val cc = new CarbonContext(sc, storeLocation, currentPath + "/target/carbonmetastore") + cc.setConf("carbon.kettle.home", kettleHome) + + // whether use table split partition + // true -> use table split partition, support multiple partition loading + // false -> use node split partition, support data load by host partition + CarbonProperties.getInstance().addProperty("carbon.table.split.partition.enable", "false") + cc + } + + /** + * This func will write a sample CarbonData file containing following schema: + * c1: String, c2: String, c3: Double + */ + def writeSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = { + // use CarbonContext to write CarbonData files + import cc.implicits._ + val sc = cc.sparkContext + // create a dataframe, it can be from parquet or hive table + val df = sc.parallelize(1 to 1000, 2) + .map(x => ("a", "b", x)) + .toDF("c1", "c2", "c3") + + cc.sql(s"DROP TABLE IF EXISTS $tableName") + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", tableName) + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + } +} +// scalastyle:on println + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala deleted file mode 100644 index 46d2bc1..0000000 --- a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples.util - -import java.io.File - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.CarbonContext - -import org.apache.carbondata.core.util.CarbonProperties - -// scalastyle:off println - -object InitForExamples { - - def currentPath: String = new File(this.getClass.getResource("/").getPath + "/../../") - .getCanonicalPath - val storeLocation = currentPath + "/target/store" - val kettleHome = new File(currentPath + "/../processing/carbonplugins").getCanonicalPath - - def createCarbonContext(appName: String): CarbonContext = { - val sc = new SparkContext(new SparkConf() - .setAppName(appName) - .setMaster("local[2]")) - sc.setLogLevel("ERROR") - - println(s"Starting $appName using spark version ${sc.version}") - - val cc = new CarbonContext(sc, storeLocation, currentPath + "/target/carbonmetastore") - cc.setConf("carbon.kettle.home", kettleHome) - - // whether use table split partition - // true -> use table split partition, support multiple partition loading - // false -> use node split partition, support data load by host partition - CarbonProperties.getInstance().addProperty("carbon.table.split.partition.enable", "false") - cc - } -} -// scalastyle:on println - http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 2e18629..13df439 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -33,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.datastore.DataRefNode; import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; @@ -70,8 +69,6 @@ import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.scan.model.CarbonQueryPlan; import org.apache.carbondata.scan.model.QueryModel; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.INVALID_SEGMENT_ID; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -93,10 +90,6 @@ import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.util.StringUtils; - - - - /** * Carbon Input format class representing one carbon table */ @@ -106,35 +99,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { public static final String INPUT_SEGMENT_NUMBERS = "mapreduce.input.carboninputformat.segmentnumbers"; private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class); - private static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databasename"; - private static final String TABLE_NAME = "mapreduce.input.carboninputformat.tablename"; private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table"; private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; - private static final String TABLE_ID = "mapreduce.input.carboninputformat.tableId"; - - public static void setTableToAccess(Configuration configuration, - CarbonTableIdentifier tableIdentifier) { - configuration.set(CarbonInputFormat.DATABASE_NAME, tableIdentifier.getDatabaseName()); - configuration.set(CarbonInputFormat.TABLE_NAME, tableIdentifier.getTableName()); - configuration.set(CarbonInputFormat.TABLE_ID, tableIdentifier.getTableId()); - } - - /** - * Get CarbonTableIdentifier from job configuration - */ - public static CarbonTableIdentifier getTableToAccess(Configuration configuration) { - String databaseName = configuration.get(CarbonInputFormat.DATABASE_NAME); - String tableName = configuration.get(CarbonInputFormat.TABLE_NAME); - String tableId = configuration.get(CarbonInputFormat.TABLE_ID); - if (databaseName != null && tableName != null) { - return new CarbonTableIdentifier(databaseName, tableName, tableId); - } - //TODO: better raise exception - return null; - } /** * It is optional, if user does not set then it reads from store @@ -154,11 +123,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { String carbonTableStr = configuration.get(CARBON_TABLE); if (carbonTableStr == null) { // read it from schema file in the store - String storePath = configuration.get(INPUT_DIR, ""); - AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier( - storePath, getTableToAccess(configuration)); - CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore( - getTablePath(configuration), identifier); + AbsoluteTableIdentifier absIdentifier = getAbsoluteTableIdentifier(configuration); + CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier); setCarbonTable(configuration, carbonTable); return carbonTable; } @@ -221,23 +187,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { } public static CarbonTablePath getTablePath(Configuration configuration) throws IOException { - - String storePathString = getStorePathString(configuration); - CarbonTableIdentifier tableIdentifier = CarbonInputFormat.getTableToAccess(configuration); - if (tableIdentifier == null) { - throw new IOException("Could not find " + DATABASE_NAME + "," + TABLE_NAME); - } - return CarbonStorePath.getCarbonTablePath(storePathString, tableIdentifier); - } - - private static String getStorePathString(Configuration configuration) throws IOException { - - String dirs = configuration.get(INPUT_DIR, ""); - String[] inputPaths = StringUtils.split(dirs); - if (inputPaths.length == 0) { - throw new IOException("No input paths specified in job"); - } - return CarbonInputFormatUtil.processPath(inputPaths[0]); + AbsoluteTableIdentifier absIdentifier = getAbsoluteTableIdentifier(configuration); + return CarbonStorePath.getCarbonTablePath(absIdentifier); } /** @@ -319,7 +270,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { for (InputSplit inputSplit : splits) { FileSplit fileSplit = (FileSplit) inputSplit; String segmentId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString()); - if (INVALID_SEGMENT_ID == segmentId) { + if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) { continue; } carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit)); @@ -435,10 +386,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { } } - private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException { - return new AbsoluteTableIdentifier(getStorePathString(configuration), - getTableToAccess(configuration)); + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + if (inputPaths.length == 0) { + throw new IOException("No input paths specified in job"); + } + return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); } private Object getFilterPredicates(Configuration configuration) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java index beeaf4b..d81dd7f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java @@ -25,6 +25,7 @@ public class DictionaryDecodedReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Object[]> { @Override public Object[] readRow(Object[] data) { + assert(data.length == dictionaries.length); for (int i = 0; i < dictionaries.length; i++) { if (dictionaries[i] != null) { data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java index 5d7e125..78ec752 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.carbon.metadata.converter.SchemaConverter; import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; import org.apache.carbondata.core.reader.ThriftReader; @@ -37,8 +38,9 @@ import org.apache.thrift.TBase; */ public class SchemaReader { - public static CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath, - AbsoluteTableIdentifier identifier) throws IOException { + public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier identifier) + throws IOException { + CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier); String schemaFilePath = carbonTablePath.getSchemaFilePath(); if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) || FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) || http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputFormat_FT.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputFormat_FT.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputFormat_FT.java index bfca3b1..aed7d79 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputFormat_FT.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputFormat_FT.java @@ -52,9 +52,7 @@ public class CarbonInputFormat_FT extends TestCase { CarbonInputFormat carbonInputFormat = new CarbonInputFormat(); JobConf jobConf = new JobConf(new Configuration()); Job job = new Job(jobConf); - CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("db", "table1", UUID.randomUUID().toString()); - FileInputFormat.addInputPath(job, new Path("/opt/carbonstore/")); - carbonInputFormat.setTableToAccess(job.getConfiguration(), tableIdentifier); + FileInputFormat.addInputPath(job, new Path("/opt/carbonstore/db/table1")); job.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, "1,2"); List splits = carbonInputFormat.getSplits(job); @@ -66,9 +64,7 @@ public class CarbonInputFormat_FT extends TestCase { CarbonInputFormat carbonInputFormat = new CarbonInputFormat(); JobConf jobConf = new JobConf(new Configuration()); Job job = new Job(jobConf); - CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("db", "table1", UUID.randomUUID().toString()); - FileInputFormat.addInputPath(job, new Path("/opt/carbonstore/")); - carbonInputFormat.setTableToAccess(job.getConfiguration(), tableIdentifier); + FileInputFormat.addInputPath(job, new Path("/opt/carbonstore/db/table1")); job.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, "1,2"); Expression expression = new EqualToExpression(new ColumnExpression("c1", DataType.STRING), new LiteralExpression("a", DataType.STRING)); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java index 435b9c1..1c21a50 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java @@ -170,14 +170,13 @@ public class CarbonInputMapperTest extends TestCase { job.setInputFormatClass(CarbonInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier(); - CarbonInputFormat.setTableToAccess(job.getConfiguration(), abs.getCarbonTableIdentifier()); if (projection != null) { CarbonInputFormat.setColumnProjection(projection, job.getConfiguration()); } if (filter != null) { CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter); } - FileInputFormat.addInputPath(job, new Path(abs.getStorePath())); + FileInputFormat.addInputPath(job, new Path(abs.getTablePath())); CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1")); FileOutputFormat.setOutputPath(job, new Path(outPath + "1")); job.getConfiguration().set("outpath", outPath); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala index 4dfcd3f..c55c807 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala @@ -42,9 +42,7 @@ object QueryPlanUtil { val carbonInputFormat = new CarbonInputFormat[Array[Object]]() val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) - FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getStorePath)) - CarbonInputFormat.setTableToAccess(job.getConfiguration, - absoluteTableIdentifier.getCarbonTableIdentifier) + FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) (carbonInputFormat, job) } @@ -52,8 +50,7 @@ object QueryPlanUtil { conf: Configuration) : CarbonInputFormat[V] = { val carbonInputFormat = new CarbonInputFormat[V]() val job: Job = new Job(conf) - FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getStorePath)) - CarbonInputFormat.setTableToAccess(conf, absoluteTableIdentifier.getCarbonTableIdentifier) + FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) carbonInputFormat } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index bb83cee..2888cb3 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -63,12 +63,8 @@ private[sql] case class CarbonDatasourceHadoopRelation( lazy val job = new Job(new JobConf()) lazy val options = new CarbonOption(parameters) lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head) - lazy val identifier = absIdentifier.getCarbonTableIdentifier lazy val relationRaw: CarbonRelation = { - val carbonTable = SchemaReader.readCarbonTableFromStore( - CarbonStorePath.getCarbonTablePath(absIdentifier.getStorePath, identifier), - absIdentifier - ) + val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier) if (carbonTable == null) { sys.error(s"CarbonData file path ${paths.head} is not valid") } @@ -76,7 +72,7 @@ private[sql] case class CarbonDatasourceHadoopRelation( carbonTable.getDatabaseName, carbonTable.getFactTableName, CarbonSparkUtil.createSparkMeta(carbonTable), - TableMeta(identifier, + TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable, Partitioner(options.partitionClass, @@ -154,7 +150,7 @@ class CarbonHadoopFSRDD[V: ClassTag]( val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier, hadoopAttemptContext.getConfiguration ) - hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getStorePath) + hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath) val reader = inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value, hadoopAttemptContext @@ -195,7 +191,7 @@ class CarbonHadoopFSRDD[V: ClassTag]( val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier, jobContext.getConfiguration ) - jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getStorePath) + jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath) val splits = carbonInputFormat.getSplits(jobContext).toArray val carbonInputSplits = splits .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit])) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fad5e1f8/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 11ec586..377f419 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -1212,8 +1212,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext) val identifier = TableIdentifier(tableName, Option(dbName)) - val tmpTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance - .getCarbonTable(dbName + "_" + tableName) + val tmpTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null == tmpTable) { if (!ifExistsSet) { LOGGER