This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.6 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 00d2fe930e713958e052e3a738616a852f1dbfe7 Author: Manhua <kevin...@qq.com> AuthorDate: Wed Sep 25 11:47:01 2019 +0800 [CARBONDATA-3501] Fix update table with varchar column Problem An update on a table with a varchar column will throw an exception Analyse In the loading part of the update operation, it gets the isVarcharTypeMapping for each column in the order in which the table was created. And this gives a hint for checking string length. It does not allow a column which is not of varchar type to exceed 32000 characters. However, when changing the plan for updating in CarbonIUDRule, it first deletes the old expression and appends the new one, which makes the order differ from the order at table creation, such that the string length check fails. Solution Keep the same order as at table creation when modifying the update plan This closes #3398 --- .../longstring/VarcharDataTypesBasicTestCase.scala | 10 ++++++++++ .../command/management/CarbonLoadDataCommand.scala | 2 +- .../org/apache/spark/sql/hive/CarbonAnalysisRules.scala | 4 ++-- .../org/apache/spark/sql/optimizer/CarbonIUDRule.scala | 17 ++++++++++++++--- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala index 4fd2cc0..9719cfc 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala @@ -389,6 +389,16 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi sql("DROP TABLE IF EXISTS varchar_complex_table") } + + test("update table with long string column") { + prepareTable() + // update non-varchar column + sql(s"update $longStringTable set(id)=(0) 
where name is not null").show() + // update varchar column + sql(s"update $longStringTable set(description)=('empty') where name is not null").show() + // update non-varchar&varchar column + sql(s"update $longStringTable set(description, id)=('sth.', 1) where name is not null").show() + } // ignore this test in CI, because it will need at least 4GB memory to run successfully ignore("Exceed 2GB per column page for varchar datatype") { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 6a03eab..b2f9a1e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -1060,7 +1060,7 @@ case class CarbonLoadDataCommand( val dropAttributes = df.logicalPlan.output.dropRight(1) val finalOutput = catalogTable.schema.map { attr => dropAttributes.find { d => - val index = d.name.lastIndexOf("-updatedColumn") + val index = d.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION) if (index > 0) { d.name.substring(0, index).equalsIgnoreCase(attr.name) } else { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index 9b923b0..d11bf1e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -122,9 +122,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica val renamedProjectList = projectList.zip(columns).map { case (attr, col) => attr match { case UnresolvedAlias(child22, _) => - 
UnresolvedAlias(Alias(child22, col + "-updatedColumn")()) + UnresolvedAlias(Alias(child22, col + CarbonCommonConstants.UPDATED_COL_EXTENSION)()) case UnresolvedAttribute(_) => - UnresolvedAlias(Alias(attr, col + "-updatedColumn")()) + UnresolvedAlias(Alias(attr, col + CarbonCommonConstants.UPDATED_COL_EXTENSION)()) case _ => attr } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala index ae5825d..da1ca55 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand +import org.apache.carbondata.core.constants.CarbonCommonConstants + /** * Rule specific for IUD operations */ @@ -37,9 +39,9 @@ class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper { var isTransformed = false val newPlan = updatePlan transform { case Project(pList, child) if !isTransformed => - val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList + var (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList .splitAt(pList.size - cols.size) - val diff = cols.diff(dest.map(_.name.toLowerCase)) + // check complex column cols.foreach { col => val complexExists = "\"name\":\"" + col + "\"" if (dest.exists(m => m.dataType.json.contains(complexExists))) { @@ -47,11 +49,20 @@ class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper { "Unsupported operation on Complex data type") } } + // check updated columns exists in table + val diff = cols.diff(dest.map(_.name.toLowerCase)) if (diff.nonEmpty) { sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table ${ table.tableName 
}") } + // modify plan for updated column *in place* isTransformed = true - Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child) + source.foreach { col => + val colName = col.name.substring(0, + col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)) + val updateIdx = dest.indexWhere(_.name.equalsIgnoreCase(colName)) + dest = dest.updated(updateIdx, col) + } + Project(dest, child) } CarbonProjectForUpdateCommand( newPlan, table.tableIdentifier.database, table.tableIdentifier.table, cols)