This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.6 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 6c9bbfe8ec5606262bbbbb93b101986352578b2b Author: Zhang Zhichao <441586...@qq.com> AuthorDate: Tue Aug 13 11:00:23 2019 +0800 [CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql Return updated/deleted rows count when execute update/delete sql This closes #3357 --- .../testsuite/iud/DeleteCarbonTableTestCase.scala | 19 +++++++++++++ .../testsuite/iud/UpdateCarbonTableTestCase.scala | 33 ++++++++++++++++++++++ .../scala/org/apache/carbondata/spark/KeyVal.scala | 10 +++---- .../apache/spark/util/CarbonReflectionUtils.scala | 16 +++++++++++ .../apache/spark/sql/CarbonCatalystOperators.scala | 6 ++-- .../mutation/CarbonProjectForDeleteCommand.scala | 21 ++++++++++---- .../mutation/CarbonProjectForUpdateCommand.scala | 19 ++++++++----- .../command/mutation/DeleteExecution.scala | 27 ++++++++++-------- .../spark/sql/hive/CarbonAnalysisRules.scala | 12 +++++++- 9 files changed, 129 insertions(+), 34 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index f26283b..4565d7a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -361,6 +361,25 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists decimal_table") } + test("[CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql") { + sql("drop table if exists test_return_row_count") + + sql("create table test_return_row_count (a string, b string, c string) stored by 'carbondata'").show() + sql("insert into test_return_row_count select 'aaa','bbb','ccc'").show() + sql("insert into test_return_row_count 
select 'bbb','bbb','ccc'").show() + sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show() + sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show() + + checkAnswer(sql("delete from test_return_row_count where a = 'aaa'"), + Seq(Row(1)) + ) + checkAnswer(sql("select * from test_return_row_count"), + Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc", "bbb", "ccc")) + ) + + sql("drop table if exists test_return_row_count").show() + } + override def afterAll { sql("use default") sql("drop database if exists iud_db cascade") diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index cf45600..ef18035 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -826,6 +826,39 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("""drop table iud.dest11""").show } + test("[CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql") { + sql("drop table if exists test_return_row_count") + sql("drop table if exists test_return_row_count_source") + + sql("create table test_return_row_count (a string, b string, c string) stored by 'carbondata'").show() + sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show() + sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show() + sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show() + + sql("create table test_return_row_count_source (a string, b string, c string) stored by 'carbondata'").show() + sql("insert into test_return_row_count_source select 'aaa','eee','ccc'").show() + sql("insert into 
test_return_row_count_source select 'bbb','bbb','ccc'").show() + sql("insert into test_return_row_count_source select 'ccc','bbb','ccc'").show() + sql("insert into test_return_row_count_source select 'ccc','bbb','ccc'").show() + + checkAnswer(sql("update test_return_row_count set (b) = ('ddd') where a = 'ccc'"), + Seq(Row(2)) + ) + checkAnswer(sql("select * from test_return_row_count"), + Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "ddd", "ccc"), Row("ccc", "ddd", "ccc")) + ) + + checkAnswer(sql("update test_return_row_count t set (t.b) = (select s.b from test_return_row_count_source s where s.a = 'aaa') where t.a = 'ccc'"), + Seq(Row(2)) + ) + checkAnswer(sql("select * from test_return_row_count"), + Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "eee", "ccc"), Row("ccc", "eee", "ccc")) + ) + + sql("drop table if exists test_return_row_count") + sql("drop table if exists test_return_row_count_source") + } + override def afterAll { sql("use default") sql("drop database if exists iud cascade") diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala index c4b1144..9fca245 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala @@ -76,22 +76,20 @@ class updateResultImpl } trait DeleteDelataResult[K, V] extends Serializable { - def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V) + def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors, Long)): (K, V) } class DeleteDelataResultImpl - extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)] { + extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] { override def getKey(key: SegmentStatus, - value: (SegmentUpdateDetails, ExecutionErrors)): (SegmentStatus, 
(SegmentUpdateDetails, - ExecutionErrors)) = { + value: (SegmentUpdateDetails, ExecutionErrors, Long)): (SegmentStatus, (SegmentUpdateDetails, + ExecutionErrors, Long)) = { (key, value) } } - trait PartitionResult[K, V] extends Serializable { def getKey(key: Int, value: Boolean): (K, V) - } class PartitionResultImpl extends PartitionResult[Int, Boolean] { diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 4fc30d02..46692df 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -26,6 +26,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} @@ -381,4 +382,19 @@ object CarbonReflectionUtils { nameField.setAccessible(true) nameField.set(caseObj, objToSet) } + + def invokeAnalyzerExecute(analyzer: Analyzer, + plan: LogicalPlan): LogicalPlan = { + if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) { + val method: Method = analyzer.getClass + .getMethod("execute", classOf[LogicalPlan]) + method.invoke(analyzer, plan).asInstanceOf[LogicalPlan] + } else if (SparkUtil.isSparkVersionEqualTo("2.3")) { + val method: Method = analyzer.getClass + .getMethod("executeAndCheck", classOf[LogicalPlan]) + method.invoke(analyzer, plan).asInstanceOf[LogicalPlan] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } 
+ } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index 160c785..5f745d2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -68,8 +68,8 @@ case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr case class ProjectForUpdate( table: UnresolvedRelation, columns: List[String], - children: Seq[LogicalPlan] ) extends LogicalPlan { - override def output: Seq[AttributeReference] = Seq.empty + children: Seq[LogicalPlan]) extends LogicalPlan { + override def output: Seq[Attribute] = Seq.empty } case class UpdateTable( @@ -79,7 +79,7 @@ case class UpdateTable( alias: Option[String] = None, filer: String) extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq.empty - override def output: Seq[AttributeReference] = Seq.empty + override def output: Seq[Attribute] = Seq.empty } case class DeleteRecords( diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index ae1d848..45c73ac 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.command.mutation import scala.collection.JavaConverters._ import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ +import 
org.apache.spark.sql.types.LongType import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory @@ -48,6 +50,10 @@ private[sql] case class CarbonProjectForDeleteCommand( timestamp: String) extends DataCommand { + override val output: Seq[Attribute] = { + Seq(AttributeReference("Deleted Row Count", LongType, nullable = false)()) + } + override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) @@ -104,7 +110,7 @@ private[sql] case class CarbonProjectForDeleteCommand( // handle the clean up of IUD. CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) - val deletedSegments = DeleteExecution.deleteDeltaExecution( + val (deletedSegments, deletedRowCount) = DeleteExecution.deleteDeltaExecution( databaseNameOp, tableName, sparkSession, @@ -112,16 +118,18 @@ private[sql] case class CarbonProjectForDeleteCommand( timestamp, isUpdateOperation = false, executorErrors) + + // Check for any failures occurred during delete delta execution + if (executorErrors.failureCauses != FailureCauses.NONE) { + throw new Exception(executorErrors.errorMsg) + } + // call IUD Compaction. 
HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable, isUpdateOperation = false) DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession) - if (executorErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executorErrors.errorMsg) - } - val allDataMapSchemas = DataMapStoreManager.getInstance .getDataMapSchemasOfTable(carbonTable).asScala .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier && @@ -134,11 +142,13 @@ private[sql] case class CarbonProjectForDeleteCommand( val deleteFromTablePostEvent: DeleteFromTablePostEvent = DeleteFromTablePostEvent(sparkSession, carbonTable) OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext) + Seq(Row(deletedRowCount)) } catch { case e: HorizontalCompactionException => LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." + " Please check logs. " + e.getMessage) CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) + Seq(Row(0L)) case e: Exception => LOGGER.error("Exception in Delete data operation " + e.getMessage, e) @@ -160,7 +170,6 @@ private[sql] case class CarbonProjectForDeleteCommand( updateLock.unlock() compactionLock.unlock() } - Seq.empty } override protected def opName: String = "DELETE DATA" diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index b620e38..686990d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.execution.command.mutation import scala.collection.JavaConverters._ import org.apache.spark.sql._ 
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.types.ArrayType +import org.apache.spark.sql.types.{ArrayType, LongType} import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ -50,8 +51,13 @@ private[sql] case class CarbonProjectForUpdateCommand( columns: List[String]) extends DataCommand { + override val output: Seq[Attribute] = { + Seq(AttributeReference("Updated Row Count", LongType, nullable = false)()) + } + override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + var updatedRowCount = 0L IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan) val res = plan find { case relation: LogicalRelation if relation.relation @@ -61,7 +67,7 @@ private[sql] case class CarbonProjectForUpdateCommand( } if (res.isEmpty) { - return Seq.empty + return Array(Row(updatedRowCount)).toSeq } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) if (carbonTable.getPartitionInfo != null && @@ -71,6 +77,7 @@ private[sql] case class CarbonProjectForUpdateCommand( throw new UnsupportedOperationException("Unsupported update operation for range/" + "hash/list partition table") } + setAuditTable(carbonTable) setAuditInfo(Map("plan" -> plan.simpleString)) columns.foreach { col => @@ -135,7 +142,7 @@ private[sql] case class CarbonProjectForUpdateCommand( CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) // do delete operation. 
- val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution( + val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution( databaseNameOp, tableName, sparkSession, @@ -148,6 +155,7 @@ private[sql] case class CarbonProjectForUpdateCommand( throw new Exception(executionErrors.errorMsg) } + updatedRowCount = updatedRowCountTemp // do update operation. performUpdate(dataSet, databaseNameOp, @@ -217,7 +225,7 @@ private[sql] case class CarbonProjectForUpdateCommand( CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) } } - Seq.empty + Seq(Row(updatedRowCount)) } private def performUpdate( @@ -304,9 +312,6 @@ private[sql] case class CarbonProjectForUpdateCommand( executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses - - Seq.empty - } override protected def opName: String = "UPDATE DATA" diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index f9428a2..cb86cb5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -64,14 +64,15 @@ object DeleteExecution { dataRdd: RDD[Row], timestamp: String, isUpdateOperation: Boolean, - executorErrors: ExecutionErrors): Seq[Segment] = { + executorErrors: ExecutionErrors): (Seq[Segment], Long) = { - var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null + var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] = null val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) val absoluteTableIdentifier = 
carbonTable.getAbsoluteTableIdentifier val tablePath = absoluteTableIdentifier.getTablePath var segmentsTobeDeleted = Seq.empty[Segment] + var operatedRowCount = 0L val deleteRdd = if (isUpdateOperation) { val schema = @@ -97,7 +98,7 @@ object DeleteExecution { // if no loads are present then no need to do anything. if (keyRdd.partitions.length == 0) { - return segmentsTobeDeleted + return (segmentsTobeDeleted, operatedRowCount) } val blockMappingVO = carbonInputFormat.getBlockRowCount( @@ -124,9 +125,9 @@ object DeleteExecution { val rdd = rowContRdd.join(keyRdd) res = rdd.mapPartitionsWithIndex( (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) => - Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] { + Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] { ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) - var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]() + var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]() while (records.hasNext) { val ((key), (rowCountDetailsVO, groupedRows)) = records.next val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR)) @@ -143,7 +144,7 @@ object DeleteExecution { // if no loads are present then no need to do anything. 
if (res.flatten.isEmpty) { - return segmentsTobeDeleted + return (segmentsTobeDeleted, operatedRowCount) } // update new status file @@ -217,7 +218,7 @@ object DeleteExecution { timestamp: String, rowCountDetailsVO: RowCountDetailsVO, isStandardTable: Boolean - ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = { + ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = { val result = new DeleteDelataResultImpl() var deleteStatus = SegmentStatus.LOAD_FAILURE @@ -228,7 +229,8 @@ object DeleteExecution { CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1))) val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0) val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName) - val resultIter = new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] { + val resultIter = + new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] { val segmentUpdateDetails = new SegmentUpdateDetails() var TID = "" var countOfRows = 0 @@ -305,15 +307,18 @@ object DeleteExecution { } } - override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)) = { + override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)) = { finished = true - result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors)) + result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors, countOfRows.toLong)) } } resultIter } - segmentsTobeDeleted + if (executorErrors.failureCauses == FailureCauses.NONE) { + operatedRowCount = res.flatten.map(_._2._3).sum + } + (segmentsTobeDeleted, operatedRowCount) } def clearDistributedSegmentCache(carbonTable: CarbonTable, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index 9ba2301..9b923b0 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -40,6 +40,8 @@ import org.apache.carbondata.core.util.CarbonUtil case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { private lazy val parser = sparkSession.sessionState.sqlParser + private lazy val optimizer = sparkSession.sessionState.optimizer + private lazy val analyzer = sparkSession.sessionState.analyzer private def processUpdateQuery( table: UnresolvedRelation, @@ -181,7 +183,15 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica } val destinationTable = CarbonReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias) - ProjectForUpdate(destinationTable, columns, Seq(finalPlan)) + // In Spark 2.1 and 2.2, it uses Analyzer.execute method to transform LogicalPlan + // but in Spark 2.3, it uses Analyzer.executeAndCheck method + val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute( + analyzer, ProjectForUpdate(destinationTable, columns, Seq(finalPlan))) + // For all commands, they execute eagerly, and will be transformed to + // logical plan 'LocalRelation' in analyze phase(please see the code in 'Dataset.logicalPlan'), + // so it needs to return logical plan 'CarbonProjectForUpdateCommand' here + // instead of 'ProjectForUpdate' + optimizer.execute(analyzedPlan) }