[CARBONDATA-2961] Simplify SDK API interfaces

Problem: the current SDK API interfaces are not simple and do not follow the
builder pattern. As new features are added, they would become even more complex.

Solution: simplify the SDK interfaces to follow the builder pattern.

Refer to the latest sdk-guide.

Added:

Changes in Carbon Writer:
public CarbonWriterBuilder withThreadSafe(short numOfThreads)
public CarbonWriterBuilder withHadoopConf(Configuration conf)

public CarbonWriterBuilder withCsvInput(Schema schema)
public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema)
public CarbonWriterBuilder withJsonInput(Schema carbonSchema)

public CarbonWriter build() throws IOException, InvalidLoadOptionException

Changes in Carbon Reader:
public CarbonReaderBuilder withHadoopConf(Configuration conf)
public CarbonReader build() throws IOException, InterruptedException
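
For reference, a minimal usage sketch of the new builder-style API (the output
path and the two-column schema are illustrative; complete, working examples are
in the updated sdk-guide and in the example classes touched by this change):

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.*;

// Write a few rows with the new CSV-input builder.
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

CarbonWriter writer = CarbonWriter.builder()
    .outputPath(path)                        // illustrative output location
    .withCsvInput(new Schema(fields))
    // .withThreadSafe((short) 2)            // optional: when calling write() from multiple threads
    .build();
writer.write(new String[]{"robot", "1"});
writer.close();

// Read the rows back with the new reader builder.
CarbonReader reader = CarbonReader.builder(path, "_temp")
    .projection(new String[]{"name", "age"})
    .build();
while (reader.hasNext()) {
    Object[] row = (Object[]) reader.readNextRow();
}
reader.close();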

Removed:

Changes in Carbon Writer:
public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable)

public CarbonWriterBuilder persistSchemaFile(boolean persist)

setAccessKey (all overloads)
setSecretKey (all overloads)
setEndPoint (all overloads)

public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration configuration)
public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration)
public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema, Configuration configuration)
public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads, Configuration configuration)
public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration)
public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration)

Changes in Carbon Reader:
public CarbonReaderBuilder isTransactionalTable(boolean isTransactionalTable)
public CarbonReader build(Configuration conf) throws IOException, InterruptedException
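
The removed S3 setters map directly onto withHadoopConf(): the same credentials
are now set on a Hadoop Configuration, as done in SDKS3Example in this commit.
A sketch, assuming ak, sk, endpoint, path and fields are already defined:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;

Configuration conf = new Configuration(false);
conf.set(Constants.ACCESS_KEY, ak);        // previously setAccessKey(ak)
conf.set(Constants.SECRET_KEY, sk);        // previously setSecretKey(sk)
conf.set(Constants.ENDPOINT, endpoint);    // previously setEndPoint(endpoint)

CarbonWriter writer = CarbonWriter.builder()
    .outputPath(path)
    .withHadoopConf(conf)
    .withCsvInput(new Schema(fields))
    .build();

CarbonReader reader = CarbonReader.builder(path, "_temp")
    .withHadoopConf(conf)
    .build();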

This closes #2961


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6a2a94d0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6a2a94d0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6a2a94d0

Branch: refs/heads/master
Commit: 6a2a94d057d33435c16405d1da2e682cb748a77d
Parents: 1d4d240
Author: ajantha-bhat <ajanthab...@gmail.com>
Authored: Fri Sep 21 16:32:56 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Sep 25 20:59:50 2018 +0530

----------------------------------------------------------------------
 docs/sdk-guide.md                               | 208 +++------
 .../examples/sdk/CarbonReaderExample.java       |   7 +-
 .../carbondata/examples/sdk/SDKS3Example.java   |  21 +-
 .../examples/sdk/SDKS3ReadExample.java          |   6 +-
 .../carbondata/examples/DirectSQLExample.scala  |  12 +-
 .../carbondata/examples/S3UsingSDkExample.scala |  18 +-
 .../sdv/generated/SDKwriterTestCase.scala       |  67 +--
 ...FileInputFormatWithExternalCarbonTable.scala |  30 +-
 .../TestNonTransactionalCarbonTable.scala       |  99 ++---
 ...tNonTransactionalCarbonTableForMapType.scala |   6 +-
 ...tNonTransactionalCarbonTableJsonWriter.scala |   5 +-
 ...ansactionalCarbonTableWithAvroDataType.scala |  86 ++--
 ...ransactionalCarbonTableWithComplexType.scala |  13 +-
 ...tSparkCarbonFileFormatWithSparkSession.scala | 176 --------
 .../datasources/CarbonSparkDataSourceUtil.scala |   1 -
 .../datasource/SparkCarbonDataSourceTest.scala  |  17 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |  85 ++--
 .../sdk/file/CarbonReaderBuilder.java           | 105 +----
 .../sdk/file/CarbonWriterBuilder.java           | 342 ++++-----------
 .../apache/carbondata/sdk/file/TestUtil.java    |  44 +-
 .../carbondata/store/MetaCachedCarbonStore.java |  21 +-
 .../sdk/file/AvroCarbonWriterTest.java          |  72 +--
 .../sdk/file/CSVCarbonWriterTest.java           |  86 ++--
 .../CSVNonTransactionalCarbonWriterTest.java    | 298 -------------
 .../carbondata/sdk/file/CarbonReaderTest.java   | 434 ++++---------------
 .../sdk/file/ConcurrentAvroSdkWriterTest.java   |   9 +-
 .../sdk/file/ConcurrentSdkWriterTest.java       |   9 +-
 .../carbondata/store/LocalCarbonStoreTest.java  |   2 +-
 28 files changed, 486 insertions(+), 1793 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index dc21efd..d1e4bc5 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -67,9 +67,9 @@ These SDK writer output contains just a carbondata and 
carbonindex files. No met
 
      CarbonProperties.getInstance().addProperty("enable.offheap.sort", 
enableOffheap);
  
-     CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
+     CarbonWriterBuilder builder = 
CarbonWriter.builder().outputPath(path).withCsvInput(schema);
  
-     CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+     CarbonWriter writer = builder.build();
  
      int rows = 5;
      for (int i = 0; i < rows; i++) {
@@ -124,7 +124,7 @@ public class TestSdkAvro {
     try {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
-          .buildWriterForAvroInput(new 
org.apache.avro.Schema.Parser().parse(avroSchema));
+          .withAvroInput(new 
org.apache.avro.Schema.Parser().parse(avroSchema)).build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -164,10 +164,10 @@ public class TestSdkJson {
 
     Schema CarbonSchema = new Schema(fields);
 
-    CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
+    CarbonWriterBuilder builder = 
CarbonWriter.builder().outputPath(path).withJsonInput(CarbonSchema);
 
     // initialize json writer with carbon schema
-    CarbonWriter writer = builder.buildWriterForJsonInput(CarbonSchema);
+    CarbonWriter writer = builder.build();
     // one row of json Data as String
     String  JsonRow = "{\"name\":\"abcd\", \"age\":10}";
 
@@ -199,7 +199,7 @@ Each of SQL data types are mapped into data types of SDK. 
Following are the mapp
 | DECIMAL | DataTypes.createDecimalType(precision, scale) |
 
 **NOTE:**
- Carbon Supports below logical types of AVRO.
+ 1. Carbon Supports below logical types of AVRO.
  a. Date
     The date logical type represents a date within the calendar, with no 
reference to a particular time zone or time of day.
     A date logical type annotates an Avro int, where the int stores the number 
of days from the unix epoch, 1 January 1970 (ISO calendar). 
@@ -211,7 +211,9 @@ Each of SQL data types are mapped into data types of SDK. 
Following are the mapp
     A timestamp-micros logical type annotates an Avro long, where the long 
stores the number of microseconds from the unix epoch, 1 January 1970 
00:00:00.000000 UTC.
     
     Currently the values of logical types are not validated by carbon. 
-    Expect that avro record passed by the user is already validated by avro 
record generator tools.   
+    Expect that avro record passed by the user is already validated by avro 
record generator tools.    
+ 2. If the string data is more than 32K in length, use withTableProperties() 
with "long_string_columns" property
+    or directly use DataTypes.VARCHAR if it is carbon schema.      
 
 ## Run SQL on files directly
 Instead of creating table and query it, you can also query that file directly 
with SQL.
@@ -236,20 +238,6 @@ public CarbonWriterBuilder outputPath(String path);
 
 ```
 /**
-* If set false, writes the carbondata and carbonindex files in a flat folder 
structure
-* @param isTransactionalTable is a boolelan value
-*             if set to false, then writes the carbondata and carbonindex files
-*                                                            in a flat folder 
structure.
-*             if set to true, then writes the carbondata and carbonindex files
-*                                                            in segment folder 
structure..
-*             By default set to false.
-* @return updated CarbonWriterBuilder
-*/
-public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable);
-```
-
-```
-/**
 * to set the timestamp in the carbondata and carbonindex index files
 * @param UUID is a timestamp to be used in the carbondata and carbonindex 
index files.
 *             By default set to zero.
@@ -308,16 +296,6 @@ public CarbonWriterBuilder sortBy(String[] sortColumns);
 
 ```
 /**
-* If set, create a schema file in metadata folder.
-* @param persist is a boolean value, If set to true, creates a schema file in 
metadata folder.
-*                By default set to false. will not create metadata folder
-* @return updated CarbonWriterBuilder
-*/
-public CarbonWriterBuilder persistSchemaFile(boolean persist);
-```
-
-```
-/**
 * sets the taskNo for the writer. SDKs concurrently running
 * will set taskNo in order to avoid conflicts in file's name during write.
 * @param taskNo is the TaskNo user wants to specify.
@@ -370,6 +348,7 @@ public CarbonWriterBuilder withLoadOptions(Map<String, 
String> options);
 * c. local_dictionary_threshold -- positive value, default is 10000
 * d. local_dictionary_enable -- true / false. Default is false
 * e. sort_columns -- comma separated column. "c1,c2". Default all dimensions 
are sorted.
+                     If empty string "" is passed. No columns are sorted
 * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is 
"local_sort"
 * k. long_string_columns -- comma separated string columns which are more than 
32k length. 
 *                           default value is null.
@@ -379,91 +358,69 @@ public CarbonWriterBuilder withLoadOptions(Map<String, 
String> options);
 public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 ```
 
-
 ```
 /**
-* this writer is not thread safe, use buildThreadSafeWriterForCSVInput in 
multi thread environment
-* Build a {@link CarbonWriter}, which accepts row in CSV format object
-* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param configuration hadoop configuration object.
-* @return CSVCarbonWriter
-* @throws IOException
-* @throws InvalidLoadOptionException
+* To make sdk writer thread safe.
+*
+* @param numOfThreads should number of threads in which writer is called in 
multi-thread scenario
+*                     default sdk writer is not thread safe.
+*                     can use one writer instance in one thread only.
+* @return updated CarbonWriterBuilder
 */
-public CarbonWriter 
buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, 
Configuration configuration) throws IOException, InvalidLoadOptionException;
+public CarbonWriterBuilder withThreadSafe(short numOfThreads);
 ```
 
 ```
 /**
-* Can use this writer in multi-thread instance.
-* Build a {@link CarbonWriter}, which accepts row in CSV format
-* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param numOfThreads number of threads() in which .write will be called.    
-* @param configuration hadoop configuration object          
-* @return CSVCarbonWriter
-* @throws IOException
-* @throws InvalidLoadOptionException
+* To support hadoop configuration
+*
+* @param conf hadoop configuration support, can set s3a AK,SK,end point and 
other conf with this
+* @return updated CarbonWriterBuilder
 */
-public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short 
numOfThreads, Configuration configuration)
-  throws IOException, InvalidLoadOptionException;
+public CarbonWriterBuilder withHadoopConf(Configuration conf)
 ```
 
-
-```  
+```
 /**
-* this writer is not thread safe, use buildThreadSafeWriterForAvroInput in 
multi thread environment
-* Build a {@link CarbonWriter}, which accepts Avro format object
-* @param avroSchema avro Schema object {org.apache.avro.Schema}
-* @param configuration hadoop configuration object
-* @return AvroCarbonWriter 
-* @throws IOException
-* @throws InvalidLoadOptionException
+* to build a {@link CarbonWriter}, which accepts row in CSV format
+*
+* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @return CarbonWriterBuilder
 */
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, 
Configuration configuration) throws IOException, InvalidLoadOptionException;
+public CarbonWriterBuilder withCsvInput(Schema schema);
 ```
 
 ```
 /**
-* Can use this writer in multi-thread instance.
-* Build a {@link CarbonWriter}, which accepts Avro object
+* to build a {@link CarbonWriter}, which accepts Avro object
+*
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
-* @param numOfThreads number of threads() in which .write will be called.
-* @param configuration hadoop configuration object
-* @return AvroCarbonWriter
-* @throws IOException
-* @throws InvalidLoadOptionException
+* @return CarbonWriterBuilder
 */
-public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema 
avroSchema, short 
-numOfThreads, Configuration configuration)
-  throws IOException, InvalidLoadOptionException
+public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema);
 ```
 
-
 ```
 /**
-* this writer is not thread safe, use buildThreadSafeWriterForJsonInput in 
multi thread environment
-* Build a {@link CarbonWriter}, which accepts Json object
+* to build a {@link CarbonWriter}, which accepts Json object
+*
 * @param carbonSchema carbon Schema object
-* @param configuration hadoop configuration object
-* @return JsonCarbonWriter
-* @throws IOException
-* @throws InvalidLoadOptionException
+* @return CarbonWriterBuilder
 */
-public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, 
Configuration configuration);
+public CarbonWriterBuilder withJsonInput(Schema carbonSchema);
 ```
 
 ```
 /**
-* Can use this writer in multi-thread instance.
-* Build a {@link CarbonWriter}, which accepts Json object
-* @param carbonSchema carbon Schema object
-* @param numOfThreads number of threads() in which .write will be called.
-* @param configuration hadoop configuraiton object.
-* @return JsonCarbonWriter
+* Build a {@link CarbonWriter}
+* This writer is not thread safe,
+* use withThreadSafe() configuration in multi thread environment
+* 
+* @return CarbonWriter {AvroCarbonWriter/CSVCarbonWriter/JsonCarbonWriter 
based on Input Type }
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, 
short numOfThreads, Configuration configuration)
+public CarbonWriter build() throws IOException, InvalidLoadOptionException;
 ```
 
 ### Class org.apache.carbondata.sdk.file.CarbonWriter
@@ -474,7 +431,6 @@ public JsonCarbonWriter 
buildThreadSafeWriterForJsonInput(Schema carbonSchema, s
 *                      which is one row of data.
 * If CSVCarbonWriter, object is of type String[], which is one row of data
 * If JsonCarbonWriter, object is of type String, which is one row of json
-* Note: This API is not thread safe if writer is not built with number of 
threads argument.
 * @param object
 * @throws IOException
 */
@@ -648,19 +604,6 @@ Find example code at 
[CarbonReaderExample](https://github.com/apache/carbondata/
 ```
 
 ```
-  /**
-   * Configure the transactional status of table
-   * If set to false, then reads the carbondata and carbonindex files from a 
flat folder structure.
-   * If set to true, then reads the carbondata and carbonindex files from 
segment folder structure.
-   * Default value is false
-   *
-   * @param isTransactionalTable whether is transactional table or not
-   * @return CarbonReaderBuilder object
-   */
-  public CarbonReaderBuilder isTransactionalTable(boolean 
isTransactionalTable);
-```
-
-```
  /**
   * Configure the filter expression for carbon reader
   *
@@ -671,66 +614,13 @@ Find example code at 
[CarbonReaderExample](https://github.com/apache/carbondata/
 ```
 
 ```
-  /**
-   * Set the access key for S3
-   *
-   * @param key   the string of access key for different S3 type,like: 
fs.s3a.access.key
-   * @param value the value of access key
-   * @return CarbonWriterBuilder
-   */
-  public CarbonReaderBuilder setAccessKey(String key, String value);
-```
-
-```
-  /**
-   * Set the access key for S3.
-   *
-   * @param value the value of access key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setAccessKey(String value);
-```
-
-```
-  /**
-   * Set the secret key for S3
-   *
-   * @param key   the string of secret key for different S3 type,like: 
fs.s3a.secret.key
-   * @param value the value of secret key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setSecretKey(String key, String value);
-```
-
-```
-  /**
-   * Set the secret key for S3
-   *
-   * @param value the value of secret key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setSecretKey(String value);
-```
-
-```
- /**
-   * Set the endpoint for S3
-   *
-   * @param key   the string of endpoint for different S3 type,like: 
fs.s3a.endpoint
-   * @param value the value of endpoint
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setEndPoint(String key, String value);
-```
-
-``` 
-  /**
-   * Set the endpoint for S3
-   *
-   * @param value the value of endpoint
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setEndPoint(String value);
+/**
+ * To support hadoop configuration
+ *
+ * @param conf hadoop configuration support, can set s3a AK,SK,end point and 
other conf with this
+ * @return updated CarbonReaderBuilder
+ */
+ public CarbonReaderBuilder withHadoopConf(Configuration conf);
 ```
 
 ```

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index 76926ce..9e80567 100644
--- 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.sdk.file.CarbonReader;
@@ -62,7 +61,7 @@ public class CarbonReaderExample {
             CarbonWriter writer = CarbonWriter.builder()
                 .outputPath(path)
                 .withLoadOptions(map)
-                .buildWriterForCSVInput(new Schema(fields), new 
Configuration(false));
+                .withCsvInput(new Schema(fields)).build();
 
             for (int i = 0; i < 10; i++) {
                 String[] row2 = new String[]{
@@ -107,7 +106,7 @@ public class CarbonReaderExample {
             CarbonReader reader = CarbonReader
                 .builder(path, "_temp")
                 .projection(strings)
-                .build(new Configuration(false));
+                .build();
 
             System.out.println("\nData:");
             long day = 24L * 3600 * 1000;
@@ -132,7 +131,7 @@ public class CarbonReaderExample {
             // Read data
             CarbonReader reader2 = CarbonReader
                 .builder(path, "_temp")
-                .build(new Configuration(false));
+                .build();
 
             System.out.println("\nData:");
             i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 3abc342..d4f49f5 100644
--- 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -26,6 +26,7 @@ import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.sdk.file.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
 
 /**
  * Example for testing CarbonWriter on S3
@@ -49,16 +50,16 @@ public class SDKS3Example {
             num = Integer.parseInt(args[4]);
         }
 
+        Configuration conf = new Configuration(false);
+        conf.set(Constants.ACCESS_KEY, args[0]);
+        conf.set(Constants.SECRET_KEY, args[1]);
+        conf.set(Constants.ENDPOINT, args[2]);
+
         Field[] fields = new Field[2];
         fields[0] = new Field("name", DataTypes.STRING);
         fields[1] = new Field("age", DataTypes.INT);
-        CarbonWriterBuilder builder = CarbonWriter.builder()
-                .setAccessKey(args[0])
-                .setSecretKey(args[1])
-                .setEndPoint(args[2])
-                .outputPath(path);
-
-        CarbonWriter writer = builder.buildWriterForCSVInput(new 
Schema(fields), new Configuration(false));
+        CarbonWriterBuilder builder = 
CarbonWriter.builder().outputPath(path).withHadoopConf(conf);
+        CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
 
         for (int i = 0; i < num; i++) {
             writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
@@ -74,10 +75,8 @@ public class SDKS3Example {
             .builder(path, "_temp")
             .projection(new String[]{"name", "age"})
             .filter(equalToExpression)
-            .setAccessKey(args[0])
-            .setSecretKey(args[1])
-            .setEndPoint(args[2])
-            .build(new Configuration(false));
+            .withHadoopConf(conf)
+            .build();
 
         System.out.println("\nData:");
         int i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
index a236175..1fac673 100644
--- 
a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
+++ 
b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
@@ -62,7 +62,8 @@ public class SDKS3ReadExample {
             .builder(path, "_temp")
             .projection(new String[]{"name", "age"})
             .filter(equalToExpression)
-            .build(configuration);
+            .withHadoopConf(configuration)
+            .build();
 
         System.out.println("\nData:");
         int i = 0;
@@ -78,7 +79,8 @@ public class SDKS3ReadExample {
         CarbonReader reader2 = CarbonReader
             .builder(path, "_temp")
             .projection(new String[]{"name", "age"})
-            .build(configuration);
+            .withHadoopConf(configuration)
+            .build();
 
         System.out.println("\nData:");
         i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index 86bf854..5ddcba0 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -37,7 +37,7 @@ object DirectSQLExample {
   def buildTestData(
       path: String,
       num: Int = 3,
-      persistSchema: Boolean = false, sparkSession: SparkSession): Any = {
+      sparkSession: SparkSession): Any = {
 
     // getCanonicalPath gives path with \, but the code expects /.
     val writerPath = path.replace("\\", "/");
@@ -51,14 +51,10 @@ object DirectSQLExample {
       val builder = CarbonWriter
         .builder()
         .outputPath(writerPath)
-        .isTransactionalTable(true)
         .uniqueIdentifier(System.currentTimeMillis)
         .withBlockSize(2)
-      if (persistSchema) {
-        builder.persistSchemaFile(true)
-      }
-      val writer = builder
-        .buildWriterForCSVInput(new Schema(fields), 
sparkSession.sparkContext.hadoopConfiguration)
+        .withCsvInput(new Schema(fields))
+      val writer = builder.build()
       var i = 0
       while (i < num) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
@@ -85,7 +81,7 @@ object DirectSQLExample {
     // 1. generate data file
     cleanTestData(path)
     buildTestData(path, 20, sparkSession = carbonSession)
-    val readPath = path + "Fact/Part0/Segment_null"
+    val readPath = path
 
     println("Running SQL on carbon files directly")
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index c5c9710..014bec5 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -48,18 +48,10 @@ object S3UsingSDKExample {
     try {
       val builder = CarbonWriter.builder()
       val writer =
-        if (persistSchema) {
-          builder.persistSchemaFile(true)
-          builder.outputPath(writerPath).isTransactionalTable(true)
-            .uniqueIdentifier(
-              System.currentTimeMillis)
-            .buildWriterForCSVInput(new Schema(fields), new 
Configuration(false))
-        } else {
-          builder.outputPath(writerPath).isTransactionalTable(true)
-            .uniqueIdentifier(
-              System.currentTimeMillis).withBlockSize(2)
-            .buildWriterForCSVInput(new Schema(fields), new 
Configuration(false))
-        }
+        builder.outputPath(writerPath)
+          .uniqueIdentifier(System.currentTimeMillis)
+          .withBlockSize(2)
+          .withCsvInput(new Schema(fields)).build()
       var i = 0
       var row = num
       while (i < row) {
@@ -119,7 +111,7 @@ object S3UsingSDKExample {
 
     spark.sql("DROP TABLE IF EXISTS s3_sdk_table")
     spark.sql(s"CREATE EXTERNAL TABLE s3_sdk_table STORED BY 'carbondata'" +
-      s" LOCATION '$path/Fact/Part0/Segment_null'")
+      s" LOCATION '$path'")
     spark.sql("SELECT * FROM s3_sdk_table LIMIT 10").show()
     spark.stop()
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index f785b36..668d9d1 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -70,31 +70,25 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
   }
 
   def buildTestDataSingleFile(): Any = {
-    buildTestData(3, false, null)
+    buildTestData(3, null)
   }
 
   def buildTestDataWithBadRecordForce(writerPath: String): Any = {
     var options = Map("bAd_RECords_action" -> "FORCE").asJava
-    buildTestData(3, false, options)
+    buildTestData(3, options)
   }
 
   def buildTestDataWithBadRecordFail(writerPath: String): Any = {
     var options = Map("bAd_RECords_action" -> "FAIL").asJava
-    buildTestData(15001, false, options)
+    buildTestData(15001, options)
   }
 
-  def buildTestData(rows: Int,
-      persistSchema: Boolean,
-      options: util.Map[String, String]): Any = {
-    buildTestData(rows, persistSchema, options, List("name"), writerPath)
+  def buildTestData(rows: Int, options: util.Map[String, String]): Any = {
+    buildTestData(rows, options, List("name"), writerPath)
   }
 
   // prepare sdk writer output
-  def buildTestData(rows: Int,
-      persistSchema: Boolean,
-      options: util.Map[String, String],
-      sortColumns: List[String],
-      writerPath: String): Any = {
+  def buildTestData(rows: Int, options: util.Map[String, String], sortColumns: 
List[String], writerPath: String): Any = {
     val schema = new StringBuilder()
       .append("[ \n")
       .append("   {\"name\":\"string\"},\n")
@@ -106,31 +100,18 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
     try {
       val builder = CarbonWriter.builder()
       val writer =
-        if (persistSchema) {
-          builder.persistSchemaFile(true)
-          builder
+        if (options != null) {
+          builder.outputPath(writerPath)
             .sortBy(sortColumns.toArray)
-            .outputPath(writerPath)
-            .isTransactionalTable(false)
-            .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput(Schema.parseJson(schema), 
sqlContext.sparkContext.hadoopConfiguration)
+            .uniqueIdentifier(
+              
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
+            .withCsvInput(Schema.parseJson(schema)).build()
         } else {
-          if (options != null) {
-            builder.outputPath(writerPath)
-              .isTransactionalTable(false)
-              .sortBy(sortColumns.toArray)
-              .uniqueIdentifier(
-                
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput(Schema.parseJson(schema), 
sqlContext.sparkContext.hadoopConfiguration)
-          } else {
-            builder.outputPath(writerPath)
-              .isTransactionalTable(false)
-              .sortBy(sortColumns.toArray)
-              .uniqueIdentifier(
-                System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput(Schema.parseJson(schema), 
sqlContext.sparkContext
-                .hadoopConfiguration)
-          }
+          builder.outputPath(writerPath)
+            .sortBy(sortColumns.toArray)
+            .uniqueIdentifier(
+              System.currentTimeMillis).withBlockSize(2)
+            .withCsvInput(Schema.parseJson(schema)).build()
         }
       var i = 0
       while (i < rows) {
@@ -156,12 +137,12 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
 
   def buildTestDataWithBadRecordIgnore(writerPath: String): Any = {
     var options = Map("bAd_RECords_action" -> "IGNORE").asJava
-    buildTestData(3, false, options)
+    buildTestData(3, options)
   }
 
   def buildTestDataWithBadRecordRedirect(writerPath: String): Any = {
     var options = Map("bAd_RECords_action" -> "REDIRECT").asJava
-    buildTestData(3, false, options)
+    buildTestData(3, options)
   }
 
   def deleteFile(path: String, extension: String): Unit = {
@@ -545,8 +526,8 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
 
     try {
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false)
-        
.uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, 
sqlContext.sparkContext.hadoopConfiguration)
+        .outputPath(writerPath)
+        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -744,8 +725,7 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
-    val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema), 
sqlContext.sparkContext.hadoopConfiguration)
+    val writer = 
builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
 
     for (i <- 0 until 5) {
       writer.write(Array[String](s"name_$i", 
RandomStringUtils.randomAlphabetic(33000), i.toString))
@@ -768,8 +748,7 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
-    val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema), 
sqlContext.sparkContext.hadoopConfiguration)
+    val writer = 
builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
     val varCharLen = 4000000
     for (i <- 0 until 3) {
       writer
@@ -802,7 +781,7 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
     val writer = builder
       .outputPath(writerPath)
       .sortBy(Array[String]())
-      .buildWriterForCSVInput(Schema.parseJson(schema), new Configuration())
+      .withCsvInput(Schema.parseJson(schema)).build()
 
     for (i <- 0 until 5) {
       writer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 1b181bc..3ab956c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -33,16 +33,14 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
 
 class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with 
BeforeAndAfterAll {
 
-  var writerPath = new File(this.getClass.getResource("/").getPath
-                            +
-                            "../." +
-                            
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+  var writerPath = new File(this.getClass.getResource("/").getPath +
+                            
"../../src/test/resources/SparkCarbonFileFormat/WriterOutput")
     .getCanonicalPath
   //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/");
 
 
-  def buildTestData(persistSchema:Boolean) = {
+  def buildTestData(): Any = {
 
     FileUtils.deleteDirectory(new File(writerPath))
 
@@ -55,16 +53,10 @@ class TestCarbonFileInputFormatWithExternalCarbonTable 
extends QueryTest with Be
       .toString()
 
     try {
-      val builder = CarbonWriter.builder().isTransactionalTable(true)
+      val builder = CarbonWriter.builder()
       val writer =
-      if (persistSchema) {
-        builder.persistSchemaFile(true)
-        
builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
-          sqlContext.sparkContext.hadoopConfiguration)
-      } else {
-        
builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
-          sqlContext.sparkContext.hadoopConfiguration)
-      }
+        builder.outputPath(writerPath + "/Fact/Part0/Segment_null")
+          .withCsvInput(Schema.parseJson(schema)).build()
 
       var i = 0
       while (i < 100) {
@@ -108,7 +100,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable 
extends QueryTest with Be
 
   //TO DO, need to remove segment dependency and tableIdentifier Dependency
   test("read carbondata files (sdk Writer Output) using the carbonfile ") {
-    buildTestData(false)
+    buildTestData()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
@@ -148,7 +140,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable 
extends QueryTest with Be
   }
 
   test("should not allow to alter datasource carbontable ") {
-    buildTestData(false)
+    buildTestData()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
@@ -171,7 +163,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable 
extends QueryTest with Be
   }
 
   test("Read sdk writer output file without index file should fail") {
-    buildTestData(false)
+    buildTestData()
     deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -196,7 +188,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable 
extends QueryTest with Be
 
   // TODO: Make the sparkCarbonFileFormat to work without index file
   test("Read sdk writer output file without Carbondata file should fail") {
-    buildTestData(false)
+    buildTestData()
     deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -218,7 +210,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable 
extends QueryTest with Be
 
 
   test("Read sdk writer output file without any file should fail") {
-    buildTestData(false)
+    buildTestData()
     deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
     deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
     assert(new File(writerPath).exists())

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index f6d12ab..0c55d1b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -112,16 +112,15 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   def buildTestDataWithSortColumns(sortColumns: List[String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(3, false, null, sortColumns)
+    buildTestData(3, null, sortColumns)
   }
 
   def buildTestData(rows: Int, persistSchema: Boolean, options: 
util.Map[String, String]): Any = {
-    buildTestData(rows, persistSchema, options, List("name"))
+    buildTestData(rows, options, List("name"))
   }
 
   // prepare sdk writer output
   def buildTestData(rows: Int,
-      persistSchema: Boolean,
       options: util.Map[String, String],
       sortColumns: List[String]): Any = {
     val schema = new StringBuilder()
@@ -135,33 +134,18 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     try {
       val builder = CarbonWriter.builder()
       val writer =
-        if (persistSchema) {
-          builder.persistSchemaFile(true)
-          builder
+        if (options != null) {
+          builder.outputPath(writerPath)
             .sortBy(sortColumns.toArray)
-            .outputPath(writerPath)
-            .isTransactionalTable(false)
-            .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput(Schema.parseJson(schema),
-              sqlContext.sparkContext.hadoopConfiguration)
+            .uniqueIdentifier(
+              
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
+            .withCsvInput(Schema.parseJson(schema)).build()
         } else {
-          if (options != null) {
-            builder.outputPath(writerPath)
-              .isTransactionalTable(false)
-              .sortBy(sortColumns.toArray)
-              .uniqueIdentifier(
-                
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput(Schema.parseJson(schema),
-                sqlContext.sparkContext.hadoopConfiguration)
-          } else {
-            builder.outputPath(writerPath)
-              .isTransactionalTable(false)
-              .sortBy(sortColumns.toArray)
-              .uniqueIdentifier(
-                System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput(Schema.parseJson(schema),
-                sqlContext.sparkContext.hadoopConfiguration)
-          }
+          builder.outputPath(writerPath)
+            .sortBy(sortColumns.toArray)
+            .uniqueIdentifier(
+              System.currentTimeMillis).withBlockSize(2)
+            .withCsvInput(Schema.parseJson(schema)).build()
         }
       var i = 0
       while (i < rows) {
@@ -195,11 +179,8 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(writerPath)
-          .isTransactionalTable(false)
           
.uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput(new Schema(fields),
-            sqlContext.sparkContext.hadoopConfiguration)
-
+          .withCsvInput(new Schema(fields)).build()
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("true", String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
@@ -228,12 +209,10 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(writerPath)
-          .isTransactionalTable(false)
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             123).withBlockSize(2)
-          .buildWriterForCSVInput(Schema.parseJson(schema),
-            sqlContext.sparkContext.hadoopConfiguration)
+          .withCsvInput(Schema.parseJson(schema)).build()
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
@@ -1004,12 +983,11 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     fields(2) = new Field("mydate", DataTypes.DATE)
     fields(3) = new Field("mytime", DataTypes.TIMESTAMP)
 
-    val builder: CarbonWriterBuilder = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
+    val builder: CarbonWriterBuilder = 
CarbonWriter.builder.outputPath(writerPath)
+      .withLoadOptions(options)
 
-    val writer: CarbonWriter = builder.buildWriterForCSVInput(new 
Schema(fields),
-      sqlContext.sparkContext.hadoopConfiguration)
-    writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
+    val writer: CarbonWriter = builder.withCsvInput(new Schema(fields)).build()
+    writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"))
     writer.close()
 
     assert(new File(writerPath).exists())
@@ -1132,9 +1110,8 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json, mySchema)
     try {
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false)
-        
.uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
-        sqlContext.sparkContext.hadoopConfiguration)
+        .outputPath(writerPath)
+        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -2108,8 +2085,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
     assert(intercept[RuntimeException] {
       val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
-        
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-        sqlContext.sparkContext.hadoopConfiguration)
+        .outputPath(writerPath).withAvroInput(nn).build()
       writer.write(record)
       writer.close()
     }.getMessage.toLowerCase.contains("column: name specified in sort 
columns"))
@@ -2149,8 +2125,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-      sqlContext.sparkContext.hadoopConfiguration)
+      .outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
   }
@@ -2188,8 +2163,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder.sortBy(Array("id"))
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-      sqlContext.sparkContext.hadoopConfiguration)
+      .outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
   }
@@ -2233,8 +2207,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema)
 
     val writer = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-      sqlContext.sparkContext.hadoopConfiguration)
+      .outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
   }
@@ -2274,8 +2247,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-      sqlContext.sparkContext.hadoopConfiguration)
+      .outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -2321,8 +2293,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-      sqlContext.sparkContext.hadoopConfiguration)
+      .outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -2369,8 +2340,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
 
     val writer = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
-      sqlContext.sparkContext.hadoopConfiguration)
+      .outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -2390,7 +2360,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     val writer: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
       .withTableProperties(options)
-      .buildWriterForCSVInput(new Schema(fields), 
sqlContext.sparkContext.hadoopConfiguration)
+      .withCsvInput(new Schema(fields)).build()
     writer.write(Array("carbon", "1"))
     writer.write(Array("hydrogen", "10"))
     writer.write(Array("boron", "4"))
@@ -2408,7 +2378,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     // write local sort data
     val writer1: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
-      .buildWriterForCSVInput(new Schema(fields), 
sqlContext.sparkContext.hadoopConfiguration)
+      .withCsvInput(new Schema(fields)).build()
     writer1.write(Array("carbon", "1"))
     writer1.write(Array("hydrogen", "10"))
     writer1.write(Array("boron", "4"))
@@ -2422,7 +2392,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   test("test LocalDictionary with True") {
     FileUtils.deleteDirectory(new File(writerPath))
-    val builder = CarbonWriter.builder.isTransactionalTable(false)
+    val builder = CarbonWriter.builder
       
.sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
       
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
     generateCarbonData(builder)
@@ -2447,7 +2417,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
         "sort_columns" -> "name",
         "local_dictionary_threshold" -> "200",
         "local_dictionary_enable" -> "true").asJava
-    val builder = CarbonWriter.builder.isTransactionalTable(false)
+    val builder = CarbonWriter.builder
       .withTableProperties(tablePropertiesMap)
       
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
     generateCarbonData(builder)
@@ -2467,7 +2437,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   test("test Local Dictionary with FallBack") {
     FileUtils.deleteDirectory(new File(writerPath))
-    val builder = CarbonWriter.builder.isTransactionalTable(false)
+    val builder = CarbonWriter.builder
       
.sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
       .localDictionaryThreshold(5)
       
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
@@ -2488,7 +2458,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
 
   test("test local dictionary with External Table data load ") {
     FileUtils.deleteDirectory(new File(writerPath))
-    val builder = CarbonWriter.builder.isTransactionalTable(false)
+    val builder = CarbonWriter.builder
       
.sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
       .localDictionaryThreshold(200)
       
.uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
@@ -2517,8 +2487,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("surname", DataTypes.STRING)
     fields(2) = new Field("age", DataTypes.INT)
-    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
-      sqlContext.sparkContext.hadoopConfiguration)
+    val carbonWriter = builder.withCsvInput(new Schema(fields)).build()
     var i = 0
     while (i < 100) {
       {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
index b060ec1..778913e 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
@@ -439,12 +439,12 @@ class TestNonTransactionalCarbonTableForMapType extends 
QueryTest with BeforeAnd
     val json = """ {"name":"bob", "age":10, "arrayRecord": [{"101": "Rahul", 
"102": "Pawan"}]} """.stripMargin
     nonTransactionalCarbonTable.WriteFilesWithAvroWriter(2, mySchema, json)
 
-    val reader = CarbonReader.builder(writerPath, 
"_temp").isTransactionalTable(false).build(conf)
+    val reader = CarbonReader.builder(writerPath, "_temp").build()
     reader.close()
     val exception1 = intercept[Exception] {
       val reader1 = CarbonReader.builder(writerPath, "_temp")
-        .projection(Array[String] { "arrayRecord.houseDetails" 
}).isTransactionalTable(false)
-        .build(conf)
+        .projection(Array[String] { "arrayRecord.houseDetails" })
+        .build()
       reader1.close()
     }
     assert(exception1.getMessage

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index 17aae1d..7ad698c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -95,11 +95,10 @@ class TestNonTransactionalCarbonTableJsonWriter extends 
QueryTest with BeforeAnd
     try {
       var options: util.Map[String, String] = Map("bAd_RECords_action" -> 
"FAIL", "quotechar" -> "\"").asJava
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false)
+        .outputPath(writerPath)
         .uniqueIdentifier(System.currentTimeMillis())
         .withLoadOptions(options)
-        .buildWriterForJsonInput(carbonSchema,
-          sqlContext.sparkContext.hadoopConfiguration)
+        .withJsonInput(carbonSchema).build()
       writer.write(jsonRow)
       writer.close()
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index e7fcf95..d5da794 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -92,8 +92,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends 
QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, 
sqlContext.sparkContext.hadoopConfiguration)
+    val writer = 
CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -139,8 +138,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
     val record = testUtil.jsonToAvro(json1, mySchema)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -183,8 +181,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
     val record = testUtil.jsonToAvro(json, mySchema)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -214,8 +211,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -244,8 +240,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -274,8 +269,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -304,8 +298,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -334,8 +327,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -364,8 +356,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
     val exception1 = intercept[UnsupportedOperationException] {
-      val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+      val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
       writer.write(record)
       writer.close()
     }
@@ -401,8 +392,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -437,8 +427,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -475,8 +464,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -508,8 +496,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -567,8 +554,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -662,8 +648,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -715,8 +700,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     avroRec.put("union_field", bytes1)
 
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -785,8 +769,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     avroRec.put("record2", 10.24)
     avroRec.put("struct_field_decimal", genericByteArray)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -857,8 +840,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     avroRec.put("age", 10)
     avroRec.put("dec_fields", genericByteArray)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -904,9 +886,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false)
-      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -952,9 +932,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false)
-      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -998,8 +976,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
     val avroRec = new GenericData. Record(nn)
     avroRec.put("id", bytes1)
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1043,8 +1020,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1088,8 +1064,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1129,8 +1104,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
       s"""{"dec_field":"$data"}""".stripMargin
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1171,8 +1145,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
     val exception1 = intercept[Exception] {
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     }
@@ -1221,8 +1194,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -1257,8 +1229,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(record)
     writer.close()
     sql(
@@ -1307,8 +1278,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType 
extends QueryTest with Bef
     avroRec.put("union_field", bytes1)
 
 
-    val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
     writer.write(avroRec)
     writer.close()
     sql(s"create table sdkOutputTable(union_field 
struct<union_field0:decimal(10,2),union_field1:int>) " +

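Every hunk in this file makes the same substitution: the two-argument buildWriterForAvroInput(nn, hadoopConf) terminal call becomes withAvroInput(nn).build(), and the explicit Hadoop Configuration disappears. A minimal sketch of the new Avro write path, with an illustrative schema and record (field names are not from this patch):

    val avroSchema =
      """{"type": "record", "name": "person", "fields": [
        | {"name": "name", "type": "string"},
        | {"name": "age", "type": "int"}]}""".stripMargin
    val nn = new org.apache.avro.Schema.Parser().parse(avroSchema)
    val avroRec = new org.apache.avro.generic.GenericData.Record(nn)
    avroRec.put("name", "bob")
    avroRec.put("age", 10)
    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
    writer.write(avroRec)
    writer.close()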
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 0421ea8..42bb791 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -66,15 +66,13 @@ class TestNonTransactionalCarbonTableWithComplexType 
extends QueryTest with Befo
     try {
       val writer = if (isLocalDictionary) {
         CarbonWriter.builder
-          .outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
+          .outputPath(writerPath).enableLocalDictionary(true)
           .localDictionaryThreshold(2000)
-          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
-          sqlContext.sparkContext.hadoopConfiguration)
+          .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
       } else {
         CarbonWriter.builder
-          .outputPath(writerPath).isTransactionalTable(false)
-          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
-          sqlContext.sparkContext.hadoopConfiguration)
+          .outputPath(writerPath)
+          .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
       }
       var i = 0
       while (i < rows) {
@@ -270,8 +268,7 @@ class TestNonTransactionalCarbonTableWithComplexType 
extends QueryTest with Befo
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
     val records = testUtil.jsonToAvro(jsonvalue, mySchema)
-    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema,
-      sqlContext.sparkContext.hadoopConfiguration)
+    val writer = CarbonWriter.builder().outputPath(writerPath).withAvroInput(pschema).build()
     writer.write(records)
     writer.close()
     sql("DROP TABLE IF EXISTS sdkOutputTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
deleted file mode 100644
index a4e58e7..0000000
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createTable
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
-
-
-object TestSparkCarbonFileFormatWithSparkSession {
-
-  var writerPath = new File(this.getClass.getResource("/").getPath
-                            +
-                            "../." +
-                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
-    .getCanonicalPath
-  //getCanonicalPath gives path with \, but the code expects /.
-  writerPath = writerPath.replace("\\", "/");
-
-  val filePath = writerPath + "/Fact/Part0/Segment_null/"
-
-  def buildTestData(persistSchema:Boolean) = {
-
-    FileUtils.deleteDirectory(new File(writerPath))
-
-    val schema = new StringBuilder()
-      .append("[ \n")
-      .append("   {\"name\":\"string\"},\n")
-      .append("   {\"age\":\"int\"},\n")
-      .append("   {\"height\":\"double\"}\n")
-      .append("]")
-      .toString()
-
-    try {
-      val builder = CarbonWriter.builder().isTransactionalTable(true)
-      val writer =
-        if (persistSchema) {
-          builder.persistSchemaFile(true)
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
-              Configuration(false))
-        } else {
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
-              Configuration(false))
-        }
-
-      var i = 0
-      while (i < 100) {
-        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
-        i += 1
-      }
-      writer.close()
-    } catch {
-      case ex: Throwable => throw new RuntimeException(ex)
-    }
-  }
-
-  def cleanTestData() = {
-    FileUtils.deleteDirectory(new File(writerPath))
-  }
-
-  def deleteIndexFile(path: String, extension: String) : Unit = {
-    val file: CarbonFile = FileFactory
-      .getCarbonFile(path, FileFactory.getFileType(path))
-
-    for (eachDir <- file.listFiles) {
-      if (!eachDir.isDirectory) {
-        if (eachDir.getName.endsWith(extension)) {
-          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
-        }
-      } else {
-        deleteIndexFile(eachDir.getPath, extension)
-      }
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
-
-    // clean data folder
-    if (true) {
-      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
-      clean(storeLocation)
-      clean(warehouse)
-      clean(metastoredb)
-    }
-
-    val spark = SparkSession
-      .builder()
-      .master("local")
-      .appName("TestSparkCarbonFileFormatWithSparkSession")
-      .enableHiveSupport()
-      .config("spark.sql.warehouse.dir", warehouse)
-      .config("javax.jdo.option.ConnectionURL",
-        s"jdbc:derby:;databaseName=$metastoredb;create=true")
-      .getOrCreate()
-
-    CarbonProperties.getInstance()
-      .addProperty("carbon.storelocation", storeLocation)
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
-      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
-    buildTestData(false)
-    assert(new File(filePath).exists())
-    //data source file format
-    if (spark.sparkContext.version.startsWith("2.1")) {
-      //data source file format
-      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
-      //data source file format
-      spark.sql(
-        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
-           |'$filePath' """.stripMargin)
-    } else{
-      // TO DO
-    }
-
-    spark.sql("Describe formatted sdkOutputTable").show(false)
-
-    spark.sql("select * from sdkOutputTable").show(false)
-
-    spark.sql("select * from sdkOutputTable limit 3").show(false)
-
-    spark.sql("select name from sdkOutputTable").show(false)
-
-    spark.sql("select age from sdkOutputTable").show(false)
-
-    spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
-
-    spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
-
-    spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
-
-    spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
-
-    spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
-
-    spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
-
-    spark.sql("select count(*) from sdkOutputTable").show(200,false)
-
-    spark.sql("DROP TABLE sdkOutputTable")
-    // drop table should not delete the files
-    assert(new File(filePath).exists())
-    cleanTestData()
-
-    spark.stop()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 337b13b..59cf2d8 100644
--- 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -220,7 +220,6 @@ object CarbonSparkDataSourceUtil {
       new Field(field.name, dataType)
     })
     val builder = new CarbonWriterBuilder
-    builder.isTransactionalTable(false)
     builder.outputPath(options.getOrElse("path", ""))
     val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
     if (blockSize.isDefined) {

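With the transactional flag removed, the Spark datasource layer only has to map the remaining write options onto the builder. A sketch of that mapping, assuming a Scala options map carrying an optional "path" entry and table block size, and using the withBlockSize setter seen in the tests below:

    val builder = new CarbonWriterBuilder
    builder.outputPath(options.getOrElse("path", ""))
    options.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
      .map(_.toInt)
      .foreach(size => builder.withBlockSize(size))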
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
 
b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 36f6061..3be8cb3 100644
--- 
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ 
b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -983,10 +983,8 @@ class SparkCarbonDataSourceTest extends FunSuite with 
BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(path)
-          .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
-          .buildWriterForCSVInput(new Schema(structType), spark.sparkContext
-            .hadoopConfiguration)
+          .withCsvInput(new Schema(structType)).build()
 
       var i = 0
       while (i < 11) {
@@ -1032,9 +1030,8 @@ class SparkCarbonDataSourceTest extends FunSuite with 
BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(path)
-          .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(Array("bytefield"))
-          .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
+          .withCsvInput(new Schema(fields)).build()
 
       var i = 0
       while (i < 11) {
@@ -1087,10 +1084,8 @@ class SparkCarbonDataSourceTest extends FunSuite with 
BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(path)
-          .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
-          .buildWriterForCSVInput(new Schema(structType), spark.sparkContext
-            .hadoopConfiguration)
+          .withCsvInput(new Schema(structType)).build()
 
       var i = 0
       while (i < 10) {
@@ -1160,9 +1155,8 @@ class SparkCarbonDataSourceTest extends FunSuite with 
BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(writerPath)
-          .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
+          .withCsvInput(new Schema(fields)).build()
 
       var i = 0
       while (i < rows) {
@@ -1201,9 +1195,8 @@ class SparkCarbonDataSourceTest extends FunSuite with 
BeforeAndAfterAll {
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(path)
-          .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
-          .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
+          .withCsvInput(new Schema(fields)).build()
 
       var i = 0
       while (i < 33000) {

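The CSV writer tests all follow the same pattern: the Schema moves into withCsvInput and the Hadoop Configuration argument disappears from build(). A minimal end-to-end sketch with an illustrative three-column schema (names are not from this patch):

    val schema = Schema.parseJson(
      "[{\"name\":\"string\"}, {\"age\":\"int\"}, {\"height\":\"double\"}]")
    val writer = CarbonWriter.builder()
      .outputPath(writerPath)
      .uniqueIdentifier(System.nanoTime())
      .withBlockSize(2)
      .sortBy(Array("name"))
      .withCsvInput(schema)
      .build()
    var i = 0
    while (i < 10) {
      writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
      i += 1
    }
    writer.close()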