[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user sounakr closed the pull request at: https://github.com/apache/carbondata/pull/2055 ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174694130 --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java --- @@ -2068,6 +2079,202 @@ private static void updateDecimalType(TableInfo tableInfo) { return tableInfo; } + public static ColumnSchema thriftColumnSchmeaToWrapperColumnSchema( + org.apache.carbondata.format.ColumnSchema externalColumnSchema) { +ColumnSchema wrapperColumnSchema = new ColumnSchema(); + wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id()); + wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name()); +wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar()); +DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type); +if (DataTypes.isDecimal(dataType)) { + DecimalType decimalType = (DecimalType) dataType; + decimalType.setPrecision(externalColumnSchema.getPrecision()); + decimalType.setScale(externalColumnSchema.getScale()); +} +wrapperColumnSchema.setDataType(dataType); + wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension()); +List encoders = new ArrayList(); +for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) { + encoders.add(fromExternalToWrapperEncoding(encoder)); +} +wrapperColumnSchema.setEncodingList(encoders); + wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child()); +wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision()); + wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id()); +wrapperColumnSchema.setScale(externalColumnSchema.getScale()); + wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value()); + wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal()); +Mapproperties = externalColumnSchema.getColumnProperties(); +if (properties != null) { + if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) 
{ +wrapperColumnSchema.setSortColumn(true); + } +} + wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function()); +List parentColumnTableRelation = +externalColumnSchema.getParentColumnTableRelations(); +if (null != parentColumnTableRelation) { + wrapperColumnSchema.setParentColumnTableRelations( + fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation)); +} +return wrapperColumnSchema; + } + + static List fromThriftToWrapperParentTableColumnRelations( + List thirftParentColumnRelation) { +List parentColumnTableRelationList = new ArrayList<>(); +for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation : +thirftParentColumnRelation) { + RelationIdentifier relationIdentifier = + new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(), + carbonTableRelation.getRelationIdentifier().getTableName(), + carbonTableRelation.getRelationIdentifier().getTableId()); + ParentColumnTableRelation parentColumnTableRelation = + new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(), + carbonTableRelation.getColumnName()); + parentColumnTableRelationList.add(parentColumnTableRelation); +} +return parentColumnTableRelationList; + } + + static Encoding fromExternalToWrapperEncoding( + org.apache.carbondata.format.Encoding encoderThrift) { +switch (encoderThrift) { + case DICTIONARY: +return Encoding.DICTIONARY; + case DELTA: +return Encoding.DELTA; + case RLE: +return Encoding.RLE; + case INVERTED_INDEX: +return Encoding.INVERTED_INDEX; + case BIT_PACKED: +return Encoding.BIT_PACKED; + case DIRECT_DICTIONARY: +return Encoding.DIRECT_DICTIONARY; + default: +throw new IllegalArgumentException(encoderThrift.toString() + " is not supported"); +} + } + + static DataType thriftDataTyopeToWrapperDataType( + org.apache.carbondata.format.DataType dataTypeThrift) { +switch (dataTypeThrift) { + case BOOLEAN: +return DataTypes.BOOLEAN; + case STRING: +return DataTypes.STRING; + case 
SHORT: +return DataTypes.SHORT; + case INT: +return DataTypes.INT; + case LONG: +return DataTypes.LONG;
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174694029 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java --- @@ -826,6 +826,12 @@ public boolean isExternalTable() { return external != null && external.equalsIgnoreCase("true"); } + public boolean isFileLevelExternalTable() { --- End diff -- Do not call it "external"; renaming it to `isFileLevelFormat` is fine. ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174693784 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -0,0 +1,682 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.api; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import 
org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174693583 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala --- @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import 
org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +@InterfaceAudience.User +@InterfaceStability.Evolving +class SparkCarbonFileFormat extends FileFormat + with DataSourceRegister + with Logging + with Serializable { + + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { +val filePaths = CarbonUtil.getFilePathExternalFilePath( + options.get("path").get) +if (filePaths.size() == 0) { + throw new SparkException("CarbonData file is not present in the location mentioned in DDL") +} +val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0)) +val fileHeader = carbonHeaderReader.readHeader +val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader + .getColumn_schema +var colArray = ArrayBuffer[StructField]() +for (i <- 0 to table_columns.size() - 1) { + val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)) + colArray += (new StructField(col.getColumnName, +CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false)) +} +colArray.+:(Nil) + +Some(StructType(colArray)) + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], +
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174691560 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + +class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll { + + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath ++ +"../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") +.getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + val filePath = writerPath + "/Fact/Part0/Segment_null/" + + def buildTestData(persistSchema:Boolean) = { + +FileUtils.deleteDirectory(new File(writerPath)) + +val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + +try { + val builder = CarbonWriter.builder() + val writer = +if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() +} else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() +} + + var i = 0 + while (i < 100) { +writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) +i += 1 + } + writer.close() +} catch { + case ex: Exception => None + case _ => None +} + } + + def cleanTestData() = { +FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { +val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + +for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { +if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) +} + } else { +deleteIndexFile(eachDir.getPath, extension) + } +} + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { +buildTestData(false) +assert(new File(filePath).exists()) +sql("DROP TABLE IF EXISTS sdkOutputTable") + +//data source file format +if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174691411 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + +class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll { + + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath ++ +"../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") +.getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + val filePath = writerPath + "/Fact/Part0/Segment_null/" + + def buildTestData(persistSchema:Boolean) = { + +FileUtils.deleteDirectory(new File(writerPath)) + +val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + +try { + val builder = CarbonWriter.builder() + val writer = +if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() +} else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() +} + + var i = 0 + while (i < 100) { +writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) +i += 1 + } + writer.close() +} catch { + case ex: Exception => None + case _ => None +} + } + + def cleanTestData() = { +FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { +val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + +for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { +if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) +} + } else { +deleteIndexFile(eachDir.getPath, extension) + } +} + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { +buildTestData(false) +assert(new File(filePath).exists()) +sql("DROP TABLE IF EXISTS sdkOutputTable") + +//data source file format +if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174690840 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath ++ +"../." 
+ + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") +.getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + +FileUtils.deleteDirectory(new File(writerPath)) + +val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + +try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { +builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { +writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) +i += 1 + } + writer.close() +} catch { + case ex: Exception => None + case _ => None +} + } + + def cleanTestData() = { +FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { +val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + +for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { +if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) +} + } else { +deleteIndexFile(eachDir.getPath, extension) + } +} + } + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") +// create carbon table and insert data + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { +buildTestData(false) +assert(new File(writerPath).exists()) +sql("DROP TABLE IF EXISTS sdkOutputTable") + 
+//new provider Carbonfile +sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + +sql("Describe formatted sdkOutputTable").show(false) + +
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174690144 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath ++ +"../." 
+ + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") +.getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + +FileUtils.deleteDirectory(new File(writerPath)) + +val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + +try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { +builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { +writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) +i += 1 + } + writer.close() +} catch { + case ex: Exception => None + case _ => None +} + } + + def cleanTestData() = { +FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { +val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + +for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { +if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) +} + } else { +deleteIndexFile(eachDir.getPath, extension) + } +} + } + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") +// create carbon table and insert data + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { +buildTestData(false) +assert(new File(writerPath).exists()) +sql("DROP TABLE IF EXISTS sdkOutputTable") + 
+//new provider Carbonfile +sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + +sql("Describe formatted sdkOutputTable").show(false) + +
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174690041 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath ++ +"../." 
+ + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") +.getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + +FileUtils.deleteDirectory(new File(writerPath)) + +val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + +try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { +builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { +writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) +i += 1 + } + writer.close() +} catch { + case ex: Exception => None + case _ => None +} + } + + def cleanTestData() = { +FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { +val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + +for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { +if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) +} + } else { +deleteIndexFile(eachDir.getPath, extension) + } +} + } + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") +// create carbon table and insert data + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { +buildTestData(false) +assert(new File(writerPath).exists()) +sql("DROP TABLE IF EXISTS sdkOutputTable") + 
+//new provider Carbonfile +sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + +sql("Describe formatted sdkOutputTable").show(false) + +
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174689415 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath ++ +"../." 
+ + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") +.getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + +FileUtils.deleteDirectory(new File(writerPath)) + +val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + +try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { +builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { +writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) +i += 1 + } + writer.close() +} catch { + case ex: Exception => None + case _ => None +} + } + + def cleanTestData() = { +FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { +val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + +for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { +if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) +} + } else { +deleteIndexFile(eachDir.getPath, extension) + } +} + } + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") +// create carbon table and insert data + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { +buildTestData(false) +assert(new File(writerPath).exists()) +sql("DROP TABLE IF EXISTS sdkOutputTable") + 
+//new provider Carbonfile +sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + +sql("Describe formatted sdkOutputTable").show(false) --- End
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user ajantha-bhat commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174667420 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.api; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.stats.QueryStatistic; +import 
org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader;
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174212571 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileLevelFormat.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import 
org.apache.carbondata.spark.util.CarbonScalaUtil + +class CarbonFileLevelFormat extends FileFormat --- End diff -- Please rename the current `CarbonFileFormat` class to `CarbonTableLevelFormat`. ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174212055 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileLevelFormat.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion} +import org.apache.carbondata.core.reader.CarbonHeaderReader +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.scan.model.QueryModel +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import 
org.apache.carbondata.spark.util.CarbonScalaUtil + +class CarbonFileLevelFormat extends FileFormat + with DataSourceRegister + with Logging + with Serializable { + + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { +val filePaths = CarbonUtil.getFilePathExternalFilePath( + options.get("path").get) +if (filePaths.size() == 0) { + throw new SparkException("CarbonData file is not present in the location mentioned in DDL") +} +val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0)) +val fileHeader = carbonHeaderReader.readHeader +val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader + .getColumn_schema +var colArray = ArrayBuffer[StructField]() +for (i <- 0 to table_columns.size() - 1) { + val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)) + colArray += (new StructField(col.getColumnName, +CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false)) +} +colArray.+:(Nil) + +Some(StructType(colArray)) + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + +new OutputWriterFactory { + override def newInstance( + path: String, +
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174211199 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingCarbonFileLevelFormat.scala --- @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.spark.sql.{AnalysisException, CarbonEnv} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException + +class TestCreateTableUsingCarbonFileLevelFormat extends QueryTest with BeforeAndAfterAll { --- End diff -- This suite is fine, but can you add one suite using SparkSession instead of CarbonSession? ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174208978 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java --- @@ -79,4 +79,19 @@ public static TableInfo getTableInfo(AbsoluteTableIdentifier identifier) carbonTableIdentifier.getTableName(), identifier.getTablePath()); } + + + public static TableInfo inferSchemaForExternalTable(AbsoluteTableIdentifier identifier) --- End diff -- Please rename this to `inferSchema`. Also, can you pass only the tablePath instead of the whole identifier? ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174208671 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/mapred/TestMapReduceCarbonFileInputFormat.java --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mapred; + + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileFilter; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Assert; +import org.junit.Test; + +public class TestMapReduceCarbonFileInputFormat { + + private static final Log LOG = LogFactory.getLog(TestMapReduceCarbonFileInputFormat.class); + + private int countTheLines(String outPath) throws Exception { +File file = new File(outPath); +if (file.exists()) { + BufferedReader reader = new BufferedReader(new FileReader(file)); + int i = 0; + while (reader.readLine() != null) { +i++; + } + reader.close(); + return i; +} +return 0; + } + + private int countTheColumns(String outPath) throws 
Exception { +File file = new File(outPath); +if (file.exists()) { + BufferedReader reader = new BufferedReader(new FileReader(file)); + String[] split = reader.readLine().split(","); + reader.close(); + return split.length; +} +return 0; + } + + --- End diff -- remove empty lines ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174208602 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/mapred/TestMapReduceCarbonFileInputFormat.java --- @@ -0,0 +1,193 @@ +/* --- End diff -- There are some binary files in this PR; please remove them. ---
[GitHub] carbondata pull request #2055: [CARBONDATA-2224][File Level Reader Support] ...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2055#discussion_r174208296 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.api; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.stats.QueryStatistic; +import 
org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import