[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user asfgit closed the pull request at: https://github.com/apache/carbondata/pull/1508 ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153724471 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -570,6 +570,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } } + def addPreAggLoadFunction(sql: String): String = { +addPreAggLoad(new lexical.Scanner(sql.toLowerCase)) match { + case Success(query, _) => query + case _ => throw new MalformedCarbonCommandException( --- End diff -- move down line ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153724030 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + +plan transform { + + case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => +val newExpressions = aExp.flatMap { + case alias@Alias(attrExpression: AggregateExpression, _) => +attrExpression.aggregateFunction match { + case Average(attr: AttributeReference) => +Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(attr), +resultId = NamedExpression.newExprId), attr.name + "_sum")(), + Alias(attrExpression +.copy(aggregateFunction = Count(attr), + resultId = NamedExpression.newExprId), attr.name + "_count")()) + case Average(cast@Cast(attr: AttributeReference, _)) => +Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(cast), +resultId = NamedExpression.newExprId), + attr.name + "_sum")(), + Alias(attrExpression +.copy(aggregateFunction = Count(cast), + resultId = NamedExpression.newExprId), attr.name + "_count")()) + case _ => Seq(alias) +} + case namedExpr: NamedExpression => Seq(namedExpr) +} +aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]]) + case plan: LogicalPlan => plan +} + } + + /** + * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not. + * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is + * valid. 
+ * + * @param namedExpression + * @return + */ + private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = { +val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias]) +filteredExpressions + .exists { +expr => !expr.name.equalsIgnoreCase("PreAgg") && +expr.name.equalsIgnoreCase("preAggLoad") --- End diff -- format properly ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153723895 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + +plan transform { + + case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => +val newExpressions = aExp.flatMap { + case alias@Alias(attrExpression: AggregateExpression, _) => +attrExpression.aggregateFunction match { + case Average(attr: AttributeReference) => +Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(attr), +resultId = NamedExpression.newExprId), attr.name + "_sum")(), + Alias(attrExpression +.copy(aggregateFunction = Count(attr), + resultId = NamedExpression.newExprId), attr.name + "_count")()) + case Average(cast@Cast(attr: AttributeReference, _)) => +Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(cast), +resultId = NamedExpression.newExprId), + attr.name + "_sum")(), + Alias(attrExpression +.copy(aggregateFunction = Count(cast), + resultId = NamedExpression.newExprId), attr.name + "_count")()) --- End diff -- Please format it properly ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153723567 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -253,8 +260,8 @@ object PreAggregateUtil { carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName, parentDatabaseName, parentTableId = parentTableId) - case _ => -throw new MalformedCarbonCommandException("Un-Supported Aggregation Type") + case a@_ => --- End diff -- Keep as case others ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153723385 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName -val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") -sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") +val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")) + .drop("preAggLoad") +val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") +try { + LoadTableCommand(Some(childDatabaseName), +childTableName, +null, +Nil, +Map("fileheader" -> headers), +isOverwriteTable = false, +dataFrame = Some(childDataFrame), +internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")) +.run(sparkSession) +} finally { + CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + +carbonLoadModel.getDatabaseName + "." + +carbonLoadModel.getTableName) + CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + +carbonLoadModel.getDatabaseName + "." 
+ +carbonLoadModel.getTableName) +} } } } } +object LoadPreAggregateTablePreListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { +val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent] +val carbonLoadModel = loadEvent.carbonLoadModel +val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable +val isInternalLoadCall = carbonLoadModel.isAggLoadRequest +if (table.isChildDataMap && !isInternalLoadCall) { + throw new UnsupportedOperationException( +"Cannot insert/load data directly into pre-aggregate table") +} + --- End diff -- remove line ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153722923 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName -val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") -sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") +val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")) + .drop("preAggLoad") +val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) --- End diff -- move down ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153722722 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand( dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) // updating the parent table about child table PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) - if (loadAvailable) { -sparkSession.sql( - s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") + val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) + if (availableLoads) { +val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") +val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() --- End diff -- Move down after ( ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153722623 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand( dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) // updating the parent table about child table PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) - if (loadAvailable) { -sparkSession.sql( - s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") + val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) + if (availableLoads) { +val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) --- End diff -- Move the line down ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153722006 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala --- @@ -221,7 +221,6 @@ object DataLoadingUtil { ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat") ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat") ValidateUtil.validateSortScope(table, sort_scope) - --- End diff -- don 't change file unnecessarly ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153068121 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -94,10 +95,22 @@ case class CreatePreAggregateTableCommand( dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) // updating the parent table about child table PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) - if (loadAvailable) { -sparkSession.sql( - s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") + val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) + if (availableLoads) { +val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") +val childDataFrame = Dataset.ofRows(sparkSession, new CarbonSpark2SqlParser() + .parse(s"insert into ${ tableModel.databaseName }.${ +tableModel.tableName} $queryString")) --- End diff -- Here why don't use preAggLoad UDF? ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153068092 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -49,13 +50,52 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName -val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") -sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") +val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(s"${dataMapSchema.getProperties.get("CHILD_SELECT QUERY")} ")).drop("preAggLoad") +val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") +try { + LoadTableCommand(Some(childDatabaseName), +childTableName, +null, +Nil, +Map("fileheader" -> headers), +isOverwriteTable = false, +dataFrame = Some(childDataFrame), +internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).run(sparkSession) +} finally { + CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + +carbonLoadModel.getDatabaseName + "." + +carbonLoadModel.getTableName) + CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + +carbonLoadModel.getDatabaseName + "." + +carbonLoadModel.getTableName) --- End diff -- Indentation is wrong, format properly ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153068009 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala --- @@ -55,6 +55,7 @@ class CarbonEnv { // added for handling preaggregate table creation. when user will fire create ddl for // create table we are adding a udf so no need to apply PreAggregate rules. sparkSession.udf.register("preAgg", () => "") +sparkSession.udf.register("preAggLoad", () => "") --- End diff -- Add comment about usage of this udf ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153067988 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -736,6 +739,45 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + +plan transform { + case aggregate@Aggregate(_, aExp, _) => +val isLoadPlan = aExp.exists(_.name.equalsIgnoreCase("preAggLoad")) --- End diff -- Even this condition also move to `case ` block ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153067934 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -736,6 +739,45 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + +plan transform { + case aggregate@Aggregate(_, aExp, _) => +val isLoadPlan = aExp.exists(_.name.equalsIgnoreCase("preAggLoad")) +if (aExp.exists(_.name.equalsIgnoreCase("PreAgg"))) { --- End diff -- move this `if` condition to the `case` block ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r153067591 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -94,10 +95,22 @@ case class CreatePreAggregateTableCommand( dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) // updating the parent table about child table PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) - if (loadAvailable) { -sparkSession.sql( - s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") + val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) + if (availableLoads) { +val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") +val childDataFrame = Dataset.ofRows(sparkSession, new CarbonSpark2SqlParser() + .parse(s"insert into ${ tableModel.databaseName }.${ +tableModel.tableName} $queryString")) --- End diff -- why it is not just `queryString`? why insertinto required here as you are already using load command ---
[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1508#discussion_r152597828 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala --- @@ -95,8 +95,14 @@ object CarbonSetCommand { } } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) { sessionParams.addProperty(key.toLowerCase(), value) +} else if (key.startsWith(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL)) { --- End diff -- I don't think a `set command` is required for this internal call. We are not going to give users the option to load the aggregate table directly, as it may corrupt the table. ---