[CARBONDATA-2961] Simplify SDK API interfaces
problem: current SDK API interfaces are not simple and don't follow the builder pattern. If new features are added, it will become more complex. Solution: Simplify the SDK interfaces as per builder pattern. Refer the latest sdk-guide. Added: changes in Carbon Writer: public CarbonWriterBuilder withThreadSafe(short numOfThreads) public CarbonWriterBuilder withHadoopConf(Configuration conf) public CarbonWriterBuilder withCsvInput(Schema schema) public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) public CarbonWriterBuilder withJsonInput(Schema carbonSchema) public CarbonWriter build() throws IOException, InvalidLoadOptionException Changes in carbon Reader public CarbonReaderBuilder withHadoopConf(Configuration conf) public CarbonReader build() throws IOException, InvalidLoadOptionException Removed: changes in Carbon Writer: public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable) public CarbonWriterBuilder persistSchemaFile(boolean persist) setAccessKey setAccessKey setSecretKey setSecretKey setEndPoint setEndPoint public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration configuration) public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads,Configuration configuration) public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema,Configuration configuration) public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema,short numOfThreads, Configuration configuration) public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration) public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads,Configuration configuration) Changes in carbon Reader public CarbonReaderBuilder isTransactionalTable(boolean isTransactionalTable) public CarbonReader build(Configuration conf) throws IOException, InvalidLoadOptionException This closes #2961 Project: 
http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6a2a94d0 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6a2a94d0 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6a2a94d0 Branch: refs/heads/master Commit: 6a2a94d057d33435c16405d1da2e682cb748a77d Parents: 1d4d240 Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Fri Sep 21 16:32:56 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Sep 25 20:59:50 2018 +0530 ---------------------------------------------------------------------- docs/sdk-guide.md | 208 +++------ .../examples/sdk/CarbonReaderExample.java | 7 +- .../carbondata/examples/sdk/SDKS3Example.java | 21 +- .../examples/sdk/SDKS3ReadExample.java | 6 +- .../carbondata/examples/DirectSQLExample.scala | 12 +- .../carbondata/examples/S3UsingSDkExample.scala | 18 +- .../sdv/generated/SDKwriterTestCase.scala | 67 +-- ...FileInputFormatWithExternalCarbonTable.scala | 30 +- .../TestNonTransactionalCarbonTable.scala | 99 ++--- ...tNonTransactionalCarbonTableForMapType.scala | 6 +- ...tNonTransactionalCarbonTableJsonWriter.scala | 5 +- ...ansactionalCarbonTableWithAvroDataType.scala | 86 ++-- ...ransactionalCarbonTableWithComplexType.scala | 13 +- ...tSparkCarbonFileFormatWithSparkSession.scala | 176 -------- .../datasources/CarbonSparkDataSourceUtil.scala | 1 - .../datasource/SparkCarbonDataSourceTest.scala | 17 +- ...tCreateTableUsingSparkCarbonFileFormat.scala | 85 ++-- .../sdk/file/CarbonReaderBuilder.java | 105 +---- .../sdk/file/CarbonWriterBuilder.java | 342 ++++----------- .../apache/carbondata/sdk/file/TestUtil.java | 44 +- .../carbondata/store/MetaCachedCarbonStore.java | 21 +- .../sdk/file/AvroCarbonWriterTest.java | 72 +-- .../sdk/file/CSVCarbonWriterTest.java | 86 ++-- .../CSVNonTransactionalCarbonWriterTest.java | 298 ------------- .../carbondata/sdk/file/CarbonReaderTest.java | 434 ++++--------------- 
.../sdk/file/ConcurrentAvroSdkWriterTest.java | 9 +- .../sdk/file/ConcurrentSdkWriterTest.java | 9 +- .../carbondata/store/LocalCarbonStoreTest.java | 2 +- 28 files changed, 486 insertions(+), 1793 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/docs/sdk-guide.md ---------------------------------------------------------------------- diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md index dc21efd..d1e4bc5 100644 --- a/docs/sdk-guide.md +++ b/docs/sdk-guide.md @@ -67,9 +67,9 @@ These SDK writer output contains just a carbondata and carbonindex files. No met CarbonProperties.getInstance().addProperty("enable.offheap.sort", enableOffheap); - CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path); + CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withCsvInput(schema); - CarbonWriter writer = builder.buildWriterForCSVInput(schema); + CarbonWriter writer = builder.build(); int rows = 5; for (int i = 0; i < rows; i++) { @@ -124,7 +124,7 @@ public class TestSdkAvro { try { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) - .buildWriterForAvroInput(new org.apache.avro.Schema.Parser().parse(avroSchema)); + .withAvroInput(new org.apache.avro.Schema.Parser().parse(avroSchema)).build(); for (int i = 0; i < 100; i++) { writer.write(record); @@ -164,10 +164,10 @@ public class TestSdkJson { Schema CarbonSchema = new Schema(fields); - CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path); + CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withJsonInput(CarbonSchema); // initialize json writer with carbon schema - CarbonWriter writer = builder.buildWriterForJsonInput(CarbonSchema); + CarbonWriter writer = builder.build(); // one row of json Data as String String JsonRow = "{\"name\":\"abcd\", \"age\":10}"; @@ -199,7 +199,7 @@ Each of SQL data types are mapped into data types of SDK. 
Following are the mapp | DECIMAL | DataTypes.createDecimalType(precision, scale) | **NOTE:** - Carbon Supports below logical types of AVRO. + 1. Carbon Supports below logical types of AVRO. a. Date The date logical type represents a date within the calendar, with no reference to a particular time zone or time of day. A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1 January 1970 (ISO calendar). @@ -211,7 +211,9 @@ Each of SQL data types are mapped into data types of SDK. Following are the mapp A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC. Currently the values of logical types are not validated by carbon. - Expect that avro record passed by the user is already validated by avro record generator tools. + Expect that avro record passed by the user is already validated by avro record generator tools. + 2. If the string data is more than 32K in length, use withTableProperties() with "long_string_columns" property + or directly use DataTypes.VARCHAR if it is carbon schema. ## Run SQL on files directly Instead of creating table and query it, you can also query that file directly with SQL. @@ -236,20 +238,6 @@ public CarbonWriterBuilder outputPath(String path); ``` /** -* If set false, writes the carbondata and carbonindex files in a flat folder structure -* @param isTransactionalTable is a boolelan value -* if set to false, then writes the carbondata and carbonindex files -* in a flat folder structure. -* if set to true, then writes the carbondata and carbonindex files -* in segment folder structure.. -* By default set to false. 
-* @return updated CarbonWriterBuilder -*/ -public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable); -``` - -``` -/** * to set the timestamp in the carbondata and carbonindex index files * @param UUID is a timestamp to be used in the carbondata and carbonindex index files. * By default set to zero. @@ -308,16 +296,6 @@ public CarbonWriterBuilder sortBy(String[] sortColumns); ``` /** -* If set, create a schema file in metadata folder. -* @param persist is a boolean value, If set to true, creates a schema file in metadata folder. -* By default set to false. will not create metadata folder -* @return updated CarbonWriterBuilder -*/ -public CarbonWriterBuilder persistSchemaFile(boolean persist); -``` - -``` -/** * sets the taskNo for the writer. SDKs concurrently running * will set taskNo in order to avoid conflicts in file's name during write. * @param taskNo is the TaskNo user wants to specify. @@ -370,6 +348,7 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options); * c. local_dictionary_threshold -- positive value, default is 10000 * d. local_dictionary_enable -- true / false. Default is false * e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted. + If empty string "" is passed. No columns are sorted * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "local_sort" * k. long_string_columns -- comma separated string columns which are more than 32k length. * default value is null. @@ -379,91 +358,69 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options); public CarbonWriterBuilder withTableProperties(Map<String, String> options); ``` - ``` /** -* this writer is not thread safe, use buildThreadSafeWriterForCSVInput in multi thread environment -* Build a {@link CarbonWriter}, which accepts row in CSV format object -* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema} -* @param configuration hadoop configuration object. 
-* @return CSVCarbonWriter -* @throws IOException -* @throws InvalidLoadOptionException +* To make sdk writer thread safe. +* +* @param numOfThreads should number of threads in which writer is called in multi-thread scenario +* default sdk writer is not thread safe. +* can use one writer instance in one thread only. +* @return updated CarbonWriterBuilder */ -public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException; +public CarbonWriterBuilder withThreadSafe(short numOfThreads); ``` ``` /** -* Can use this writer in multi-thread instance. -* Build a {@link CarbonWriter}, which accepts row in CSV format -* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema} -* @param numOfThreads number of threads() in which .write will be called. -* @param configuration hadoop configuration object -* @return CSVCarbonWriter -* @throws IOException -* @throws InvalidLoadOptionException +* To support hadoop configuration +* +* @param conf hadoop configuration support, can set s3a AK,SK,end point and other conf with this +* @return updated CarbonWriterBuilder */ -public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration) - throws IOException, InvalidLoadOptionException; +public CarbonWriterBuilder withHadoopConf(Configuration conf) ``` - -``` +``` /** -* this writer is not thread safe, use buildThreadSafeWriterForAvroInput in multi thread environment -* Build a {@link CarbonWriter}, which accepts Avro format object -* @param avroSchema avro Schema object {org.apache.avro.Schema} -* @param configuration hadoop configuration object -* @return AvroCarbonWriter -* @throws IOException -* @throws InvalidLoadOptionException +* to build a {@link CarbonWriter}, which accepts row in CSV format +* +* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema} +* @return CarbonWriterBuilder */ 
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException; +public CarbonWriterBuilder withCsvInput(Schema schema); ``` ``` /** -* Can use this writer in multi-thread instance. -* Build a {@link CarbonWriter}, which accepts Avro object +* to build a {@link CarbonWriter}, which accepts Avro object +* * @param avroSchema avro Schema object {org.apache.avro.Schema} -* @param numOfThreads number of threads() in which .write will be called. -* @param configuration hadoop configuration object -* @return AvroCarbonWriter -* @throws IOException -* @throws InvalidLoadOptionException +* @return CarbonWriterBuilder */ -public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short -numOfThreads, Configuration configuration) - throws IOException, InvalidLoadOptionException +public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema); ``` - ``` /** -* this writer is not thread safe, use buildThreadSafeWriterForJsonInput in multi thread environment -* Build a {@link CarbonWriter}, which accepts Json object +* to build a {@link CarbonWriter}, which accepts Json object +* * @param carbonSchema carbon Schema object -* @param configuration hadoop configuration object -* @return JsonCarbonWriter -* @throws IOException -* @throws InvalidLoadOptionException +* @return CarbonWriterBuilder */ -public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration); +public CarbonWriterBuilder withJsonInput(Schema carbonSchema); ``` ``` /** -* Can use this writer in multi-thread instance. -* Build a {@link CarbonWriter}, which accepts Json object -* @param carbonSchema carbon Schema object -* @param numOfThreads number of threads() in which .write will be called. -* @param configuration hadoop configuraiton object. 
-* @return JsonCarbonWriter +* Build a {@link CarbonWriter} +* This writer is not thread safe, +* use withThreadSafe() configuration in multi thread environment +* +* @return CarbonWriter {AvroCarbonWriter/CSVCarbonWriter/JsonCarbonWriter based on Input Type } * @throws IOException * @throws InvalidLoadOptionException */ -public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration) +public CarbonWriter build() throws IOException, InvalidLoadOptionException; ``` ### Class org.apache.carbondata.sdk.file.CarbonWriter @@ -474,7 +431,6 @@ public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, s * which is one row of data. * If CSVCarbonWriter, object is of type String[], which is one row of data * If JsonCarbonWriter, object is of type String, which is one row of json -* Note: This API is not thread safe if writer is not built with number of threads argument. * @param object * @throws IOException */ @@ -648,19 +604,6 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/ ``` ``` - /** - * Configure the transactional status of table - * If set to false, then reads the carbondata and carbonindex files from a flat folder structure. - * If set to true, then reads the carbondata and carbonindex files from segment folder structure. 
- * Default value is false - * - * @param isTransactionalTable whether is transactional table or not - * @return CarbonReaderBuilder object - */ - public CarbonReaderBuilder isTransactionalTable(boolean isTransactionalTable); -``` - -``` /** * Configure the filter expression for carbon reader * @@ -671,66 +614,13 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/ ``` ``` - /** - * Set the access key for S3 - * - * @param key the string of access key for different S3 type,like: fs.s3a.access.key - * @param value the value of access key - * @return CarbonWriterBuilder - */ - public CarbonReaderBuilder setAccessKey(String key, String value); -``` - -``` - /** - * Set the access key for S3. - * - * @param value the value of access key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setAccessKey(String value); -``` - -``` - /** - * Set the secret key for S3 - * - * @param key the string of secret key for different S3 type,like: fs.s3a.secret.key - * @param value the value of secret key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setSecretKey(String key, String value); -``` - -``` - /** - * Set the secret key for S3 - * - * @param value the value of secret key - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setSecretKey(String value); -``` - -``` - /** - * Set the endpoint for S3 - * - * @param key the string of endpoint for different S3 type,like: fs.s3a.endpoint - * @param value the value of endpoint - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setEndPoint(String key, String value); -``` - -``` - /** - * Set the endpoint for S3 - * - * @param value the value of endpoint - * @return CarbonWriterBuilder object - */ - public CarbonReaderBuilder setEndPoint(String value); +/** + * To support hadoop configuration + * + * @param conf hadoop configuration support, can set s3a AK,SK,end point and other conf with this + * @return updated 
CarbonReaderBuilder + */ + public CarbonReaderBuilder withHadoopConf(Configuration conf); ``` ``` http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java index 76926ce..9e80567 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.sdk.file.CarbonReader; @@ -62,7 +61,7 @@ public class CarbonReaderExample { CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .withLoadOptions(map) - .buildWriterForCSVInput(new Schema(fields), new Configuration(false)); + .withCsvInput(new Schema(fields)).build(); for (int i = 0; i < 10; i++) { String[] row2 = new String[]{ @@ -107,7 +106,7 @@ public class CarbonReaderExample { CarbonReader reader = CarbonReader .builder(path, "_temp") .projection(strings) - .build(new Configuration(false)); + .build(); System.out.println("\nData:"); long day = 24L * 3600 * 1000; @@ -132,7 +131,7 @@ public class CarbonReaderExample { // Read data CarbonReader reader2 = CarbonReader .builder(path, "_temp") - .build(new Configuration(false)); + .build(); System.out.println("\nData:"); i = 0; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java ---------------------------------------------------------------------- 
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java index 3abc342..d4f49f5 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; import org.apache.carbondata.sdk.file.*; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Constants; /** * Example for testing CarbonWriter on S3 @@ -49,16 +50,16 @@ public class SDKS3Example { num = Integer.parseInt(args[4]); } + Configuration conf = new Configuration(false); + conf.set(Constants.ACCESS_KEY, args[0]); + conf.set(Constants.SECRET_KEY, args[1]); + conf.set(Constants.ENDPOINT, args[2]); + Field[] fields = new Field[2]; fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - CarbonWriterBuilder builder = CarbonWriter.builder() - .setAccessKey(args[0]) - .setSecretKey(args[1]) - .setEndPoint(args[2]) - .outputPath(path); - - CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), new Configuration(false)); + CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withHadoopConf(conf); + CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); for (int i = 0; i < num; i++) { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)}); @@ -74,10 +75,8 @@ public class SDKS3Example { .builder(path, "_temp") .projection(new String[]{"name", "age"}) .filter(equalToExpression) - .setAccessKey(args[0]) - .setSecretKey(args[1]) - .setEndPoint(args[2]) - .build(new Configuration(false)); + .withHadoopConf(conf) + .build(); System.out.println("\nData:"); int i = 0; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java index a236175..1fac673 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java @@ -62,7 +62,8 @@ public class SDKS3ReadExample { .builder(path, "_temp") .projection(new String[]{"name", "age"}) .filter(equalToExpression) - .build(configuration); + .withHadoopConf(configuration) + .build(); System.out.println("\nData:"); int i = 0; @@ -78,7 +79,8 @@ public class SDKS3ReadExample { CarbonReader reader2 = CarbonReader .builder(path, "_temp") .projection(new String[]{"name", "age"}) - .build(configuration); + .withHadoopConf(configuration) + .build(); System.out.println("\nData:"); i = 0; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala index 86bf854..5ddcba0 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala @@ -37,7 +37,7 @@ object DirectSQLExample { def buildTestData( path: String, num: Int = 3, - persistSchema: Boolean = false, sparkSession: SparkSession): Any = { + sparkSession: SparkSession): Any = { // getCanonicalPath gives path with \, 
but the code expects /. val writerPath = path.replace("\\", "/"); @@ -51,14 +51,10 @@ object DirectSQLExample { val builder = CarbonWriter .builder() .outputPath(writerPath) - .isTransactionalTable(true) .uniqueIdentifier(System.currentTimeMillis) .withBlockSize(2) - if (persistSchema) { - builder.persistSchemaFile(true) - } - val writer = builder - .buildWriterForCSVInput(new Schema(fields), sparkSession.sparkContext.hadoopConfiguration) + .withCsvInput(new Schema(fields)) + val writer = builder.build() var i = 0 while (i < num) { writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) @@ -85,7 +81,7 @@ object DirectSQLExample { // 1. generate data file cleanTestData(path) buildTestData(path, 20, sparkSession = carbonSession) - val readPath = path + "Fact/Part0/Segment_null" + val readPath = path println("Running SQL on carbon files directly") try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala index c5c9710..014bec5 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala @@ -48,18 +48,10 @@ object S3UsingSDKExample { try { val builder = CarbonWriter.builder() val writer = - if (persistSchema) { - builder.persistSchemaFile(true) - builder.outputPath(writerPath).isTransactionalTable(true) - .uniqueIdentifier( - System.currentTimeMillis) - .buildWriterForCSVInput(new Schema(fields), new Configuration(false)) - } else { - builder.outputPath(writerPath).isTransactionalTable(true) - .uniqueIdentifier( - System.currentTimeMillis).withBlockSize(2) 
- .buildWriterForCSVInput(new Schema(fields), new Configuration(false)) - } + builder.outputPath(writerPath) + .uniqueIdentifier(System.currentTimeMillis) + .withBlockSize(2) + .withCsvInput(new Schema(fields)).build() var i = 0 var row = num while (i < row) { @@ -119,7 +111,7 @@ object S3UsingSDKExample { spark.sql("DROP TABLE IF EXISTS s3_sdk_table") spark.sql(s"CREATE EXTERNAL TABLE s3_sdk_table STORED BY 'carbondata'" + - s" LOCATION '$path/Fact/Part0/Segment_null'") + s" LOCATION '$path'") spark.sql("SELECT * FROM s3_sdk_table LIMIT 10").show() spark.stop() } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala index f785b36..668d9d1 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala @@ -70,31 +70,25 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { } def buildTestDataSingleFile(): Any = { - buildTestData(3, false, null) + buildTestData(3, null) } def buildTestDataWithBadRecordForce(writerPath: String): Any = { var options = Map("bAd_RECords_action" -> "FORCE").asJava - buildTestData(3, false, options) + buildTestData(3, options) } def buildTestDataWithBadRecordFail(writerPath: String): Any = { var options = Map("bAd_RECords_action" -> "FAIL").asJava - buildTestData(15001, false, options) + buildTestData(15001, options) } - def buildTestData(rows: Int, - persistSchema: Boolean, - 
options: util.Map[String, String]): Any = { - buildTestData(rows, persistSchema, options, List("name"), writerPath) + def buildTestData(rows: Int, options: util.Map[String, String]): Any = { + buildTestData(rows, options, List("name"), writerPath) } // prepare sdk writer output - def buildTestData(rows: Int, - persistSchema: Boolean, - options: util.Map[String, String], - sortColumns: List[String], - writerPath: String): Any = { + def buildTestData(rows: Int, options: util.Map[String, String], sortColumns: List[String], writerPath: String): Any = { val schema = new StringBuilder() .append("[ \n") .append(" {\"name\":\"string\"},\n") @@ -106,31 +100,18 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { try { val builder = CarbonWriter.builder() val writer = - if (persistSchema) { - builder.persistSchemaFile(true) - builder + if (options != null) { + builder.outputPath(writerPath) .sortBy(sortColumns.toArray) - .outputPath(writerPath) - .isTransactionalTable(false) - .uniqueIdentifier(System.currentTimeMillis) - .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration) + .uniqueIdentifier( + System.currentTimeMillis).withBlockSize(2).withLoadOptions(options) + .withCsvInput(Schema.parseJson(schema)).build() } else { - if (options != null) { - builder.outputPath(writerPath) - .isTransactionalTable(false) - .sortBy(sortColumns.toArray) - .uniqueIdentifier( - System.currentTimeMillis).withBlockSize(2).withLoadOptions(options) - .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration) - } else { - builder.outputPath(writerPath) - .isTransactionalTable(false) - .sortBy(sortColumns.toArray) - .uniqueIdentifier( - System.currentTimeMillis).withBlockSize(2) - .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext - .hadoopConfiguration) - } + builder.outputPath(writerPath) + .sortBy(sortColumns.toArray) + .uniqueIdentifier( + 
System.currentTimeMillis).withBlockSize(2) + .withCsvInput(Schema.parseJson(schema)).build() } var i = 0 while (i < rows) { @@ -156,12 +137,12 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { def buildTestDataWithBadRecordIgnore(writerPath: String): Any = { var options = Map("bAd_RECords_action" -> "IGNORE").asJava - buildTestData(3, false, options) + buildTestData(3, options) } def buildTestDataWithBadRecordRedirect(writerPath: String): Any = { var options = Map("bAd_RECords_action" -> "REDIRECT").asJava - buildTestData(3, false, options) + buildTestData(3, options) } def deleteFile(path: String, extension: String): Unit = { @@ -545,8 +526,8 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { try { val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false) - .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath) + .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build() var i = 0 while (i < rows) { writer.write(record) @@ -744,8 +725,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { .append("]") .toString() val builder = CarbonWriter.builder() - val writer = builder.outputPath(writerPath) - .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration) + val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build() for (i <- 0 until 5) { writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString)) @@ -768,8 +748,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { .append("]") .toString() val builder = CarbonWriter.builder() - val writer = builder.outputPath(writerPath) - .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration) + val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build() 
val varCharLen = 4000000 for (i <- 0 until 3) { writer @@ -802,7 +781,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { val writer = builder .outputPath(writerPath) .sortBy(Array[String]()) - .buildWriterForCSVInput(Schema.parseJson(schema), new Configuration()) + .withCsvInput(Schema.parseJson(schema)).build() for (i <- 0 until 5) { writer http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala index 1b181bc..3ab956c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala @@ -33,16 +33,14 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { - var writerPath = new File(this.getClass.getResource("/").getPath - + - "../." + - "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + var writerPath = new File(this.getClass.getResource("/").getPath + + "../../src/test/resources/SparkCarbonFileFormat/WriterOutput") .getCanonicalPath //getCanonicalPath gives path with \, but the code expects /. 
writerPath = writerPath.replace("\\", "/"); - def buildTestData(persistSchema:Boolean) = { + def buildTestData(): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -55,16 +53,10 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be .toString() try { - val builder = CarbonWriter.builder().isTransactionalTable(true) + val builder = CarbonWriter.builder() val writer = - if (persistSchema) { - builder.persistSchemaFile(true) - builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), - sqlContext.sparkContext.hadoopConfiguration) - } else { - builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), - sqlContext.sparkContext.hadoopConfiguration) - } + builder.outputPath(writerPath + "/Fact/Part0/Segment_null") + .withCsvInput(Schema.parseJson(schema)).build() var i = 0 while (i < 100) { @@ -108,7 +100,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be //TO DO, need to remove segment dependency and tableIdentifier Dependency test("read carbondata files (sdk Writer Output) using the carbonfile ") { - buildTestData(false) + buildTestData() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -148,7 +140,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be } test("should not allow to alter datasource carbontable ") { - buildTestData(false) + buildTestData() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -171,7 +163,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be } test("Read sdk writer output file without index file should fail") { - buildTestData(false) + buildTestData() deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -196,7 +188,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with 
Be // TODO: Make the sparkCarbonFileFormat to work without index file test("Read sdk writer output file without Carbondata file should fail") { - buildTestData(false) + buildTestData() deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -218,7 +210,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be test("Read sdk writer output file without any file should fail") { - buildTestData(false) + buildTestData() deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) assert(new File(writerPath).exists()) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index f6d12ab..0c55d1b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -112,16 +112,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { def buildTestDataWithSortColumns(sortColumns: List[String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) - buildTestData(3, false, null, sortColumns) + buildTestData(3, null, sortColumns) } def buildTestData(rows: Int, persistSchema: Boolean, options: util.Map[String, String]): Any = { - buildTestData(rows, 
persistSchema, options, List("name")) + buildTestData(rows, options, List("name")) } // prepare sdk writer output def buildTestData(rows: Int, - persistSchema: Boolean, options: util.Map[String, String], sortColumns: List[String]): Any = { val schema = new StringBuilder() @@ -135,33 +134,18 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { try { val builder = CarbonWriter.builder() val writer = - if (persistSchema) { - builder.persistSchemaFile(true) - builder + if (options != null) { + builder.outputPath(writerPath) .sortBy(sortColumns.toArray) - .outputPath(writerPath) - .isTransactionalTable(false) - .uniqueIdentifier(System.currentTimeMillis) - .buildWriterForCSVInput(Schema.parseJson(schema), - sqlContext.sparkContext.hadoopConfiguration) + .uniqueIdentifier( + System.currentTimeMillis).withBlockSize(2).withLoadOptions(options) + .withCsvInput(Schema.parseJson(schema)).build() } else { - if (options != null) { - builder.outputPath(writerPath) - .isTransactionalTable(false) - .sortBy(sortColumns.toArray) - .uniqueIdentifier( - System.currentTimeMillis).withBlockSize(2).withLoadOptions(options) - .buildWriterForCSVInput(Schema.parseJson(schema), - sqlContext.sparkContext.hadoopConfiguration) - } else { - builder.outputPath(writerPath) - .isTransactionalTable(false) - .sortBy(sortColumns.toArray) - .uniqueIdentifier( - System.currentTimeMillis).withBlockSize(2) - .buildWriterForCSVInput(Schema.parseJson(schema), - sqlContext.sparkContext.hadoopConfiguration) - } + builder.outputPath(writerPath) + .sortBy(sortColumns.toArray) + .uniqueIdentifier( + System.currentTimeMillis).withBlockSize(2) + .withCsvInput(Schema.parseJson(schema)).build() } var i = 0 while (i < rows) { @@ -195,11 +179,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(writerPath) - .isTransactionalTable(false) 
.uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns) - .buildWriterForCSVInput(new Schema(fields), - sqlContext.sparkContext.hadoopConfiguration) - + .withCsvInput(new Schema(fields)).build() var i = 0 while (i < rows) { writer.write(Array[String]("true", String.valueOf(i), String.valueOf(i.toDouble / 2))) @@ -228,12 +209,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(writerPath) - .isTransactionalTable(false) .sortBy(sortColumns.toArray) .uniqueIdentifier( 123).withBlockSize(2) - .buildWriterForCSVInput(Schema.parseJson(schema), - sqlContext.sparkContext.hadoopConfiguration) + .withCsvInput(Schema.parseJson(schema)).build() var i = 0 while (i < rows) { writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) @@ -1004,12 +983,11 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { fields(2) = new Field("mydate", DataTypes.DATE) fields(3) = new Field("mytime", DataTypes.TIMESTAMP) - val builder: CarbonWriterBuilder = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options) + val builder: CarbonWriterBuilder = CarbonWriter.builder.outputPath(writerPath) + .withLoadOptions(options) - val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields), - sqlContext.sparkContext.hadoopConfiguration) - writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00")); + val writer: CarbonWriter = builder.withCsvInput(new Schema(fields)).build() + writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00")) writer.close() assert(new File(writerPath).exists()) @@ -1132,9 +1110,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val record = testUtil.jsonToAvro(json, mySchema) try { val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false) - 
.uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath) + .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build() var i = 0 while (i < rows) { writer.write(record) @@ -2108,8 +2085,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { assert(intercept[RuntimeException] { val writer = CarbonWriter.builder.sortBy(Array("name", "id")) - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() }.getMessage.toLowerCase.contains("column: name specified in sort columns")) @@ -2149,8 +2125,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val record = testUtil.jsonToAvro(json1, schema1) val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() } @@ -2188,8 +2163,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val record = testUtil.jsonToAvro(json1, schema1) val writer = CarbonWriter.builder.sortBy(Array("id")) - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() } @@ -2233,8 +2207,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val record = testUtil.jsonToAvro(json1, schema) val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() 
} @@ -2274,8 +2247,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val record = testUtil.jsonToAvro(json1, schema1) val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -2321,8 +2293,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val record = testUtil.jsonToAvro(json1, schema1) val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -2369,8 +2340,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -2390,7 +2360,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val writer: CarbonWriter = CarbonWriter.builder .outputPath(writerPath) .withTableProperties(options) - .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration) + .withCsvInput(new Schema(fields)).build() writer.write(Array("carbon", "1")) writer.write(Array("hydrogen", "10")) writer.write(Array("boron", "4")) @@ -2408,7 +2378,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { // write local sort data val writer1: CarbonWriter = CarbonWriter.builder .outputPath(writerPath) - .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration) + .withCsvInput(new Schema(fields)).build() writer1.write(Array("carbon", "1")) 
writer1.write(Array("hydrogen", "10")) writer1.write(Array("boron", "4")) @@ -2422,7 +2392,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { test("test LocalDictionary with True") { FileUtils.deleteDirectory(new File(writerPath)) - val builder = CarbonWriter.builder.isTransactionalTable(false) + val builder = CarbonWriter.builder .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true) .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath) generateCarbonData(builder) @@ -2447,7 +2417,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { "sort_columns" -> "name", "local_dictionary_threshold" -> "200", "local_dictionary_enable" -> "true").asJava - val builder = CarbonWriter.builder.isTransactionalTable(false) + val builder = CarbonWriter.builder .withTableProperties(tablePropertiesMap) .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath) generateCarbonData(builder) @@ -2467,7 +2437,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { test("test Local Dictionary with FallBack") { FileUtils.deleteDirectory(new File(writerPath)) - val builder = CarbonWriter.builder.isTransactionalTable(false) + val builder = CarbonWriter.builder .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true) .localDictionaryThreshold(5) .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath) @@ -2488,7 +2458,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { test("test local dictionary with External Table data load ") { FileUtils.deleteDirectory(new File(writerPath)) - val builder = CarbonWriter.builder.isTransactionalTable(false) + val builder = CarbonWriter.builder .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true) .localDictionaryThreshold(200) 
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath) @@ -2517,8 +2487,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { fields(0) = new Field("name", DataTypes.STRING) fields(1) = new Field("surname", DataTypes.STRING) fields(2) = new Field("age", DataTypes.INT) - val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields), - sqlContext.sparkContext.hadoopConfiguration) + val carbonWriter = builder.withCsvInput(new Schema(fields)).build() var i = 0 while (i < 100) { { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala index b060ec1..778913e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala @@ -439,12 +439,12 @@ class TestNonTransactionalCarbonTableForMapType extends QueryTest with BeforeAnd val json = """ {"name":"bob", "age":10, "arrayRecord": [{"101": "Rahul", "102": "Pawan"}]} """.stripMargin nonTransactionalCarbonTable.WriteFilesWithAvroWriter(2, mySchema, json) - val reader = CarbonReader.builder(writerPath, "_temp").isTransactionalTable(false).build(conf) + val reader = CarbonReader.builder(writerPath, "_temp").build() reader.close() val exception1 = intercept[Exception] { val reader1 = 
CarbonReader.builder(writerPath, "_temp") - .projection(Array[String] { "arrayRecord.houseDetails" }).isTransactionalTable(false) - .build(conf) + .projection(Array[String] { "arrayRecord.houseDetails" }) + .build() reader1.close() } assert(exception1.getMessage http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala index 17aae1d..7ad698c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala @@ -95,11 +95,10 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd try { var options: util.Map[String, String] = Map("bAd_RECords_action" -> "FAIL", "quotechar" -> "\"").asJava val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false) + .outputPath(writerPath) .uniqueIdentifier(System.currentTimeMillis()) .withLoadOptions(options) - .buildWriterForJsonInput(carbonSchema, - sqlContext.sparkContext.hadoopConfiguration) + .withJsonInput(carbonSchema).build() writer.write(jsonRow) writer.close() } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala index e7fcf95..d5da794 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala @@ -92,8 +92,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -139,8 +138,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(mySchema) val record = testUtil.jsonToAvro(json1, mySchema) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -183,8 +181,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(mySchema) val record = testUtil.jsonToAvro(json, mySchema) - val writer = CarbonWriter.builder - 
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -214,8 +211,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -244,8 +240,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -274,8 +269,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -304,8 +298,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - 
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -334,8 +327,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -364,8 +356,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) val exception1 = intercept[UnsupportedOperationException] { - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() } @@ -401,8 +392,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -437,8 +427,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = 
testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -475,8 +464,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -508,8 +496,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -567,8 +554,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -662,8 +648,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = 
testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -715,8 +700,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef avroRec.put("union_field", bytes1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -785,8 +769,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef avroRec.put("record2", 10.24) avroRec.put("struct_field_decimal", genericByteArray) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -857,8 +840,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef avroRec.put("age", 10) avroRec.put("dec_fields", genericByteArray) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -904,9 +886,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false) - .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) 
+ val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -952,9 +932,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false) - .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -998,8 +976,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) val avroRec = new GenericData. Record(nn) avroRec.put("id", bytes1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -1043,8 +1020,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) val avroRec = new GenericData. Record(nn) avroRec.put("dec_field", bytes1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -1088,8 +1064,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) val avroRec = new GenericData. 
Record(nn) avroRec.put("dec_field", bytes1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -1129,8 +1104,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef s"""{"dec_field":"$data"}""".stripMargin val avroRec = new GenericData. Record(nn) avroRec.put("dec_field", bytes) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql( @@ -1171,8 +1145,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val avroRec = new GenericData. Record(nn) avroRec.put("dec_field", bytes) val exception1 = intercept[Exception] { - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() } @@ -1221,8 +1194,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -1257,8 +1229,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef val nn = new 
org.apache.avro.Schema.Parser().parse(schema1) val record = testUtil.jsonToAvro(json1, schema1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(record) writer.close() sql( @@ -1307,8 +1278,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef avroRec.put("union_field", bytes1) - val writer = CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build() writer.write(avroRec) writer.close() sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " + http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala index 0421ea8..42bb791 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala @@ -66,15 +66,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo try { val writer = if (isLocalDictionary) { 
CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true) + .outputPath(writerPath).enableLocalDictionary(true) .localDictionaryThreshold(2000) - .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build() } else { CarbonWriter.builder - .outputPath(writerPath).isTransactionalTable(false) - .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, - sqlContext.sparkContext.hadoopConfiguration) + .outputPath(writerPath) + .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build() } var i = 0 while (i < rows) { @@ -270,8 +268,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo """.stripMargin val pschema= org.apache.avro.Schema.parse(mySchema) val records = testUtil.jsonToAvro(jsonvalue, mySchema) - val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema, - sqlContext.sparkContext.hadoopConfiguration) + val writer = CarbonWriter.builder().outputPath(writerPath).withAvroInput(pschema).build() writer.write(records) writer.close() sql("DROP TABLE IF EXISTS sdkOutputTable") http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala deleted file mode 100644 index a4e58e7..0000000 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.testsuite.createTable - -import java.io.File - -import org.apache.commons.io.FileUtils -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.SparkSession - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.CarbonFile -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} - - -object TestSparkCarbonFileFormatWithSparkSession { - - var writerPath = new File(this.getClass.getResource("/").getPath - + - "../." + - "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") - .getCanonicalPath - //getCanonicalPath gives path with \, but the code expects /. 
- writerPath = writerPath.replace("\\", "/"); - - val filePath = writerPath + "/Fact/Part0/Segment_null/" - - def buildTestData(persistSchema:Boolean) = { - - FileUtils.deleteDirectory(new File(writerPath)) - - val schema = new StringBuilder() - .append("[ \n") - .append(" {\"name\":\"string\"},\n") - .append(" {\"age\":\"int\"},\n") - .append(" {\"height\":\"double\"}\n") - .append("]") - .toString() - - try { - val builder = CarbonWriter.builder().isTransactionalTable(true) - val writer = - if (persistSchema) { - builder.persistSchemaFile(true) - builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new - Configuration(false)) - } else { - builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new - Configuration(false)) - } - - var i = 0 - while (i < 100) { - writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) - i += 1 - } - writer.close() - } catch { - case ex: Throwable => throw new RuntimeException(ex) - } - } - - def cleanTestData() = { - FileUtils.deleteDirectory(new File(writerPath)) - } - - def deleteIndexFile(path: String, extension: String) : Unit = { - val file: CarbonFile = FileFactory - .getCarbonFile(path, FileFactory.getFileType(path)) - - for (eachDir <- file.listFiles) { - if (!eachDir.isDirectory) { - if (eachDir.getName.endsWith(extension)) { - CarbonUtil.deleteFoldersAndFilesSilent(eachDir) - } - } else { - deleteIndexFile(eachDir.getPath, extension) - } - } - } - - def main(args: Array[String]): Unit = { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" - val warehouse = s"$rootPath/examples/spark2/target/warehouse" - val metastoredb = s"$rootPath/examples/spark2/target/metastore_db" - - // clean data folder - if (true) { - val clean = (path: String) => FileUtils.deleteDirectory(new File(path)) - clean(storeLocation) - clean(warehouse) - 
clean(metastoredb) - } - - val spark = SparkSession - .builder() - .master("local") - .appName("TestSparkCarbonFileFormatWithSparkSession") - .enableHiveSupport() - .config("spark.sql.warehouse.dir", warehouse) - .config("javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=$metastoredb;create=true") - .getOrCreate() - - CarbonProperties.getInstance() - .addProperty("carbon.storelocation", storeLocation) - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") - .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") - buildTestData(false) - assert(new File(filePath).exists()) - //data source file format - if (spark.sparkContext.version.startsWith("2.1")) { - //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) - } else if (spark.sparkContext.version.startsWith("2.2")) { - //data source file format - spark.sql( - s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION - |'$filePath' """.stripMargin) - } else{ - // TO DO - } - - spark.sql("Describe formatted sdkOutputTable").show(false) - - spark.sql("select * from sdkOutputTable").show(false) - - spark.sql("select * from sdkOutputTable limit 3").show(false) - - spark.sql("select name from sdkOutputTable").show(false) - - spark.sql("select age from sdkOutputTable").show(false) - - spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false) - - spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false) - - spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false) - - spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false) - - spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false) - - spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false) - - spark.sql("select count(*) from 
sdkOutputTable").show(200,false) - - spark.sql("DROP TABLE sdkOutputTable") - // drop table should not delete the files - assert(new File(filePath).exists()) - cleanTestData() - - spark.stop() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala index 337b13b..59cf2d8 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala @@ -220,7 +220,6 @@ object CarbonSparkDataSourceUtil { new Field(field.name, dataType) }) val builder = new CarbonWriterBuilder - builder.isTransactionalTable(false) builder.outputPath(options.getOrElse("path", "")) val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt) if (blockSize.isDefined) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala index 36f6061..3be8cb3 100644 --- 
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala @@ -983,10 +983,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(path) - .isTransactionalTable(false) .uniqueIdentifier(System.nanoTime()).withBlockSize(2) - .buildWriterForCSVInput(new Schema(structType), spark.sparkContext - .hadoopConfiguration) + .withCsvInput(new Schema(structType)).build() var i = 0 while (i < 11) { @@ -1032,9 +1030,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(path) - .isTransactionalTable(false) .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(Array("bytefield")) - .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration) + .withCsvInput(new Schema(fields)).build() var i = 0 while (i < 11) { @@ -1087,10 +1084,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(path) - .isTransactionalTable(false) .uniqueIdentifier(System.nanoTime()).withBlockSize(2) - .buildWriterForCSVInput(new Schema(structType), spark.sparkContext - .hadoopConfiguration) + .withCsvInput(new Schema(structType)).build() var i = 0 while (i < 10) { @@ -1160,9 +1155,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(writerPath) - .isTransactionalTable(false) .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns) - .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration) + .withCsvInput(new Schema(fields)).build() var i = 0 while (i < rows) { @@ -1201,9 +1195,8 @@ 
class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { val builder = CarbonWriter.builder() val writer = builder.outputPath(path) - .isTransactionalTable(false) .uniqueIdentifier(System.nanoTime()).withBlockSize(2) - .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration) + .withCsvInput(new Schema(fields)).build() var i = 0 while (i < 33000) {