[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/1508


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153724471
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -570,6 +570,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
   }
 
+  def addPreAggLoadFunction(sql: String): String = {
+    addPreAggLoad(new lexical.Scanner(sql.toLowerCase)) match {
+      case Success(query, _) => query
+      case _ => throw new MalformedCarbonCommandException(
--- End diff --

Move the exception message down to the next line.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153724030
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan transform {
+
+      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
+        val newExpressions = aExp.flatMap {
+          case alias@Alias(attrExpression: AggregateExpression, _) =>
+            attrExpression.aggregateFunction match {
+              case Average(attr: AttributeReference) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(attr),
+                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(attr),
+                      resultId = NamedExpression.newExprId), attr.name + "_count")())
+              case Average(cast@Cast(attr: AttributeReference, _)) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(cast),
+                    resultId = NamedExpression.newExprId),
+                  attr.name + "_sum")(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(cast),
+                      resultId = NamedExpression.newExprId), attr.name + "_count")())
+              case _ => Seq(alias)
+            }
+          case namedExpr: NamedExpression => Seq(namedExpr)
+        }
+        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+      case plan: LogicalPlan => plan
+    }
+  }
+
+  /**
+   * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
+   * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is
+   * valid.
+   *
+   * @param namedExpression
+   * @return
+   */
+  private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
+    val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
+    filteredExpressions
+      .exists {
+        expr => !expr.name.equalsIgnoreCase("PreAgg") &&
+                expr.name.equalsIgnoreCase("preAggLoad")
--- End diff --

format properly
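For instance, the `exists` block could be laid out like this, as it would sit inside `CarbonPreAggregateDataLoadingRules` (a sketch only; the formatting the PR finally adopted may differ):

```scala
// Ignore unresolved aliases, then require the loading UDF (preAggLoad)
// while making sure the query UDF (PreAgg) is not present.
private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
  val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
  filteredExpressions.exists { expr =>
    !expr.name.equalsIgnoreCase("PreAgg") && expr.name.equalsIgnoreCase("preAggLoad")
  }
}
```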


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153723895
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan transform {
+
+      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
+        val newExpressions = aExp.flatMap {
+          case alias@Alias(attrExpression: AggregateExpression, _) =>
+            attrExpression.aggregateFunction match {
+              case Average(attr: AttributeReference) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(attr),
+                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(attr),
+                      resultId = NamedExpression.newExprId), attr.name + "_count")())
+              case Average(cast@Cast(attr: AttributeReference, _)) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(cast),
+                    resultId = NamedExpression.newExprId),
+                  attr.name + "_sum")(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(cast),
+                      resultId = NamedExpression.newExprId), attr.name + "_count")())
--- End diff --

Please format it properly


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153723567
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -253,8 +260,8 @@ object PreAggregateUtil {
       carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
       parentTableName,
       parentDatabaseName, parentTableId = parentTableId)
-      case _ =>
-        throw new MalformedCarbonCommandException("Un-Supported Aggregation Type")
+      case a@_ =>
--- End diff --

Keep as case others
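That is, bind the catch-all with a readable name instead of `a@_`, roughly like this (the message text below is illustrative, not the PR's exact wording):

```scala
case others =>
  throw new MalformedCarbonCommandException(
    s"Unsupported Aggregation Type found: $others")
```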


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153723385
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener {
             carbonLoadModel.getTableName, "false")
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
-        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+          .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } "))
+          .drop("preAggLoad")
+        val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+          .mkString(",")
+        try {
+          LoadTableCommand(Some(childDatabaseName),
+            childTableName,
+            null,
+            Nil,
+            Map("fileheader" -> headers),
+            isOverwriteTable = false,
+            dataFrame = Some(childDataFrame),
+            internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
+            .run(sparkSession)
+        } finally {
+          CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+            carbonLoadModel.getDatabaseName + "." +
+            carbonLoadModel.getTableName)
+          CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+            carbonLoadModel.getDatabaseName + "." +
+            carbonLoadModel.getTableName)
+        }
       }
     }
   }
 }
 
+object LoadPreAggregateTablePreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
+    val carbonLoadModel = loadEvent.carbonLoadModel
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
+    if (table.isChildDataMap && !isInternalLoadCall) {
+      throw new UnsupportedOperationException(
+        "Cannot insert/load data directly into pre-aggregate table")
+    }
+
--- End diff --

Remove this blank line.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153722923
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener {
             carbonLoadModel.getTableName, "false")
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
-        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+          .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } "))
+          .drop("preAggLoad")
+        val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
--- End diff --

Move this down to the next line.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153722722
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand(
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
       PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-      if (loadAvailable) {
-        sparkSession.sql(
-          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
+      val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
+      if (availableLoads) {
+        val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+          .mkString(",")
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
--- End diff --

Move it down after the opening `(`.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153722623
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand(
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
       PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-      if (loadAvailable) {
-        sparkSession.sql(
-          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
+      val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
+      if (availableLoads) {
+        val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
--- End diff --

Move the line down.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-29 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153722006
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---
@@ -221,7 +221,6 @@ object DataLoadingUtil {
     ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat")
     ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
     ValidateUtil.validateSortScope(table, sort_scope)
-
--- End diff --

Don't change the file unnecessarily.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153068121
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -94,10 +95,22 @@ case class CreatePreAggregateTableCommand(
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
       PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-      if (loadAvailable) {
-        sparkSession.sql(
-          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
+      val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
+      if (availableLoads) {
+        val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+          .mkString(",")
+        val childDataFrame = Dataset.ofRows(sparkSession, new CarbonSpark2SqlParser()
+          .parse(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString"))
--- End diff --

Why not use the preAggLoad UDF here?
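In other words, the frame could be built the same way as in `LoadPostAggregateListener`: run the child select with the `preAggLoad` UDF appended, then drop the marker column. A rough sketch (the exact handling of the query string may differ):

```scala
val childDataFrame = sparkSession.sql(
  new CarbonSpark2SqlParser().addPreAggLoadFunction(s"$queryString "))
  .drop("preAggLoad")
```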


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153068092
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -49,13 +50,52 @@ object LoadPostAggregateListener extends OperationEventListener {
             carbonLoadModel.getTableName, "false")
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
-        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+          .addPreAggLoadFunction(s"${dataMapSchema.getProperties.get("CHILD_SELECT QUERY")} ")).drop("preAggLoad")
+        val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+          .mkString(",")
+        try {
+          LoadTableCommand(Some(childDatabaseName),
+            childTableName,
+            null,
+            Nil,
+            Map("fileheader" -> headers),
+            isOverwriteTable = false,
+            dataFrame = Some(childDataFrame),
+            internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).run(sparkSession)
+        } finally {
+          CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+            carbonLoadModel.getDatabaseName + "." +
+            carbonLoadModel.getTableName)
+          CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+            carbonLoadModel.getDatabaseName + "." +
+            carbonLoadModel.getTableName)
--- End diff --

Indentation is wrong, format properly


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153068009
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
@@ -55,6 +55,7 @@ class CarbonEnv {
     // added for handling preaggregate table creation. when user will fire create ddl for
     // create table we are adding a udf so no need to apply PreAggregate rules.
     sparkSession.udf.register("preAgg", () => "")
+    sparkSession.udf.register("preAggLoad", () => "")
--- End diff --

Add a comment about the usage of this UDF.
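For example, mirroring the comment above it for `preAgg` (the wording here is only a suggestion, not the PR's final text):

```scala
// added for handling data loading into pre-aggregate tables. the internal load
// appends this udf to the child select query so that
// CarbonPreAggregateDataLoadingRules can identify and rewrite the loading plan.
sparkSession.udf.register("preAggLoad", () => "")
```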


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153067988
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -736,6 +739,45 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan transform {
+      case aggregate@Aggregate(_, aExp, _) =>
+        val isLoadPlan = aExp.exists(_.name.equalsIgnoreCase("preAggLoad"))
--- End diff --

Move this condition to the `case` block as well.


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153067934
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -736,6 +739,45 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan transform {
+      case aggregate@Aggregate(_, aExp, _) =>
+        val isLoadPlan = aExp.exists(_.name.equalsIgnoreCase("preAggLoad"))
+        if (aExp.exists(_.name.equalsIgnoreCase("PreAgg"))) {
--- End diff --

move this `if` condition to the `case` block
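That is, express the check as a pattern guard on the `case` itself, roughly as the later revision quoted earlier in this thread does:

```scala
case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
  // rewrite Average into Sum and Count only for validated loading plans
```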


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r153067591
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -94,10 +95,22 @@ case class CreatePreAggregateTableCommand(
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
       PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-      if (loadAvailable) {
-        sparkSession.sql(
-          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
+      val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
+      if (availableLoads) {
+        val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+          .mkString(",")
+        val childDataFrame = Dataset.ofRows(sparkSession, new CarbonSpark2SqlParser()
+          .parse(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString"))
--- End diff --

Why isn't it just `queryString`? Why is insert-into required here when you are already using the load command?
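Put differently: since `LoadTableCommand` already receives the rows as a DataFrame, the frame could presumably be produced from the child select alone, e.g. (a sketch; the follow-up comment above refines this further by wrapping the query with the preAggLoad UDF):

```scala
val childDataFrame = sparkSession.sql(queryString)
```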


---


[GitHub] carbondata pull request #1508: [CARBONDATA-1738] [PreAgg] Block direct inser...

2017-11-22 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1508#discussion_r152597828
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---
@@ -95,8 +95,14 @@ object CarbonSetCommand {
       }
     } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
       sessionParams.addProperty(key.toLowerCase(), value)
+    } else if (key.startsWith(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL)) {
--- End diff --

I don't think it is required to use the `set` command for this internal call. We are not going to give the option to load the aggregate table directly, as it may corrupt the table.
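The later diffs in this thread take that route: the internal flag travels through the load command's `internalOptions` rather than a session property, roughly as in the listener code quoted earlier:

```scala
LoadTableCommand(Some(childDatabaseName),
  childTableName,
  null,
  Nil,
  Map("fileheader" -> headers),
  isOverwriteTable = false,
  dataFrame = Some(childDataFrame),
  // internal marker, not exposed through SET; LoadPreAggregateTablePreListener
  // reads it back as carbonLoadModel.isAggLoadRequest to allow the child-table load
  internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
  .run(sparkSession)
```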


---