[CARBONDATA-1997] Add CarbonWriter SDK API Added a new module called store-sdk and a CarbonWriter API that can be used to write CarbonData files to a specified folder without Spark and Hadoop dependencies. Users can use this API in any environment.
This closes #1967 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bf4973c6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bf4973c6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bf4973c6 Branch: refs/heads/carbonstore-rebase5 Commit: bf4973c62fee3e51749b3a1a2d939b05581bb069 Parents: 653c51b Author: Jacky Li <jacky.li...@qq.com> Authored: Sat Feb 10 19:44:23 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Mar 2 15:52:36 2018 +0800 ---------------------------------------------------------------------- .../org/apache/carbondata/common/Strings.java | 40 ++++ .../apache/carbondata/common/StringsSuite.java | 53 +++++ .../core/metadata/schema/table/CarbonTable.java | 7 + .../schema/table/CarbonTableBuilder.java | 72 +++++++ .../core/metadata/schema/table/TableSchema.java | 7 + .../schema/table/TableSchemaBuilder.java | 107 ++++++++++ .../schema/table/CarbonTableBuilderSuite.java | 86 ++++++++ .../metadata/schema/table/CarbonTableTest.java | 12 +- .../schema/table/TableSchemaBuilderSuite.java | 56 ++++++ .../carbondata/spark/util/DataLoadingUtil.scala | 45 +++++ pom.xml | 7 + store/sdk/pom.xml | 130 +++++++++++++ .../carbondata/sdk/file/CSVCarbonWriter.java | 89 +++++++++ .../carbondata/sdk/file/CarbonWriter.java | 51 +++++ .../sdk/file/CarbonWriterBuilder.java | 194 +++++++++++++++++++ .../org/apache/carbondata/sdk/file/Field.java | 74 +++++++ .../org/apache/carbondata/sdk/file/Schema.java | 74 +++++++ .../sdk/file/CSVCarbonWriterSuite.java | 127 ++++++++++++ 18 files changed, 1225 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/common/src/main/java/org/apache/carbondata/common/Strings.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/carbondata/common/Strings.java b/common/src/main/java/org/apache/carbondata/common/Strings.java new file mode 100644 index 0000000..23288dd --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/Strings.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common; + +import java.util.Objects; + +public class Strings { + + /** + * Provide same function as mkString in Scala. + * This is added to avoid JDK 8 dependency. 
+ */ + public static String mkString(String[] strings, String delimeter) { + Objects.requireNonNull(strings); + Objects.requireNonNull(delimeter); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < strings.length; i++) { + builder.append(strings[i]); + if (i != strings.length - 1) { + builder.append(delimeter); + } + } + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/common/src/test/java/org/apache/carbondata/common/StringsSuite.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/carbondata/common/StringsSuite.java b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java new file mode 100644 index 0000000..65da32b --- /dev/null +++ b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.common; + +import org.junit.Assert; +import org.junit.Test; + +public class StringsSuite { + + @Test(expected = NullPointerException.class) + public void testMkStringNullString() { + Strings.mkString(null, ","); + } + + @Test(expected = NullPointerException.class) + public void testMkStringNullDelimeter() { + Strings.mkString(new String[]{"abc"}, null); + } + + @Test + public void testMkString() { + String[] strings = new String[]{}; + String output = Strings.mkString(strings, ","); + Assert.assertTrue(output.length() == 0); + + strings = new String[]{"abc"}; + output = Strings.mkString(strings, ","); + Assert.assertEquals("abc", output); + + strings = new String[]{"abc", "def"}; + output = Strings.mkString(strings, ","); + Assert.assertEquals("abc,def", output); + + strings = new String[]{"abc", "def", "ghj"}; + output = Strings.mkString(strings, ","); + Assert.assertEquals("abc,def,ghj", output); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 6ed1197..4d54ad5 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -875,4 +875,11 @@ public class CarbonTable implements Serializable { return projection; } + + /** + * Create a {@link CarbonTableBuilder} to create {@link CarbonTable} + */ + public static CarbonTableBuilder builder() { + return new CarbonTableBuilder(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java new file mode 100644 index 0000000..27808f8 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.metadata.schema.table; + +import java.util.ArrayList; +import java.util.Objects; + +/** + * Builder for {@link CarbonTable} + */ +public class CarbonTableBuilder { + + private String tableName; + private String databaseName; + private String tablePath; + private TableSchema tableSchema; + + public CarbonTableBuilder tableName(String tableName) { + Objects.requireNonNull(tableName, "tableName should not be null"); + this.tableName = tableName; + return this; + } + + public CarbonTableBuilder databaseName(String databaseName) { + Objects.requireNonNull(databaseName, "databaseName should not be null"); + this.databaseName = databaseName; + return this; + } + + public CarbonTableBuilder tablePath(String tablePath) { + Objects.requireNonNull(tablePath, "tablePath should not be null"); + this.tablePath = tablePath; + return this; + } + + public CarbonTableBuilder tableSchema(TableSchema tableSchema) { + Objects.requireNonNull(tableSchema, "tableSchema should not be null"); + this.tableSchema = tableSchema; + return this; + } + + public CarbonTable build() { + Objects.requireNonNull(tableName, "tableName should not be null"); + Objects.requireNonNull(databaseName, "databaseName should not be null"); + Objects.requireNonNull(tablePath, "tablePath should not be null"); + Objects.requireNonNull(tableSchema, "tableSchema should not be null"); + + TableInfo tableInfo = new TableInfo(); + tableInfo.setDatabaseName(databaseName); + tableInfo.setTableUniqueName(databaseName + "_" + tableName); + tableInfo.setFactTable(tableSchema); + tableInfo.setTablePath(tablePath); + tableInfo.setLastUpdatedTime(System.currentTimeMillis()); + tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0)); + return CarbonTable.buildFromTableInfo(tableInfo); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java index 8fdfbab..fff1a74 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java @@ -294,4 +294,11 @@ public class TableSchema implements Serializable, Writable { return dataMapSchema; } + /** + * Create a {@link TableSchemaBuilder} to create {@link TableSchema} + */ + public static TableSchemaBuilder builder() { + return new TableSchemaBuilder(); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java new file mode 100644 index 0000000..88774ec --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.metadata.schema.table; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.SchemaEvolution; +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; + +/** + * Builder for {@link TableSchema} + */ +public class TableSchemaBuilder { + + private int ordinal = 0; + + private List<ColumnSchema> sortColumns = new LinkedList<>(); + + private List<ColumnSchema> otherColumns = new LinkedList<>(); + + public TableSchema build() { + TableSchema schema = new TableSchema(); + schema.setTableId(UUID.randomUUID().toString()); + schema.setPartitionInfo(null); + schema.setBucketingInfo(null); + SchemaEvolution schemaEvol = new SchemaEvolution(); + schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>()); + schema.setSchemaEvalution(schemaEvol); + List<ColumnSchema> allColumns = new LinkedList<>(sortColumns); + allColumns.addAll(otherColumns); + schema.setListOfColumns(allColumns); + return schema; + } + + public TableSchemaBuilder addColumn(StructField field, boolean isSortColumn) { + Objects.requireNonNull(field); + 
checkRepeatColumnName(field); + ColumnSchema newColumn = new ColumnSchema(); + newColumn.setColumnName(field.getFieldName()); + newColumn.setDataType(field.getDataType()); + newColumn.setDimensionColumn(isSortColumn || field.getDataType() == DataTypes.STRING); + newColumn.setSchemaOrdinal(ordinal++); + newColumn.setColumnar(true); + newColumn.setColumnUniqueId(UUID.randomUUID().toString()); + newColumn.setColumnReferenceId(newColumn.getColumnUniqueId()); + newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn)); + + if (isSortColumn) { + sortColumns.add(newColumn); + } else { + otherColumns.add(newColumn); + } + return this; + } + + /** + * Throw exception if {@param field} name is repeated + */ + private void checkRepeatColumnName(StructField field) { + for (ColumnSchema column : sortColumns) { + if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) { + throw new IllegalArgumentException("column name already exists"); + } + } + for (ColumnSchema column : otherColumns) { + if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) { + throw new IllegalArgumentException("column name already exists"); + } + } + } + + private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn) { + List<Encoding> encodings = new LinkedList<>(); + if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) { + encodings.add(Encoding.DIRECT_DICTIONARY); + } + if (isSortColumn) { + encodings.add(Encoding.INVERTED_INDEX); + } + return encodings; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilderSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilderSuite.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilderSuite.java new file mode 100644 index 
0000000..83b65a0 --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilderSuite.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.metadata.schema.table; + +import org.junit.Assert; +import org.junit.Test; + +public class CarbonTableBuilderSuite { + + TableSchema schema = CarbonTableTest.getTableSchema("t1"); + + @Test(expected = NullPointerException.class) + public void testNullTableName() { + TableSchema schema = CarbonTableTest.getTableSchema(null); + CarbonTable table = CarbonTable.builder() + .tableName(null) + .databaseName("db1") + .tableSchema(schema) + .tablePath("_temp") + .build(); + } + + @Test(expected = NullPointerException.class) + public void testNullDbName() { + CarbonTable table = CarbonTable.builder() + .tableName(schema.getTableName()) + .databaseName(null) + .tableSchema(schema) + .tablePath("_temp") + .build(); + } + + @Test(expected = NullPointerException.class) + public void testNullSchema() { + CarbonTable table = CarbonTable.builder() + .tableName(schema.getTableName()) + .databaseName("db1") + .tableSchema(null) + .tablePath("_temp") + .build(); + } + + @Test(expected = 
NullPointerException.class) + public void testNullTablePath() { + CarbonTable table = CarbonTable.builder() + .tableName(schema.getTableName()) + .databaseName("db1") + .tableSchema(schema) + .tablePath(null) + .build(); + } + + @Test + public void testBuilder() { + CarbonTable table = CarbonTable.builder() + .tableName(schema.getTableName()) + .databaseName("db1") + .tableSchema(schema) + .tablePath("_temp") + .build(); + Assert.assertEquals(schema.getTableName(), table.getTableName()); + Assert.assertEquals("db1", table.getDatabaseName()); + Assert.assertEquals("_temp", table.getTablePath()); + Assert.assertEquals(schema.getTableName(), table.getAbsoluteTableIdentifier().getTableName()); + Assert.assertEquals("db1", table.getAbsoluteTableIdentifier().getDatabaseName()); + Assert.assertEquals("_temp", table.getAbsoluteTableIdentifier().getTablePath()); + Assert.assertEquals("db1_t1", table.getTableUniqueName()); + Assert.assertEquals(schema, table.getTableInfo().getFactTable()); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java index a47b7fd..0266df4 100644 --- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java @@ -68,7 +68,7 @@ public class CarbonTableTest extends TestCase { assertTrue(carbonTable.getDimensionByName("carbonTestTable", "IMEI").equals(dimension)); } - private ColumnSchema getColumnarDimensionColumn() { + static ColumnSchema getColumnarDimensionColumn() { ColumnSchema dimColumn = new ColumnSchema(); dimColumn.setColumnar(true); 
dimColumn.setColumnName("IMEI"); @@ -83,7 +83,7 @@ public class CarbonTableTest extends TestCase { return dimColumn; } - private ColumnSchema getColumnarMeasureColumn() { + static ColumnSchema getColumnarMeasureColumn() { ColumnSchema dimColumn = new ColumnSchema(); dimColumn.setColumnName("IMEI_COUNT"); dimColumn.setColumnUniqueId(UUID.randomUUID().toString()); @@ -91,23 +91,23 @@ public class CarbonTableTest extends TestCase { return dimColumn; } - private TableSchema getTableSchema() { + static TableSchema getTableSchema(String tableName) { TableSchema tableSchema = new TableSchema(); List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); columnSchemaList.add(getColumnarDimensionColumn()); columnSchemaList.add(getColumnarMeasureColumn()); tableSchema.setListOfColumns(columnSchemaList); tableSchema.setTableId(UUID.randomUUID().toString()); - tableSchema.setTableName("carbonTestTable"); + tableSchema.setTableName(tableName); return tableSchema; } - private TableInfo getTableInfo(long timeStamp) { + static private TableInfo getTableInfo(long timeStamp) { TableInfo info = new TableInfo(); info.setDatabaseName("carbonTestDatabase"); info.setLastUpdatedTime(timeStamp); info.setTableUniqueName("carbonTestDatabase_carbonTestTable"); - info.setFactTable(getTableSchema()); + info.setFactTable(getTableSchema("carbonTestTable")); info.setTablePath("testore"); return info; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java new file mode 100644 index 0000000..34fa920 --- /dev/null +++ 
b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.metadata.schema.table; + +import java.util.List; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; + +import org.junit.Assert; +import org.junit.Test; + +public class TableSchemaBuilderSuite { + + @Test(expected = NullPointerException.class) + public void testNullField() { + TableSchemaBuilder builder = TableSchema.builder(); + builder.addColumn(null, true); + } + + @Test + public void testBuilder() { + TableSchemaBuilder builder = TableSchema.builder(); + builder.addColumn(new StructField("a", DataTypes.INT), true); + builder.addColumn(new StructField("b", DataTypes.DOUBLE), false); + TableSchema schema = builder.build(); + Assert.assertEquals(2, schema.getListOfColumns().size()); + List<ColumnSchema> columns = schema.getListOfColumns(); + Assert.assertEquals("a", columns.get(0).getColumnName()); + Assert.assertEquals("b", columns.get(1).getColumnName()); + } 
+ + @Test(expected = IllegalArgumentException.class) + public void testRepeatedColumn() { + TableSchemaBuilder builder = TableSchema.builder(); + builder.addColumn(new StructField("a", DataTypes.INT), true); + builder.addColumn(new StructField("a", DataTypes.DOUBLE), false); + TableSchema schema = builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index 49e4420..8d394db 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -217,6 +217,51 @@ object DataLoadingUtil { /** * build CarbonLoadModel for data loading + * @param table CarbonTable object containing all metadata information for the table + * like table name, table path, schema, etc + * @param options Load options from user input + * @return a new CarbonLoadModel instance + */ + def buildCarbonLoadModelJava( + table: CarbonTable, + options: java.util.Map[String, String] + ): CarbonLoadModel = { + val carbonProperty: CarbonProperties = CarbonProperties.getInstance + val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap) + optionsFinal.put("sort_scope", "no_sort") + if (!options.containsKey("fileheader")) { + val csvHeader = table.getCreateOrderColumn(table.getTableName) + .asScala.map(_.getColName).mkString(",") + optionsFinal.put("fileheader", csvHeader) + } + val model = new CarbonLoadModel() + buildCarbonLoadModel( + table = table, + carbonProperty = carbonProperty, + options = options.asScala.toMap, + optionsFinal = 
optionsFinal, + carbonLoadModel = model, + hadoopConf = null) // we have provided 'fileheader', so it can be null + + // set default values + model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean) + model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null)) + model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt) + model + } + + /** + * build CarbonLoadModel for data loading + * @param table CarbonTable object containing all metadata information for the table + * like table name, table path, schema, etc + * @param carbonProperty Carbon property instance + * @param options Load options from user input + * @param optionsFinal Load options that populated with default values for optional options + * @param carbonLoadModel The output load model + * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in + * user provided load options */ def buildCarbonLoadModel( table: CarbonTable, http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0b9917d..520d770 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ <module>core</module> <module>processing</module> <module>hadoop</module> + <module>store/sdk</module> <module>integration/spark-common</module> <module>integration/spark-common-test</module> <module>assembly</module> @@ -571,6 +572,12 @@ <id>include-all</id> </profile> <profile> + <id>store-sdk</id> + <modules> + <module>store/sdk</module> + </modules> + </profile> + <profile> <id>sdvtest</id> <modules> <module>integration/spark-common-cluster-test</module> http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/pom.xml 
---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml new file mode 100644 index 0000000..6663683 --- /dev/null +++ b/store/sdk/pom.xml @@ -0,0 +1,130 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-store-sdk</artifactId> + <name>Apache CarbonData :: Store SDK</name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>2.2.1</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/resources</directory> + </resource> + <resource> + <directory>.</directory> + <includes> + <include>CARBON_SPARK_INTERFACELogResource.properties</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + 
<phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + <!-- Note config is repeated in scalatest config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + <failIfNoTests>false</failIfNoTests> + </configuration> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <!-- Note config is repeated in surefire config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <filereports>CarbonTestSuite.txt</filereports> + <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + </argLine> + <stderr /> + <environmentVariables> + </environmentVariables> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java new file mode 100644 index 
0000000..daea733 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Implementation to write rows in CSV format to carbondata file. 
+ */ +@InterfaceAudience.Developer +@InterfaceStability.Unstable +class CSVCarbonWriter extends CarbonWriter { + + private RecordWriter<NullWritable, StringArrayWritable> recordWriter; + private TaskAttemptContext context; + private StringArrayWritable writable; + + CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException { + Configuration hadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel); + CarbonTableOutputFormat format = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID); + this.recordWriter = format.getRecordWriter(context); + this.context = context; + this.writable = new StringArrayWritable(); + } + + /** + * Write single row data, input row is of type String[] + */ + @Override + public void write(Object object) throws IOException { + writable.set((String[]) object); + try { + recordWriter.write(NullWritable.get(), writable); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Flush and close the writer + */ + @Override + public void close() throws IOException { + try { + recordWriter.close(context); + } catch (InterruptedException e) { + throw new IOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java new file mode 100644 index 0000000..e29aa18 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriter.java @@ -0,0 +1,51 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * Writer to write row data to carbondata file. Call {@link #builder()} to get + * a build to create instance of writer. 
+ */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public abstract class CarbonWriter { + + /** + * Write an object to the file, the format of the object depends on the + * implementation + */ + public abstract void write(Object object) throws IOException; + + /** + * Flush and close the writer + */ + public abstract void close() throws IOException; + + /** + * Create a {@link CarbonWriterBuilder} to build a {@link CarbonWriter} + */ + public static CarbonWriterBuilder builder() { + return new CarbonWriterBuilder(); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java new file mode 100644 index 0000000..51ca09c --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.carbondata.common.Strings; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.CarbonMetadata; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.TableSchema; +import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.ThriftWriter; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.spark.util.DataLoadingUtil; + +/** + * Biulder for {@link CarbonWriter} + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public class CarbonWriterBuilder { + private Schema schema; + private String path; + private String[] sortColumns; + private boolean persistSchemaFile; + + public CarbonWriterBuilder withSchema(Schema schema) { + Objects.requireNonNull(schema, "schema should not be null"); + this.schema = schema; + return this; + } + + public CarbonWriterBuilder outputPath(String path) { + Objects.requireNonNull(path, "path should not be null"); + this.path = path; + return this; + } + + public CarbonWriterBuilder sortBy(String[] sortColumns) { + this.sortColumns = sortColumns; + return this; + } + + 
public CarbonWriterBuilder partitionBy(String[] partitionColumns) { + throw new UnsupportedOperationException(); + } + + public CarbonWriterBuilder persistSchemaFile(boolean persist) { + this.persistSchemaFile = persist; + return this; + } + + public CarbonWriterBuilder withBlockSize(int blockSize) { + if (blockSize <= 0) { + throw new IllegalArgumentException("blockSize should be greater than zero"); + } + throw new UnsupportedOperationException(); + } + + public CarbonWriterBuilder withBlockletSize(int blockletSize) { + if (blockletSize <= 0) { + throw new IllegalArgumentException("blockletSize should be greater than zero"); + } + throw new UnsupportedOperationException(); + } + + /** + * Build a {@link CSVCarbonWriter}, which accepts row in CSV format + */ + public CarbonWriter buildWriterForCSVInput() throws IOException { + Objects.requireNonNull(schema, "schema should not be null"); + Objects.requireNonNull(path, "path should not be null"); + + // build CarbonTable using schema + CarbonTable table = buildCarbonTable(); + if (persistSchemaFile) { + // we are still using the traditional carbon table folder structure + persistSchemaFile(table, CarbonTablePath.getSchemaFilePath(path)); + } + + // build LoadModel + CarbonLoadModel loadModel = buildLoadModel(table); + return new CSVCarbonWriter(loadModel); + } + + /** + * Build a {@link AvroCarbonWriter}, which accepts Avro object + * @return + * @throws IOException + */ + public CarbonWriter buildWriterForAvroInput() throws IOException { + // TODO + throw new UnsupportedOperationException(); + } + + /** + * Build a {@link CarbonTable} + */ + private CarbonTable buildCarbonTable() { + TableSchemaBuilder tableSchemaBuilder = TableSchema.builder(); + List<String> sortColumnsList; + if (sortColumns != null) { + sortColumnsList = Arrays.asList(sortColumns); + } else { + sortColumnsList = new LinkedList<>(); + } + for (Field field : schema.getFields()) { + tableSchemaBuilder.addColumn( + new 
StructField(field.getFieldName(), field.getDataType()), + sortColumnsList.contains(field.getFieldName())); + } + String tableName = "_tempTable"; + String dbName = "_tempDB"; + TableSchema schema = tableSchemaBuilder.build(); + schema.setTableName(tableName); + CarbonTable table = CarbonTable.builder() + .tableName(schema.getTableName()) + .databaseName(dbName) + .tablePath(path) + .tableSchema(schema) + .build(); + return table; + } + + /** + * Save the schema of the {@param table} to {@param persistFilePath} + * @param table table object containing schema + * @param persistFilePath absolute file path with file name + */ + private void persistSchemaFile(CarbonTable table, String persistFilePath) throws IOException { + TableInfo tableInfo = table.getTableInfo(); + String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(persistFilePath); + CarbonMetadata.getInstance().loadTableMetadata(tableInfo); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + org.apache.carbondata.format.TableInfo thriftTableInfo = + schemaConverter.fromWrapperToExternalTableInfo( + tableInfo, + tableInfo.getDatabaseName(), + tableInfo.getFactTable().getTableName()); + org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry = + new org.apache.carbondata.format.SchemaEvolutionEntry( + tableInfo.getLastUpdatedTime()); + thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history() + .add(schemaEvolutionEntry); + FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath); + if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { + FileFactory.mkdirs(schemaMetadataPath, fileType); + } + ThriftWriter thriftWriter = new ThriftWriter(persistFilePath, false); + thriftWriter.open(); + thriftWriter.write(thriftTableInfo); + thriftWriter.close(); + } + + /** + * Build a {@link CarbonLoadModel} + */ + private CarbonLoadModel buildLoadModel(CarbonTable table) { + Map<String, String> options = new HashMap<>(); 
+ if (sortColumns != null) { + options.put("sort_columns", Strings.mkString(sortColumns, ",")); + } + return DataLoadingUtil.buildCarbonLoadModelJava(table, options); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java new file mode 100644 index 0000000..6742fa7 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; + +/** + * A field represent one column + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public class Field { + + private String name; + private DataType type; + + public Field(String name, String type) { + this.name = name; + if (type.equalsIgnoreCase("string")) { + this.type = DataTypes.STRING; + } else if (type.equalsIgnoreCase("date")) { + this.type = DataTypes.DATE; + } else if (type.equalsIgnoreCase("timestamp")) { + this.type = DataTypes.TIMESTAMP; + } else if (type.equalsIgnoreCase("boolean")) { + this.type = DataTypes.BOOLEAN; + } else if (type.equalsIgnoreCase("byte")) { + this.type = DataTypes.BYTE; + } else if (type.equalsIgnoreCase("short")) { + this.type = DataTypes.SHORT; + } else if (type.equalsIgnoreCase("int")) { + this.type = DataTypes.INT; + } else if (type.equalsIgnoreCase("long")) { + this.type = DataTypes.LONG; + } else if (type.equalsIgnoreCase("float")) { + this.type = DataTypes.FLOAT; + } else if (type.equalsIgnoreCase("double")) { + this.type = DataTypes.DOUBLE; + } else { + throw new IllegalArgumentException("unsupported data type: " + type); + } + } + + public Field(String name, DataType type) { + this.name = name; + this.type = type; + } + + public String getFieldName() { + return name; + } + + public DataType getDataType() { + return type; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java new file mode 100644 
index 0000000..52a4611 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; + +/** + * A schema used to write and read data files + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public class Schema { + + private Field[] fields; + + public Schema(Field[] fields) { + this.fields = fields; + } + + /** + * Create a Schema using JSON string, for example: + * [ + * {"name":"string"}, + * {"age":"int"} + * ] + */ + public static Schema parseJson(String json) { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(Field.class, new TypeAdapter<Field>() { + @Override + public void write(JsonWriter out, Field field) throws IOException { + // noop + } + + @Override + public Field read(JsonReader in) throws IOException { + 
in.beginObject(); + Field field = new Field(in.nextName(), in.nextString()); + in.endObject(); + return field; + } + }); + + Field[] fields = gsonBuilder.create().fromJson(json, Field[].class); + return new Schema(fields); + } + + public Field[] getFields() { + return fields; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf4973c6/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java new file mode 100644 index 0000000..531ec7c --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test suite for {@link CSVCarbonWriter} + */ +public class CSVCarbonWriterSuite { + + @Test + public void testWriteFiles() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testWriteFilesJsonSchema() throws IOException { + String path = "./testWriteFilesJsonSchema"; + FileUtils.deleteDirectory(new File(path)); + + String schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString(); + + writeFilesAndVerify(Schema.parseJson(schema), path); + + FileUtils.deleteDirectory(new File(path)); + } + + private void writeFilesAndVerify(Schema schema, String path) { + try { + CarbonWriter writer = CarbonWriter.builder() + .withSchema(schema) + .outputPath(path) + .buildWriterForCSVInput(); + + for (int i = 0; i < 100; i++) { + writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)}); + } + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override 
public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(1, dataFiles.length); + } + + @Test + public void testAllPrimitiveDataType() { + // TODO: write all data type and read by CarbonRecordReader to verify the content + } + + @Test + public void test2Blocklet() { + // TODO: write data with more than one blocklet + } + + @Test + public void test2Block() { + // TODO: write data with more than one block + } + + @Test + public void testSortColumns() { + // TODO: test sort column + } + + @Test + public void testPartitionOutput() { + // TODO: test write data with partition + } + + @Test + public void testSchemaPersistence() { + // TODO: verify schema file is persisted in specified location + } + +}