IUD support in 2.1

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b2026970
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b2026970
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b2026970

Branch: refs/heads/branch-1.1
Commit: b202697019dcf18ba8cd41ef16c49b407269d5c1
Parents: 43e06b6
Author: ravikiran23 <ravikiran.sn...@gmail.com>
Authored: Fri May 26 11:44:53 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 12:58:34 2017 +0530

----------------------------------------------------------------------
 .../iud/DeleteCarbonTableTestCase.scala         | 131 +++
 .../iud/UpdateCarbonTableTestCase.scala         | 393 +++++++++
 .../spark/sql/CarbonCatalystOperators.scala     |  24 +
 .../sql/execution/command/IUDCommands.scala     | 857 +++++++++++++++++++
 .../spark/sql/hive/CarbonAnalysisRules.scala    | 138 +++
 .../spark/sql/hive/CarbonSessionState.scala     |   2 +
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  24 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      | 130 ++-
 8 files changed, 1694 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

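For reviewers skimming the diffstat: the two new test suites exercise the SQL surface this patch adds (DELETE FROM with and without alias, WHERE, IN, subqueries and joins; UPDATE ... SET with expressions and correlated sub-selects). Below is a rough spark-shell style sketch of that surface; it is not part of the commit, the store path, database and table names are placeholders, and the tables are assumed to have been created and loaded as in the tests.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// Assumes a CarbonData build of this branch; the store path is a placeholder.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("iud-syntax-sketch")
  .getOrCreateCarbonSession("/tmp/carbon.store")

// DELETE variants exercised by DeleteCarbonTableTestCase
spark.sql("delete from iud_db.dest d where d.c1 = 'a'")
spark.sql("delete from iud_db.dest where c1 IN (select c11 from iud_db.source2)")
spark.sql("delete from iud_db.dest a")

// UPDATE variant exercised by UpdateCarbonTableTestCase
spark.sql(
  """update iud_db.dest d set (d.c3, d.c5) =
    |(select s.c33, s.c55 from iud_db.source2 s where d.c1 = s.c11) where 1 = 1""".stripMargin)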

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
new file mode 100644
index 0000000..33ae0d3
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.iud
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+
+    sql("use default")
+    sql("drop database  if exists iud_db cascade")
+    sql("create database  iud_db")
+
+    sql("""create table iud_db.source2 (c11 string,c22 int,c33 string,c55 
string, c66 int) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/source2.csv'
 INTO table iud_db.source2""")
+    sql("use iud_db")
+  }
+  test("delete data from carbon table with alias [where clause ]") {
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+//    sql(s"""select getTupleId() as tupleId from dest """).show
+    sql("""delete from iud_db.dest d where d.c1 = 'a'""").show
+    checkAnswer(
+      sql("""select c2 from iud_db.dest"""),
+      Seq(Row(2), Row(3),Row(4), Row(5))
+    )
+  }
+  test("delete data from  carbon table[where clause ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from iud_db.dest e where e.c2 = 2""").show
+    checkAnswer(
+      sql("""select c1 from dest"""),
+      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+    )
+  }
+  test("delete data from  carbon table[where IN  ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from iud_db.dest where c1 IN ('d', 'e')""").show
+    checkAnswer(
+      sql("""select c1 from dest"""),
+      Seq(Row("a"), Row("b"),Row("c"))
+    )
+  }
+
+  test("delete data from  carbon table[with alias No where clause]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from iud_db.dest a""").show
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq()
+    )
+  }
+  test("delete data from  carbon table[No alias No where clause]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from dest""").show()
+    checkAnswer(
+      sql("""select c1 from dest"""),
+      Seq()
+    )
+  }
+
+  test("delete data from  carbon table[ JOIN with another table ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql(""" DELETE FROM dest t1 INNER JOIN source2 t2 ON t1.c1 = 
t2.c11""").show(truncate = false)
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq(Row("c"), Row("d"), Row("e"))
+    )
+  }
+
+  test("delete data from  carbon table[where IN (sub query) ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from  iud_db.dest where c1 IN (select c11 from 
source2)""").show(truncate = false)
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq(Row("c"), Row("d"), Row("e"))
+    )
+  }
+  test("delete data from  carbon table[where IN (sub query with where clause) 
]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""").show()
+    sql("""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from  iud_db.dest where c1 IN (select c11 from source2 where 
c11 = 'b')""").show()
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+    )
+  }
+  test("delete data from  carbon table[where numeric condition  ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH 
'D:/apacheCarbon/carbondata/integration/spark-common-test/src/test/resources/IUD/dest.csv'
 INTO table iud_db.dest""")
+    sql("""delete from iud_db.dest where c2 >= 4""").show()
+    checkAnswer(
+      sql("""select count(*) from iud_db.dest"""),
+      Seq(Row(3))
+    )
+  }
+  override def afterAll {
+  //  sql("use default")
+  //  sql("drop database  if exists iud_db cascade")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
new file mode 100644
index 0000000..0ad700b
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.iud
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+
+    sql("drop database if exists iud cascade")
+    sql("create database iud")
+    sql("use iud")
+    sql("""create table iud.dest (c1 string,c2 int,c3 string,c5 string) STORED 
BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest""")
+    sql("""create table iud.source2 (c11 string,c22 int,c33 string,c55 string, 
c66 int) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table 
iud.source2""")
+    sql("""create table iud.other (c1 string,c2 int) STORED BY 
'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/other.csv' INTO table 
iud.other""")
+    sql("""create table iud.hdest (c1 string,c2 int,c3 string,c5 string) ROW 
FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS 
TEXTFILE""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.hdest""")
+    sql("""CREATE TABLE iud.update_01(imei string,age int,task bigint,num 
double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' 
""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO 
TABLE iud.update_01 OPTIONS('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 
'BAD_RECORDS_ACTION' = 'FORCE') """)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , 
"true")
+  }
+
+
+//  test("test update operation with 0 rows updation.") {
+//    sql("""drop table if exists iud.zerorows""").show
+//    sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.zerorows""")
+//    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'a'""").show()
+//    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'xxx'""").show()
+//     checkAnswer(
+//      sql("""select c1,c2,c3,c5 from iud.zerorows"""),
+//      
Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+//    )
+//    sql("""drop table iud.zerorows""").show
+//
+//
+//  }
+
+
+  test("update carbon table[select from source table with where and exist]") {
+      sql("""drop table if exists iud.dest11""").show
+      sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+      sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest11""")
+      sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from 
iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
+      checkAnswer(
+        sql("""select c3,c5 from iud.dest11"""),
+        Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), 
Row("MGM","Disco"),Row("RGK","Music"))
+      )
+      sql("""drop table iud.dest11""").show
+   }
+
+//   test("update carbon table[using destination table columns with where and 
exist]") {
+//    sql("""drop table if exists iud.dest22""")
+//    sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest22""")
+//    checkAnswer(
+//      sql("""select c2 from iud.dest22 where c1='a'"""),
+//      Seq(Row(1))
+//    )
+//    sql("""update dest22 d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'a'""").show()
+//    checkAnswer(
+//      sql("""select c2 from iud.dest22 where c1='a'"""),
+//      Seq(Row(2))
+//    )
+//    sql("""drop table iud.dest22""")
+//   }
+
+//   test("update carbon table without alias in set columns") {
+//      sql("""drop table iud.dest33""")
+//      sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//      sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO 
table iud.dest33""")
+//      sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from 
iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+//      checkAnswer(
+//        sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+//        Seq(Row("MGM","Disco"))
+//      )
+//      sql("""drop table iud.dest33""")
+//  }
+//
+//  test("update carbon table without alias in set columns with mulitple 
loads") {
+//    sql("""drop table iud.dest33""")
+//    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33""")
+//    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from 
iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+//    checkAnswer(
+//      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+//      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+//    )
+//    sql("""drop table iud.dest33""")
+//  }
+//
+//   test("update carbon table without alias in set three columns") {
+//     sql("""drop table iud.dest44""")
+//     sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest44""")
+//     sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, s.c33 
,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+//     checkAnswer(
+//       sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""),
+//       Seq(Row("a","MGM","Disco"))
+//     )
+//     sql("""drop table iud.dest44""")
+//   }
+//
+//   test("update carbon table[single column select from source with where and 
exist]") {
+//      sql("""drop table iud.dest55""")
+//      sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//      sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO 
table iud.dest55""")
+//     sql("""update iud.dest55 d set (c3)  = (select s.c33 from iud.source2 s 
where d.c1 = s.c11) where 1 = 1""").show()
+//      checkAnswer(
+//        sql("""select c1,c3 from iud.dest55 """),
+//        
Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
+//      )
+//      sql("""drop table iud.dest55""")
+//   }
+//
+//  test("update carbon table[single column SELECT from source with where and 
exist]") {
+//    sql("""drop table iud.dest55""")
+//    sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest55""")
+//    sql("""update iud.dest55 d set (c3)  = (SELECT s.c33 from iud.source2 s 
where d.c1 = s.c11) where 1 = 1""").show()
+//    checkAnswer(
+//      sql("""select c1,c3 from iud.dest55 """),
+//      
Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
+//    )
+//    sql("""drop table iud.dest55""")
+//  }
+//
+//   test("update carbon table[using destination table columns without where 
clause]") {
+//     sql("""drop table iud.dest66""")
+//     sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest66""")
+//     sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , 
"z"))""").show()
+//     checkAnswer(
+//       sql("""select c2,c5 from iud.dest66 """),
+//       
Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez"))
+//     )
+//     sql("""drop table iud.dest66""")
+//   }
+//
+//   test("update carbon table[using destination table columns with where 
clause]") {
+//       sql("""drop table iud.dest77""")
+//       sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//       sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO 
table iud.dest77""")
+//       sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) 
where d.c3 = 'dd'""").show()
+//       checkAnswer(
+//         sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""),
+//         Seq(Row(5,"dddz"))
+//       )
+//       sql("""drop table iud.dest77""")
+//   }
+//
+//   test("update carbon table[using destination table( no alias) columns 
without where clause]") {
+//     sql("""drop table iud.dest88""")
+//     sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest88""")
+//     sql("""update iud.dest88  set (c2, c5 ) = (c2 + 1, concat(c5 , "y" 
))""").show()
+//     checkAnswer(
+//       sql("""select c2,c5 from iud.dest88 """),
+//       
Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey"))
+//     )
+//     sql("""drop table iud.dest88""")
+//   }
+//
+//   test("update carbon table[using destination table columns with hard coded 
value ]") {
+//     sql("""drop table iud.dest99""")
+//     sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest99""")
+//     sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show()
+//     checkAnswer(
+//       sql("""select c2,c5 from iud.dest99 """),
+//       Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx"))
+//     )
+//     sql("""drop table iud.dest99""")
+//   }
+//
+//   test("update carbon tableusing destination table columns with hard coded 
value and where condition]") {
+//     sql("""drop table iud.dest110""")
+//     sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest110""")
+//     sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 
= 'e'""").show()
+//     checkAnswer(
+//       sql("""select c2,c5 from iud.dest110 where c1 = 'e' """),
+//       Seq(Row(6,"xyx"))
+//     )
+//     sql("""drop table iud.dest110""")
+//   }
+//
+//   test("update carbon table[using source  table columns with where and 
exist and no destination table condition]") {
+//     sql("""drop table iud.dest120""")
+//     sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest120""")
+//     sql("""update iud.dest120 d  set (c3, c5 ) = (select s.c33 ,s.c55  from 
iud.source2 s where d.c1 = s.c11)""").show()
+//     checkAnswer(
+//       sql("""select c3,c5 from iud.dest120 """),
+//       
Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee"))
+//     )
+//     sql("""drop table iud.dest120""")
+//   }
+//
+//   test("update carbon table[using destination table where and exist]") {
+//     sql("""drop table iud.dest130""")
+//     sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest130""")
+//     sql("""update iud.dest130 dd  set (c2, c5 ) = (c2 + 1, "xyx")  where 
dd.c1 = 'a'""").show()
+//     checkAnswer(
+//       sql("""select c2,c5 from iud.dest130 where c1 = 'a' """),
+//       Seq(Row(2,"xyx"))
+//     )
+//     sql("""drop table iud.dest130""")
+//   }
+//
+//   test("update carbon table[using destination table (concat) where and 
exist]") {
+//     sql("""drop table iud.dest140""")
+//     sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest140""")
+//     sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))  
where d.c1 = 'a'""").show()
+//     checkAnswer(
+//       sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""),
+//       Seq(Row(2,"aaaz"))
+//     )
+//     sql("""drop table iud.dest140""")
+//   }
+//
+//   test("update carbon table[using destination table (concat) with  where") {
+//     sql("""drop table iud.dest150""")
+//     sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest150""")
+//     sql("""update iud.dest150 d set (c5) = (concat(c5 , "z"))  where d.c1 = 
'b'""").show()
+//     checkAnswer(
+//       sql("""select c5 from iud.dest150 where c1 = 'b' """),
+//       Seq(Row("bbbz"))
+//     )
+//     sql("""drop table iud.dest150""")
+//   }
+//
+//  test("update table with data for datatype mismatch with column ") {
+//    sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""")
+//    checkAnswer(
+//      sql("""select * from iud.update_01 where imei = 'skt'"""),
+//      Seq()
+//    )
+//  }
+//
+//   test("update carbon table-error[more columns in source table not 
allowed") {
+//     val exception = intercept[Exception] {
+//       sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), 
"abc")""").show()
+//     }
+//     assertResult("Number of source and destination columns are not 
matching")(exception.getMessage)
+//   }
+//
+//   test("update carbon table-error[no set columns") {
+//     intercept[Exception] {
+//       sql("""update iud.dest d set () = ()""").show()
+//     }
+//   }
+//
+//   test("update carbon table-error[no set columns with updated column") {
+//     intercept[Exception] {
+//       sql("""update iud.dest d set  = (c1+1)""").show()
+//     }
+//   }
+//   test("update carbon table-error[one set column with two updated column") {
+//     intercept[Exception] {
+//       sql("""update iud.dest  set c2 = (c2 + 1, concat(c5 , "z") 
)""").show()
+//     }
+//   }
+//
+// test("""update carbon [special characters  in value- test parsing logic 
]""") {
+//    sql("""drop table iud.dest160""")
+//    sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest160""")
+//    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
+//    sql("""update iud.dest160 set(c1) =  
('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show()
+//    sql("""update iud.dest160 set(c1) =("\\")""").show()
+//    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
+//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from 
iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 
3""").show()
+//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from 
iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 
3""").show()
+//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from 
iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 
3""").show()
+//    sql("""update iud.dest160 d set (c3,c5)      =     (select 
s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  
d.c2 between 1 and 3""").show()
+//    sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from 
iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 
3""").show()
+//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from 
iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 
3""").show()
+//    sql("""drop table iud.dest160""")
+//  }
+//
+//  test("""update carbon [sub query, between and existing in outer 
condition.(Customer query ) ]""") {
+//    sql("""drop table iud.dest170""")
+//    sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest170""")
+//    sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s 
where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+//    checkAnswer(
+//      sql("""select c3 from  iud.dest170 as d where d.c2 between 1 and 3"""),
+//      Seq(Row("MGM"), Row("RGK"), Row("cc"))
+//    )
+//    sql("""drop table iud.dest170""")
+//  }
+//
+//  test("""update carbon [self join select query ]""") {
+//    sql("""drop table iud.dest171""")
+//    sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest171""")
+//    sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from 
iud.dest171 s where d.c2 = s.c2)""").show
+//    sql("""drop table iud.dest172""")
+//    sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
+//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest172""")
+//    sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show
+//    checkAnswer(
+//      sql("""select c3 from  iud.dest171"""),
+//      sql("""select c3 from  iud.dest172""")
+//    )
+//    sql("""drop table iud.dest171""")
+//    sql("""drop table iud.dest172""")
+//  }
+//
+//  test("update carbon table-error[closing bracket missed") {
+//    intercept[Exception] {
+//      sql("""update iud.dest d set (c2) = (194""").show()
+//    }
+//  }
+//
+//  test("update carbon table-error[starting bracket missed") {
+//    intercept[Exception] {
+//      sql("""update iud.dest d set (c2) = 194)""").show()
+//    }
+//  }
+//
+//  test("update carbon table-error[missing starting and closing bracket") {
+//    intercept[Exception] {
+//      sql("""update iud.dest d set (c2) = 194""").show()
+//    }
+//  }
+//
+//  test("test create table with column name as tupleID"){
+//    intercept[Exception] {
+//      sql("CREATE table carbontable (empno int, tupleID String, " +
+//          "designation String, doj Timestamp, workgroupcategory int, " +
+//          "workgroupcategoryname String, deptno int, deptname String, 
projectcode int, " +
+//          "projectjoindate Timestamp, projectenddate Timestamp, attendance 
int, " +
+//          "utilization int,salary int) STORED BY 
'org.apache.carbondata.format' " +
+//          
"TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode',"
 +
+//          "'DICTIONARY_EXCLUDE'='empname')")
+//    }
+//  }
+//
+//  test("Failure of update operation due to bad record with proper error 
message") {
+//    try {
+//      CarbonProperties.getInstance()
+//        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
+//      val errorMessage = intercept[Exception] {
+//        sql("drop table if exists update_with_bad_record")
+//        sql("create table update_with_bad_record(item int, name String) 
stored by 'carbondata'")
+//        sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/bad_record.csv' 
into table " +
+//            s"update_with_bad_record")
+//        sql("update update_with_bad_record set (item)=(3.45)").show()
+//        sql("drop table if exists update_with_bad_record")
+//      }
+//      assert(errorMessage.getMessage.contains("Data load failed due to bad 
record"))
+//    } finally {
+//      CarbonProperties.getInstance()
+//        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, 
"FORCE")
+//    }
+//  }
+
+  override def afterAll {
+//    sql("use default")
+//    sql("drop database  if exists iud cascade")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
+  }
+}
\ No newline at end of file

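A note on the property toggled in beforeAll/afterAll above: the tests pin horizontal compaction on via CarbonProperties, and IUDCommon.tryHorizontalCompaction in IUDCommands.scala (below) returns early when CarbonDataMergerUtil.isHorizontalCompactionEnabled() reports it off, presumably reading the same setting. A hedged sketch of toggling it around a one-off bulk delete; it assumes a CarbonSession named spark as in the earlier sketch, and the table name is illustrative.

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Skip horizontal (delta-file) compaction for one large delete, then restore
// the value used by these tests so later IUD operations compact their deltas.
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled, "false")
try {
  spark.sql("delete from iud_db.dest where c2 >= 4")
} finally {
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled, "true")
}
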
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 5b47fcf..6651abe 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -86,6 +87,29 @@ case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit
   }
 }
 
+case class ProjectForUpdate(
+    table: UnresolvedRelation,
+    columns: List[String],
+    child: Seq[LogicalPlan] ) extends Command {
+  override def output: Seq[AttributeReference] = Seq.empty
+}
+
+case class UpdateTable(
+    table: UnresolvedRelation,
+    columns: List[String],
+    selectStmt: String,
+    filer: String) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq.empty
+  override def output: Seq[AttributeReference] = Seq.empty
+}
+
+case class DeleteRecords(
+    statement: String,
+    table: UnresolvedRelation) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq.empty
+  override def output: Seq[AttributeReference] = Seq.empty
+}
+
 /**
  * Describe formatted for hive table
  */

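The three case classes added above are unresolved, parser-level nodes: CarbonSpark2SqlParser (later in this patch) produces them from UPDATE/DELETE statements, and CarbonAnalysisRules is expected to rewrite them into the runnable commands defined in IUDCommands.scala. A rough, REPL-style illustration of their shapes, assuming Spark 2.1's UnresolvedRelation; the statement strings are illustrative, not what the parser literally emits.

import org.apache.spark.sql.{DeleteRecords, UpdateTable}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// Shape of a parsed "DELETE FROM iud_db.dest d WHERE d.c1 = 'a'"
val delete = DeleteRecords(
  statement = "select tupleId from iud_db.dest d where d.c1 = 'a'",   // illustrative
  table = UnresolvedRelation(TableIdentifier("dest", Some("iud_db"))))

// Shape of a parsed "UPDATE iud_db.dest d SET (d.c3) = (select s.c33 from iud_db.source2 s where d.c1 = s.c11)"
val update = UpdateTable(
  table = UnresolvedRelation(TableIdentifier("dest", Some("iud_db"))),
  columns = List("c3"),
  selectStmt = "select s.c33 from iud_db.source2 s where d.c1 = s.c11",   // illustrative
  filer = "")   // the field is spelled "filer" in this patch
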
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
new file mode 100644
index 0000000..39d03bb
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, DataFrame, Dataset, Row, SparkSession, getDB}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
+import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
+import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
+import org.apache.carbondata.spark.DeleteDelataResultImpl
+import org.apache.carbondata.spark.load.FailureCauses
+import org.apache.carbondata.spark.util.QueryPlanUtil
+
+
+/**
+ * IUD update delete and compaction framework.
+ *
+ */
+
+private[sql] case class ProjectForDeleteCommand(
+     plan: LogicalPlan,
+     identifier: Seq[String],
+     timestamp: String) extends RunnableCommand {
+
+  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
+  var horizontalCompactionFailed = false
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val dataFrame = Dataset.ofRows(sparkSession, plan)
+//    dataFrame.show(truncate = false)
+//    dataFrame.collect().foreach(println)
+    val dataRdd = dataFrame.rdd
+
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.tableMeta.carbonTable
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      LOG.audit(s" Delete data request has been received " +
+                s"for ${ relation.databaseName }.${ relation.tableName }.")
+      if (lockStatus) {
+        LOG.info("Successfully able to get the table metadata file lock")
+      }
+      else {
+        throw new Exception("Table is locked for deletion. Please try after 
some time")
+      }
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getStorePath,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+      var executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
+
+        // handle the clean up of IUD.
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+          if (deleteExecution
+            .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
+              false, executorErrors)) {
+            // call IUD Compaction.
+            IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = false)
+          }
+    } catch {
+      case e: HorizontalCompactionException =>
+          LOG.error("Delete operation passed. Exception in Horizontal Compaction." +
+              " Please check logs. " + e.getMessage)
+          CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOG.error("Exception in Delete data operation " + e.getMessage)
+        // ****** start clean up.
+        // In case of failure , clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        // clean up. Null check is required as for executor errors the message is sometimes null
+        if (null != e.getMessage) {
+          sys.error("Delete data operation is failed. " + e.getMessage)
+        }
+        else {
+          sys.error("Delete data operation is failed. Please check logs.")
+        }
+    } finally {
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+}
+
+private[sql] case class ProjectForUpdateCommand(
+    plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand {
+  val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+
+
+   //  
sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
+    //  .EXECUTION_ID_KEY, null)
+    // DataFrame(sqlContext, plan).show(truncate = false)
+    // return Seq.empty
+
+
+    val res = plan find {
+      case relation: LogicalRelation if (relation.relation
+        .isInstanceOf[CarbonDatasourceHadoopRelation]) =>
+        true
+      case _ => false
+    }
+
+    if (!res.isDefined) {
+      return Seq.empty
+    }
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      
.lookupRelation(deleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+//    val relation = CarbonEnv.get.carbonMetastore
+//      
.lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
+//      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.tableMeta.carbonTable
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    // get the current time stamp which should be same for delete and update.
+    val currentTime = CarbonUpdateUtil.readCurrentTime
+//    var dataFrame: DataFrame = null
+    var dataSet: DataFrame = null
+    val isPersistEnabledUserValue = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.isPersistEnabled,
+        CarbonCommonConstants.defaultValueIsPersistEnabled)
+    var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
+    if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
+      isPersistEnabled = false
+    }
+    else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
+      isPersistEnabled = true
+    }
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      if (lockStatus) {
+        logInfo("Successfully able to get the table metadata file lock")
+      }
+      else {
+        throw new Exception("Table is locked for updation. Please try after 
some time")
+      }
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getStorePath,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+        // Get RDD.
+
+      dataSet = if (isPersistEnabled) {
+          Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK)
+//          DataFrame(sqlContext, plan)
+//            .persist(StorageLevel.MEMORY_AND_DISK)
+        }
+        else {
+          Dataset.ofRows(sparkSession, plan)
+//          DataFrame(sqlContext, plan)
+        }
+        var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+
+
+        // handle the clean up of IUD.
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+        // do delete operation.
+        deleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
+          currentTime + "",
+        relation, isUpdateOperation = true, executionErrors)
+
+        if(executionErrors.failureCauses != FailureCauses.NONE) {
+          throw new Exception(executionErrors.errorMsg)
+        }
+
+        // do update operation.
+        UpdateExecution.performUpdate(dataSet, tableIdentifier, plan,
+          sparkSession, currentTime, executionErrors)
+
+        if(executionErrors.failureCauses != FailureCauses.NONE) {
+          throw new Exception(executionErrors.errorMsg)
+        }
+
+        // Do IUD Compaction.
+        IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+    }
+
+    catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error(
+            "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
+        // In case of failure , clean all related delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOGGER.error("Exception in update operation" + e)
+        // ****** start clean up.
+        // In case of failure , clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
+
+        // *****end clean up.
+        if (null != e.getMessage) {
+          sys.error("Update operation failed. " + e.getMessage)
+        }
+        if (null != e.getCause && null != e.getCause.getMessage) {
+          sys.error("Update operation failed. " + e.getCause.getMessage)
+        }
+        sys.error("Update operation failed. please check logs.")
+    }
+    finally {
+      if (null != dataSet && isPersistEnabled) {
+        dataSet.unpersist()
+      }
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+}
+
+object IUDCommon {
+
+  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * Performs horizontal compaction. tryHorizontalCompaction is invoked after an Update
+   * or Delete operation completes. When it is called after an Update statement, Update
+   * Compaction followed by Delete Compaction is performed; when it is called after a
+   * Delete statement, only Delete Compaction is performed.
+    *
+    * @param sparkSession
+   * @param carbonRelation
+   * @param isUpdateOperation
+   */
+  def tryHorizontalCompaction(sparkSession: SparkSession,
+      carbonRelation: CarbonRelation,
+      isUpdateOperation: Boolean): Unit = {
+
+    var ishorizontalCompaction = CarbonDataMergerUtil.isHorizontalCompactionEnabled()
+
+    if (ishorizontalCompaction == false) {
+      return
+    }
+
+    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+    val carbonTable = carbonRelation.tableMeta.carbonTable
+    val (db, table) = (carbonTable.getDatabaseName, carbonTable.getFactTableName)
+    val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val updateTimeStamp = System.currentTimeMillis()
+    // To make sure that update and delete timestamps are not same,
+    // required to commit to status metadata and cleanup
+    val deleteTimeStamp = updateTimeStamp + 1
+
+    // get the valid segments
+    var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+
+    if (segLists == null || segLists.size() == 0) {
+      return
+    }
+
+    // Should avoid reading the Table Status file from disk every time; better to load it
+    // in memory at the start and pass it along the routines. The constructor of
+    // SegmentUpdateStatusManager reads the Table Status file and the Table Update Status
+    // file and saves their contents in segmentDetails and updateDetails respectively.
+    val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
+      absTableIdentifier)
+
+    if (isUpdateOperation == true) {
+
+      // This is only update operation, perform only update compaction.
+      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      performUpdateDeltaCompaction(sparkSession,
+        compactionTypeIUD,
+        carbonTable,
+        absTableIdentifier,
+        segmentUpdateStatusManager,
+        updateTimeStamp,
+        segLists)
+    }
+
+    // After Update Compaction perform delete compaction
+    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
+    segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    if (segLists == null || segLists.size() == 0) {
+      return
+    }
+
+    // Delete Compaction
+    performDeleteDeltaCompaction(sparkSession,
+      compactionTypeIUD,
+      carbonTable,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      deleteTimeStamp,
+      segLists)
+  }
+
+  /**
+   * Update Delta Horizontal Compaction.
+    *
+    * @param sparkSession
+   * @param compactionTypeIUD
+   * @param carbonTable
+   * @param absTableIdentifier
+   * @param segLists
+   */
+  private def performUpdateDeltaCompaction(sparkSession: SparkSession,
+      compactionTypeIUD: CompactionType,
+      carbonTable: CarbonTable,
+      absTableIdentifier: AbsoluteTableIdentifier,
+      segmentUpdateStatusManager: SegmentUpdateStatusManager,
+      factTimeStamp: Long,
+      segLists: util.List[String]): Unit = {
+    val db = carbonTable.getDatabaseName
+    val table = carbonTable.getFactTableName
+    // get the valid segments qualified for update compaction.
+    val validSegList = 
CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      compactionTypeIUD)
+
+    if (validSegList.size() == 0) {
+      return
+    }
+
+    LOG.info(s"Horizontal Update Compaction operation started for 
[${db}.${table}].")
+    LOG.audit(s"Horizontal Update Compaction operation started for 
[${db}.${table}].")
+
+    try {
+      // Update Compaction.
+      val altertablemodel = 
AlterTableModel(Option(carbonTable.getDatabaseName),
+        carbonTable.getFactTableName,
+        Some(segmentUpdateStatusManager),
+        CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
+        Some(factTimeStamp),
+        "")
+
+      AlterTableCompaction(altertablemodel).run(sparkSession)
+    }
+    catch {
+      case e: Exception =>
+        val msg = if (null != e.getMessage) {
+          e.getMessage
+        } else {
+          "Please check logs for more info"
+        }
+        throw new HorizontalCompactionException(
+          s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + 
msg, factTimeStamp)
+    }
+    LOG.info(s"Horizontal Update Compaction operation completed for [${ db 
}.${ table }].")
+    LOG.audit(s"Horizontal Update Compaction operation completed for [${ db 
}.${ table }].")
+  }
+
+  /**
+   * Delete Delta Horizontal Compaction.
+    *
+    * @param sparkSession
+   * @param compactionTypeIUD
+   * @param carbonTable
+   * @param absTableIdentifier
+   * @param segLists
+   */
+  private def performDeleteDeltaCompaction(sparkSession: SparkSession,
+      compactionTypeIUD: CompactionType,
+      carbonTable: CarbonTable,
+      absTableIdentifier: AbsoluteTableIdentifier,
+      segmentUpdateStatusManager: SegmentUpdateStatusManager,
+      factTimeStamp: Long,
+      segLists: util.List[String]): Unit = {
+
+    val db = carbonTable.getDatabaseName
+    val table = carbonTable.getFactTableName
+    val deletedBlocksList = 
CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      compactionTypeIUD)
+
+    if (deletedBlocksList.size() == 0) {
+      return
+    }
+
+    LOG.info(s"Horizontal Delete Compaction operation started for 
[${db}.${table}].")
+    LOG.audit(s"Horizontal Delete Compaction operation started for 
[${db}.${table}].")
+
+    try {
+
+      // Delete Compaction RDD
+      val rdd1 = sparkSession.sparkContext
+        .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size())
+
+      val timestamp = factTimeStamp
+      val updateStatusDetails = 
segmentUpdateStatusManager.getUpdateStatusDetails
+      val result = rdd1.mapPartitions(iter =>
+        new Iterator[Seq[CarbonDataMergerUtilResult]] {
+          override def hasNext: Boolean = iter.hasNext
+
+          override def next(): Seq[CarbonDataMergerUtilResult] = {
+            val segmentAndBlocks = iter.next
+            val segment = segmentAndBlocks.substring(0, 
segmentAndBlocks.lastIndexOf("/"))
+            val blockName = segmentAndBlocks
+              .substring(segmentAndBlocks.lastIndexOf("/") + 1, 
segmentAndBlocks.length)
+
+            val result = 
CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
+              absTableIdentifier,
+              updateStatusDetails,
+              timestamp)
+
+            result.asScala.toList
+
+          }
+        }).collect
+
+      val resultList = ListBuffer[CarbonDataMergerUtilResult]()
+      result.foreach(x => {
+        x.foreach(y => {
+          resultList += y
+        })
+      })
+
+      val updateStatus = 
CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
+        carbonTable,
+        timestamp.toString,
+        segmentUpdateStatusManager)
+      if (updateStatus == false) {
+        LOG.audit(s"Delete Compaction data operation is failed for 
[${db}.${table}].")
+        LOG.error("Delete Compaction data operation is failed.")
+        throw new HorizontalCompactionException(
+          s"Horizontal Delete Compaction Failed for [${db}.${table}] ." +
+          s" Please check logs for more info.", factTimeStamp)
+      }
+      else {
+        LOG.info(s"Horizontal Delete Compaction operation completed for 
[${db}.${table}].")
+        LOG.audit(s"Horizontal Delete Compaction operation completed for 
[${db}.${table}].")
+      }
+    }
+    catch {
+      case e: Exception =>
+        val msg = if (null != e.getMessage) {
+          e.getMessage
+        } else {
+          "Please check logs for more info"
+        }
+        throw new HorizontalCompactionException(
+          s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + 
msg, factTimeStamp)
+    }
+  }
+}
+
+class HorizontalCompactionException(
+    message: String,
+    // required for cleanup
+    val compactionTimeStamp: Long) extends RuntimeException(message) {
+}
+
+object deleteExecution {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+    if (tableIdentifier.size > 1) {
+      TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
+    } else {
+      TableIdentifier(tableIdentifier(0), None)
+    }
+  }
+
+  def deleteDeltaExecution(identifier: Seq[String],
+                           sparkSession: SparkSession,
+                           dataRdd: RDD[Row],
+                           timestamp: String, relation: CarbonRelation, 
isUpdateOperation: Boolean,
+                           executorErrors: ExecutionErrors): Boolean = {
+
+    var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = 
null
+    val tableName = getTableIdentifier(identifier).table
+    val database = 
getDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      
.lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+
+    val storeLocation = relation.tableMeta.storePath
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = new
+        AbsoluteTableIdentifier(storeLocation,
+          relation.tableMeta.carbonTableIdentifier)
+    var tablePath = CarbonStorePath
+      .getCarbonTablePath(storeLocation,
+        absoluteTableIdentifier.getCarbonTableIdentifier())
+    var tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath
+    val totalSegments =
+      
SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length
+    var factPath = tablePath.getFactDir
+
+    var carbonTable = relation.tableMeta.carbonTable
+    var deleteStatus = true
+    val deleteRdd = if (isUpdateOperation) {
+      val schema =
+        
org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
+          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+          org.apache.spark.sql.types.StringType)))
+      val rdd = dataRdd
+        .map(row => Row(row.get(row.fieldIndex(
+          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+      sparkSession.createDataFrame(rdd, schema).rdd
+      // sqlContext.createDataFrame(rdd, schema).rdd
+    } else {
+      dataRdd
+    }
+
+    val (carbonInputFormat, job) =
+      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+
+    val keyRdd = deleteRdd.map({ row =>
+      val tupleId: String = row
+        
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+      val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+      (key, row)
+    }).groupByKey()
+
+    // if no loads are present then no need to do anything.
+    if (keyRdd.partitions.size == 0) {
+      return true
+    }
+
+    var blockMappingVO = carbonInputFormat.getBlockRowCount(job, 
absoluteTableIdentifier)
+    val segmentUpdateStatusMngr = new 
SegmentUpdateStatusManager(absoluteTableIdentifier)
+    CarbonUpdateUtil
+      .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
+
+    val rowContRdd = 
sparkSession.sparkContext.parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
+      keyRdd.partitions.size)
+
+//    val rowContRdd = sqlContext.sparkContext
+//      .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
+//        keyRdd.partitions.size)
+
+    val rdd = rowContRdd.join(keyRdd)
+
+    // rdd.collect().foreach(println)
+
+    res = rdd.mapPartitionsWithIndex(
+      (index: Int, records: Iterator[((String), (RowCountDetailsVO, 
Iterable[Row]))]) =>
+        Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] {
+
+          var result = List[(String, (SegmentUpdateDetails, 
ExecutionErrors))]()
+          while (records.hasNext) {
+            val ((key), (rowCountDetailsVO, groupedRows)) = records.next
+            result = result ++
+              deleteDeltaFunc(index,
+                key,
+                groupedRows.toIterator,
+                timestamp,
+                rowCountDetailsVO)
+
+          }
+          result
+        }
+    ).collect()
+
+    // if no delete delta results were produced then there is nothing to update.
+    if (res.isEmpty) {
+      return true
+    }
+
+    // update new status file
+    checkAndUpdateStatusFiles
+
+    // all or none: update the status file only if the complete delete operation is successful.
+    def checkAndUpdateStatusFiles: Unit = {
+      val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+      val segmentDetails = new util.HashSet[String]()
+      res.foreach(resultOfSeg => resultOfSeg.foreach(
+        resultOfBlock => {
+          if 
(resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS))
 {
+            blockUpdateDetailsList.add(resultOfBlock._2._1)
+            segmentDetails.add(resultOfBlock._2._1.getSegmentName)
+            // if this block is invalid then decrement block count in map.
+            if 
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) {
+              CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+                blockMappingVO.getSegmentNumberOfBlockMapping)
+            }
+          }
+          else {
+            deleteStatus = false
+            // In case of failure, clean up all related delete delta files
+            CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+            LOGGER.audit(s"Delete data operation failed for ${ database }.${ tableName }")
+            val errorMsg =
+              "Delete data operation failed due to failure in creating the delete delta file " +
+                "for segment: " + resultOfBlock._2._1.getSegmentName + " block: " +
+                resultOfBlock._2._1.getBlockName
+            executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+            executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+            if (executorErrors.failureCauses == FailureCauses.NONE) {
+              executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+              executorErrors.errorMsg = errorMsg
+            }
+            LOGGER.error(errorMsg)
+            return
+          }
+        }
+      )
+      )
+
+      val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+        
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+
+
+      // this is the delete flow, so there is no need to put a timestamp in the status file.
+      if (CarbonUpdateUtil
+        .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, 
false) &&
+        CarbonUpdateUtil
+          .updateTableMetadataStatus(segmentDetails,
+            carbonTable,
+            timestamp,
+            !isUpdateOperation,
+            listOfSegmentToBeMarkedDeleted)
+      ) {
+        LOGGER.info(s"Delete data operation is successful for ${ database }.${ 
tableName }")
+        LOGGER.audit(s"Delete data operation is successful for ${ database 
}.${ tableName }")
+      }
+      else {
+        // In case of failure, clean up all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        val errorMessage = "Delete data operation failed due to failure " +
+          "in updating the table status."
+        LOGGER.audit(s"Delete data operation failed for ${ database }.${ tableName }")
+        LOGGER.error("Delete data operation failed due to failure in updating the table status.")
+        executorErrors.failureCauses = 
FailureCauses.STATUS_FILE_UPDATION_FAILURE
+        executorErrors.errorMsg = errorMessage
+        // throw new Exception(errorMessage)
+      }
+    }
+
+    def deleteDeltaFunc(index: Int,
+                        key: String,
+                        iter: Iterator[Row],
+                        timestamp: String,
+                        rowCountDetailsVO: RowCountDetailsVO):
+    Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = {
+
+      val result = new DeleteDelataResultImpl()
+      var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+      // here key = segment/blockName
+      val blockName = CarbonUpdateUtil
+        .getBlockName(
+          
CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
+      val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
+      var deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new 
DeleteDeltaBlockDetails(blockName)
+      val resultIter = new Iterator[(String, (SegmentUpdateDetails, 
ExecutionErrors))] {
+        val segmentUpdateDetails = new SegmentUpdateDetails()
+        var TID = ""
+        var countOfRows = 0
+        try {
+          while (iter.hasNext) {
+            val oneRow = iter.next
+            TID = oneRow
+              
.get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
+            val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.OFFSET)
+            val blockletId = CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
+            val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset)
+            // stop the delete operation if the same row is matched more than once
+            if (!isValidOffset) {
+              executorErrors.failureCauses = 
FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
+              executorErrors.errorMsg = "Multiple input rows matched for same 
row."
+              throw new MultipleMatchingException("Multiple input rows matched 
for same row.")
+            }
+            countOfRows = countOfRows + 1
+          }
+
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
+          val completeBlockName = CarbonTablePath
+            .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.BLOCK_ID) +
+              CarbonCommonConstants.FACT_FILE_EXT)
+          val deleteDeltaPath = CarbonUpdateUtil
+            .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
+          val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath,
+            FileFactory.getFileType(deleteDeltaPath))
+
+
+
+          segmentUpdateDetails.setBlockName(blockName)
+          segmentUpdateDetails.setActualBlockName(completeBlockName)
+          segmentUpdateDetails.setSegmentName(segmentId)
+          segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
+          segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
+
+          val alreadyDeletedRows: Long = 
rowCountDetailsVO.getDeletedRowsInBlock
+          val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
+          segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
+          if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
+            
segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+          }
+          else {
+            // write the delta file
+            carbonDeleteWriter.write(deleteDeltaBlockDetails)
+          }
+
+          deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+        } catch {
+          case e : MultipleMatchingException =>
+            LOGGER.audit(e.getMessage)
+            LOGGER.error(e.getMessage)
+            // don't rethrow here; the failure is already reported through executorErrors.
+          case e: Exception =>
+            val errorMsg = s"Delete data operation failed for ${ database }.${ tableName }."
+            LOGGER.audit(errorMsg)
+            LOGGER.error(errorMsg + e.getMessage)
+            throw e
+        }
+
+
+        var finished = false
+
+        override def hasNext: Boolean = {
+          if (!finished) {
+            finished = true
+            finished
+          }
+          else {
+            !finished
+          }
+        }
+
+        override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) 
= {
+          finished = true
+          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
+        }
+      }
+      resultIter
+    }
+    true
+  }
+}
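
deleteDeltaExecution above keys each matched row by the segment/block portion of its implicit tupleId and groups the rows per block before writing the delete delta files. A minimal sketch of that grouping step in plain Scala, assuming a hypothetical tupleId layout of "segmentId/blockName/blockletId/pageId/offset" (the real layout is owned by CarbonUpdateUtil.getSegmentWithBlockFromTID):

  // group candidate rows by the segment/block portion of their (hypothetical) tuple id
  val tupleIds = Seq("0/part-0-0/0/0/1", "0/part-0-0/0/0/7", "1/part-0-1/0/0/3")

  def segmentWithBlock(tid: String): String =
    tid.split("/").take(2).mkString("/")          // "segmentId/blockName"

  val grouped: Map[String, Seq[String]] = tupleIds.groupBy(segmentWithBlock)
  // Map("0/part-0-0" -> Seq("0/part-0-0/0/0/1", "0/part-0-0/0/0/7"),
  //     "1/part-0-1" -> Seq("1/part-0-1/0/0/3"))
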
+
+
+
+object UpdateExecution {
+
+  def performUpdate(
+         dataFrame: Dataset[Row],
+         tableIdentifier: Seq[String],
+         plan: LogicalPlan,
+         sparkSession: SparkSession,
+         currentTime: Long,
+         executorErrors: ExecutionErrors): Unit = {
+
+    def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): 
Boolean = {
+
+      val tableName = 
relation.absIdentifier.getCarbonTableIdentifier.getTableName
+      val dbName = 
relation.absIdentifier.getCarbonTableIdentifier.getDatabaseName
+      (tableIdentifier.size > 1 &&
+        tableIdentifier(0) == dbName &&
+        tableIdentifier(1) == tableName) ||
+        (tableIdentifier(0) == tableName)
+    }
+    def getHeader(relation: CarbonDatasourceHadoopRelation, plan: 
LogicalPlan): String = {
+      var header = ""
+      var found = false
+
+      plan match {
+        case Project(pList, _) if (!found) =>
+          found = true
+          header = pList
+            .filter(field => !field.name
+              
.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+            .map(col => if 
(col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
+              col.name
+                .substring(0, 
col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+            }
+            else {
+              col.name
+            }).mkString(",")
+      }
+      header
+    }
+    val ex = dataFrame.queryExecution.analyzed
+    val res = ex find {
+      case relation: LogicalRelation if 
(relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+        isDestinationRelation(relation.relation
+          .asInstanceOf[CarbonDatasourceHadoopRelation])) =>
+        true
+      case _ => false
+    }
+    val carbonRelation: CarbonDatasourceHadoopRelation = res match {
+      case Some(relation: LogicalRelation) =>
+        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      case _ => sys.error("Update target table relation not found in the analyzed plan")
+    }
+
+    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
+
+    val header = getHeader(carbonRelation, plan)
+
+    LoadTable(
+      
Some(carbonRelation.absIdentifier.getCarbonTableIdentifier.getDatabaseName),
+      carbonRelation.absIdentifier.getCarbonTableIdentifier.getTableName,
+      null,
+      Seq(),
+      Map(("fileheader" -> header)),
+      false,
+      null,
+      Some(dataFrame),
+      Some(updateTableModel)).run(sparkSession)
+
+    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
+    executorErrors.failureCauses = 
updateTableModel.executorErrors.failureCauses
+
+    Seq.empty
+
+  }
+
+}
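
performUpdate above derives the load header from the update projection (dropping the implicit tupleId column and stripping the updated-column suffix) and then replays the rows through LoadTable. A minimal sketch of the header derivation, assuming the suffix literal "-updatedColumn" and illustrative column names:

  // derive the load header from an update projection (column names are illustrative)
  val updatedColExtension = "-updatedColumn"
  val projected = Seq("c1", "c2-updatedColumn", "c3-updatedColumn", "tupleId")

  val header = projected
    .filterNot(_.equalsIgnoreCase("tupleId"))     // drop the implicit column
    .map(n => if (n.endsWith(updatedColExtension)) n.stripSuffix(updatedColExtension) else n)
    .mkString(",")
  // header == "c1,c2,c3"
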

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 6061e3e..7d94c92 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -18,9 +18,14 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, 
UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, 
NamedExpression}
+import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
+import org.apache.spark.sql.execution.{ProjectExec, SparkSqlParser, 
SubqueryExec}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -73,3 +78,136 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
     }
   }
 }
+
+object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
+
+  var sparkSession: SparkSession = _
+
+  def init(sparkSession: SparkSession) {
+     this.sparkSession = sparkSession
+  }
+
+  private def processUpdateQuery(
+      table: UnresolvedRelation,
+      columns: List[String],
+      selectStmt: String,
+      filter: String): LogicalPlan = {
+    var includedDestColumns = false
+    var includedDestRelation = false
+    var addedTupleId = false
+
+    def prepareTargetRelation(relation: UnresolvedRelation): SubqueryAlias = {
+      val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
+        Seq.empty, isDistinct = false), "tupleId")())
+      val projList = Seq(
+        UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
+      // include the tuple id and the rest of the required columns in the subquery
+      SubqueryAlias(table.alias.getOrElse(""), Project(projList, relation), Option(table.tableIdentifier))
+    }
+    // get the un-analyzed logical plan
+    val targetTable = prepareTargetRelation(table)
+    val selectPlan = new 
SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt) transform {
+      case Project(projectList, child) if (!includedDestColumns) =>
+        includedDestColumns = true
+        if (projectList.size != columns.size) {
+          sys.error("Number of source and destination columns are not 
matching")
+        }
+        val renamedProjectList = projectList.zip(columns).map{ case(attr, col) 
=>
+          attr match {
+            case UnresolvedAlias(child22, _) =>
+              UnresolvedAlias(Alias(child22, col + "-updatedColumn")())
+            case UnresolvedAttribute(param) =>
+              UnresolvedAlias(Alias(attr, col + "-updatedColumn")())
+             // UnresolvedAttribute(col + "-updatedColumn")
+//              UnresolvedAlias(Alias(child, col + "-updatedColumn")())
+            case _ => attr
+          }
+        }
+        val list = Seq(
+          UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq)))) ++ 
renamedProjectList
+        Project(list, child)
+      case Filter(cond, child) if (!includedDestRelation) =>
+        includedDestRelation = true
+        Filter(cond, Join(child, targetTable, Inner, None))
+      case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
+                                            t != table.tableIdentifier) =>
+        includedDestRelation = true
+        Join(r, targetTable, Inner, None)
+    }
+    val updatedSelectPlan : LogicalPlan = if (!includedDestRelation) {
+      // special case to handle self join queries
+      // Eg. update tableName  SET (column1) = (column1+1)
+      selectPlan transform {
+        case relation: UnresolvedRelation if (table.tableIdentifier == 
relation.tableIdentifier &&
+                                              addedTupleId == false) =>
+          addedTupleId = true
+          targetTable
+      }
+    } else {
+      selectPlan
+    }
+    val finalPlan = if (filter.length > 0) {
+      val alias = table.alias.getOrElse("")
+      var transformed: Boolean = false
+      // Create a dummy projection to include filter conditions
+      var newPlan: LogicalPlan = null
+      if (table.tableIdentifier.database.isDefined) {
+        newPlan = new 
SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select * from  " +
+                                                                     
table.tableIdentifier.database
+                                                                       
.getOrElse("") + "." +
+                                                                     
table.tableIdentifier.table +
+                                                                     " " + 
alias + " " +
+                                                                     filter)
+      }
+      else {
+        newPlan = new 
SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select * from  " +
+                                                                     
table.tableIdentifier.table +
+                                                                     " " + 
alias + " " +
+                                                                     filter)
+      }
+      newPlan transform {
+        case UnresolvedRelation(t, Some(a)) if (
+          !transformed && t == table.tableIdentifier && a == alias) =>
+          transformed = true
+          // Add the filter condition of update statement  on destination table
+          SubqueryAlias(alias, updatedSelectPlan, 
Option(table.tableIdentifier))
+      }
+    } else {
+      updatedSelectPlan
+    }
+    val tid = 
CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
+    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sparkSession))
+    val destinationTable = UnresolvedRelation(table.tableIdentifier, 
table.alias)
+    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
+  }
+
+  def processDeleteRecordsQuery(selectStmt: String, table: 
UnresolvedRelation): LogicalPlan = {
+   // val tid = 
CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
+   val tidSeq = Seq(getDB.getDatabaseName(table.tableIdentifier.database, 
sparkSession),
+     table.tableIdentifier.table)
+    var addedTupleId = false
+    val selectPlan = new 
SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt) transform {
+      case relation: UnresolvedRelation if (table.tableIdentifier == 
relation.tableIdentifier &&
+                                            addedTupleId == false) =>
+        addedTupleId = true
+        val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
+          Seq.empty, isDistinct = false), "tupleId")())
+        val projList = Seq(
+          UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
+        // include the tuple id in the subquery
+        Project(projList, relation)
+    }
+    ProjectForDeleteCommand(
+      selectPlan,
+      tidSeq,
+      System.currentTimeMillis().toString)
+  }
+
+  override def apply(logicalplan: LogicalPlan): LogicalPlan = {
+
+    logicalplan transform {
+      case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, 
sel, where)
+      case DeleteRecords(statement, table) => 
processDeleteRecordsQuery(statement, table)
+    }
+  }
+}
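
CarbonIUDAnalysisRule rewrites UPDATE and DELETE statements into select plans that carry an extra implicit tupleId column, so that individual rows can be addressed later in the delete/update flow. A rough DataFrame-level sketch of that idea, using monotonically_increasing_id as a stand-in for Carbon's getTupleId function (the table, session settings and column names are illustrative):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  val spark = SparkSession.builder().master("local[*]").appName("iud-rewrite-sketch").getOrCreate()

  // a stand-in destination table with two columns
  val dest = spark.range(5).selectExpr("cast(id as string) as c1", "id as c2")

  // conceptually what the rule does: project every destination column plus an implicit
  // row identifier so that a later delete/update step can address individual rows
  val withTupleId = dest.select(col("*"), monotonically_increasing_id().as("tupleId"))
  withTupleId.show(false)
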

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 687afc4..e413840 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -67,6 +67,7 @@ class CarbonSessionCatalog(
   lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
+    CarbonIUDAnalysisRule.init(sparkSession)
     env
   }
 
@@ -129,6 +130,7 @@ class CarbonSessionState(sparkSession: SparkSession) 
extends HiveSessionState(sp
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
         CarbonPreInsertionCasts ::
+        CarbonIUDAnalysisRule ::
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
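
The hunk above wires CarbonIUDAnalysisRule into the session's analyzer batch via CarbonSessionState. As a loose illustration of the same plug-in pattern, stock Spark 2.x publicly exposes only optimizer-side hooks, where a custom rule can be attached through experimental.extraOptimizations (the rule below is a placeholder, not Carbon's):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // a placeholder rule that leaves the plan untouched
  object NoOpRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  val spark = SparkSession.builder().master("local[*]").appName("rule-injection-sketch").getOrCreate()
  spark.experimental.extraOptimizations ++= Seq(NoOpRule)
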

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index d1a0c90..cc27181 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.{ProjectForUpdateCommand, 
RunnableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
@@ -69,7 +69,8 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with 
PredicateHelper {
         return plan
       }
       LOGGER.info("Starting to optimize plan")
-      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan)
+      val iudPlan = processPlan(plan)
+      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(iudPlan)
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
       val result = transformCarbonPlan(udfTransformedPlan, relations)
@@ -113,6 +114,25 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with 
PredicateHelper {
     output
   }
 
+  private def processPlan(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
+        var isTransformed = false
+        val newPlan = updatePlan transform {
+          case Project(pList, child) if (!isTransformed) =>
+            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = 
pList
+              .splitAt(pList.size - cols.size)
+            val diff = cols.diff(dest.map(_.name))
+            if (diff.size > 0) {
+              sys.error(s"Unknown column(s) ${diff.mkString(",")} in table 
${table.tableName}")
+            }
+            isTransformed = true
+            Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
+        }
+        ProjectForUpdateCommand(newPlan, Seq(table.tableIdentifier.toString()))
+    }
+  }
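
processPlan splits the rewritten projection into the destination columns and the updated source expressions, validates the SET column names against the destination, and then swaps the updated expressions in. A minimal sketch of that splitAt/diff check on plain name lists (column names are illustrative):

  // a minimal sketch of the column-splitting check in processPlan (names are illustrative)
  val pList = Seq("c1", "c2", "c3", "c2-updatedColumn", "c3-updatedColumn") // dest columns, then updated sources
  val cols  = Seq("c2", "c3")                                               // columns named in SET (...)

  val (dest, source) = pList.splitAt(pList.size - cols.size)
  val unknown = cols.diff(dest)
  if (unknown.nonEmpty) {
    sys.error(s"Unknown column(s) ${unknown.mkString(",")} in the target table")
  }
  // keep the untouched destination columns and append the updated source expressions
  val rewritten = dest.filterNot(cols.contains) ++ source
  // rewritten == Seq("c1", "c2-updatedColumn", "c3-updatedColumn")
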
+
   def isOptimized(plan: LogicalPlan): Boolean = {
     plan find {
       case cd: CarbonDictionaryCatalystDecoder => true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2026970/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index d1a764f..367aab4 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.ShowLoadsCommand
-import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
+import org.apache.spark.sql.{DeleteRecords, ShowLoadsCommand, UpdateTable}
+import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.types.StructField
@@ -61,7 +63,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    loadManagement| showLoads | alterTable | restructure
+    loadManagement| showLoads | alterTable | restructure | updateTable | 
deleteRecords
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -78,6 +80,128 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         AlterTableCompaction(altertablemodel)
     }
 
+  protected lazy val deleteRecords: Parser[LogicalPlan] =
+    (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
+      case table ~ rest =>
+        val tableName = getTableName(table.tableIdentifier)
+        val alias = table.alias.getOrElse("")
+        DeleteRecords("select tupleId from " + tableName + " " + alias + 
rest.getOrElse(""), table)
+    }
+
+  protected lazy val updateTable: Parser[LogicalPlan] =
+    UPDATE ~> table ~
+    (SET ~> "(" ~> repsep(element, ",") <~ ")") ~
+    ("=" ~> restInput) <~ opt(";") ^^ {
+      case tab ~ columns ~ rest =>
+        val (sel, where) = splitQuery(rest)
+        val (selectStmt, relation) =
+          if (!sel.toLowerCase.startsWith("select ")) {
+            if (sel.trim.isEmpty) {
+              sys.error("At least one source column has to be specified ")
+            }
+            // only list of expression are given, need to convert that list of 
expressions into
+            // select statement on destination table
+            val relation = tab match {
+              case r@UnresolvedRelation(tableIdentifier, alias) =>
+                updateRelation(r, tableIdentifier, alias)
+              case _ => tab
+            }
+            ("select " + sel + " from " + 
getTableName(relation.tableIdentifier) + " " +
+             relation.alias.get, relation)
+          } else {
+            (sel, updateRelation(tab, tab.tableIdentifier, tab.alias))
+          }
+        UpdateTable(relation, columns, selectStmt, where)
+    }
+
+  private def updateRelation(
+      r: UnresolvedRelation,
+      tableIdentifier: Seq[String],
+      alias: Option[String]): UnresolvedRelation = {
+    alias match {
+      case Some(_) => r
+      case _ =>
+        val tableAlias = tableIdentifier match {
+          case Seq(dbName, tableName) => Some(tableName)
+          case Seq(tableName) => Some(tableName)
+        }
+        UnresolvedRelation(tableIdentifier, tableAlias)
+    }
+  }
+
+  protected lazy val element: Parser[String] =
+    (ident <~ ".").? ~ ident ^^ {
+      case table ~ column => column.toLowerCase
+    }
+
+  protected lazy val table: Parser[UnresolvedRelation] = {
+    rep1sep(attributeName, ".") ~ opt(ident) ^^ {
+      case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
+    }
+  }
+
+  private def splitQuery(query: String): (String, String) = {
+    val stack = scala.collection.mutable.Stack[Char]()
+    var foundSingleQuotes = false
+    var foundDoubleQuotes = false
+    var foundEscapeChar = false
+    var ignoreChar = false
+    var stop = false
+    var bracketCount = 0
+    val (selectStatement, where) = query.span {
+      ch => {
+        if (stop) {
+          false
+        } else {
+          ignoreChar = false
+          if (foundEscapeChar && (ch == '\'' || ch == '\"' || ch == '\\')) {
+            foundEscapeChar = false
+            ignoreChar = true
+          }
+          // escaped single or double quotes (and backslashes) are not considered for quote tracking
+          if (!ignoreChar) {
+            if (ch == '\\') {
+              foundEscapeChar = true
+            } else if (ch == '\'') {
+              foundSingleQuotes = !foundSingleQuotes
+            } else if (ch == '\"') {
+              foundDoubleQuotes = !foundDoubleQuotes
+            }
+            else if (ch == '(' && !foundSingleQuotes && !foundDoubleQuotes) {
+              bracketCount = bracketCount + 1
+              stack.push(ch)
+            } else if (ch == ')' && !foundSingleQuotes && !foundDoubleQuotes) {
+              bracketCount = bracketCount + 1
+              stack.pop()
+              if (0 == stack.size) {
+                stop = true
+              }
+            }
+          }
+          true
+        }
+      }
+    }
+    if (bracketCount == 0 || bracketCount % 2 != 0) {
+      sys.error("Parsing error, missing bracket")
+    }
+    val select = selectStatement.trim
+    (select.substring(1, select.length - 1).trim -> where.trim)
+  }
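
splitQuery scans the text after SET character by character, tracking quotes and escapes, and splits at the bracket that closes the expression list so that the remainder becomes the where clause. A simplified sketch of the bracket-aware split, with the quote and escape handling omitted:

  // a simplified sketch of the bracket-aware split (quote and escape handling omitted)
  def splitAtMatchingParen(query: String): (String, String) = {
    var depth = 0
    var stop = false
    val (exprPart, rest) = query.span { ch =>
      if (stop) {
        false
      } else {
        if (ch == '(') depth += 1
        else if (ch == ')') { depth -= 1; if (depth == 0) stop = true }
        true
      }
    }
    (exprPart.trim.drop(1).dropRight(1).trim, rest.trim)
  }

  splitAtMatchingParen("(c2 + 1, c3) where c1 = 'a'")
  // => ("c2 + 1, c3", "where c1 = 'a'")
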
+
+  protected lazy val attributeName: Parser[String] = acceptMatch("attribute 
name", {
+    case lexical.Identifier(str) => str.toLowerCase
+    case lexical.Keyword(str) if !lexical.delimiters.contains(str) => 
str.toLowerCase
+  })
+
+  private def getTableName(tableIdentifier: Seq[String]): String = {
+    if (tableIdentifier.size > 1) {
+      tableIdentifier(0) + "." + tableIdentifier(1)
+    } else {
+      tableIdentifier(0)
+    }
+  }
+
 
   protected lazy val loadDataNew: Parser[LogicalPlan] =
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
