[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/1542


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-29 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r153995580
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: 
SparkSession) extends Rule[Logi
* @return
*/
   private def transformAggregatePlan(logicalPlan: LogicalPlan): 
LogicalPlan = {
+val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
 logicalPlan transform {
   case aggregate@Aggregate(_, aExp, _) =>
-val newExpressions = aExp.flatMap {
-  case alias@Alias(attrExpression: AggregateExpression, _) =>
-attrExpression.aggregateFunction match {
-  case Average(attr: AttributeReference) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(attr),
-resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(attr),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(cast),
-resultId = NamedExpression.newExprId),
-  attr.name + "_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(cast),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case _ => Seq(alias)
-}
-  case namedExpr: NamedExpression => Seq(namedExpr)
+aExp.foreach {
+  case alias: Alias =>
+validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
+  case namedExpr: NamedExpression => 
validExpressionsMap.put(namedExpr.name, namedExpr)
 }
-aggregate.copy(aggregateExpressions = 
newExpressions.asInstanceOf[Seq[NamedExpression]])
+aggregate
+  .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
   case plan: LogicalPlan => plan
 }
   }
+
+  /**
+   * This method will split the avg column into sum and count and will 
return a sequence of tuple
+   * of unique name, alias
+   *
+   * @param alias
+   * @return
--- End diff --

done


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-29 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r153995458
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: 
SparkSession) extends Rule[Logi
* @return
*/
   private def transformAggregatePlan(logicalPlan: LogicalPlan): 
LogicalPlan = {
+val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
 logicalPlan transform {
   case aggregate@Aggregate(_, aExp, _) =>
-val newExpressions = aExp.flatMap {
-  case alias@Alias(attrExpression: AggregateExpression, _) =>
-attrExpression.aggregateFunction match {
-  case Average(attr: AttributeReference) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(attr),
-resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(attr),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(cast),
-resultId = NamedExpression.newExprId),
-  attr.name + "_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(cast),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case _ => Seq(alias)
-}
-  case namedExpr: NamedExpression => Seq(namedExpr)
+aExp.foreach {
+  case alias: Alias =>
+validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
+  case namedExpr: NamedExpression => 
validExpressionsMap.put(namedExpr.name, namedExpr)
 }
-aggregate.copy(aggregateExpressions = 
newExpressions.asInstanceOf[Seq[NamedExpression]])
+aggregate
+  .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
--- End diff --

ok



---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r153995138
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: 
SparkSession) extends Rule[Logi
* @return
*/
   private def transformAggregatePlan(logicalPlan: LogicalPlan): 
LogicalPlan = {
+val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
 logicalPlan transform {
   case aggregate@Aggregate(_, aExp, _) =>
-val newExpressions = aExp.flatMap {
-  case alias@Alias(attrExpression: AggregateExpression, _) =>
-attrExpression.aggregateFunction match {
-  case Average(attr: AttributeReference) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(attr),
-resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(attr),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(cast),
-resultId = NamedExpression.newExprId),
-  attr.name + "_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(cast),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case _ => Seq(alias)
-}
-  case namedExpr: NamedExpression => Seq(namedExpr)
+aExp.foreach {
+  case alias: Alias =>
+validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
+  case namedExpr: NamedExpression => 
validExpressionsMap.put(namedExpr.name, namedExpr)
 }
-aggregate.copy(aggregateExpressions = 
newExpressions.asInstanceOf[Seq[NamedExpression]])
+aggregate
+  .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
   case plan: LogicalPlan => plan
 }
   }
+
+  /**
+   * This method will split the avg column into sum and count and will 
return a sequence of tuple
+   * of unique name, alias
+   *
+   * @param alias
+   * @return
--- End diff --

remove line 844 and line 845


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r153995074
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: 
SparkSession) extends Rule[Logi
* @return
*/
   private def transformAggregatePlan(logicalPlan: LogicalPlan): 
LogicalPlan = {
+val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
 logicalPlan transform {
   case aggregate@Aggregate(_, aExp, _) =>
-val newExpressions = aExp.flatMap {
-  case alias@Alias(attrExpression: AggregateExpression, _) =>
-attrExpression.aggregateFunction match {
-  case Average(attr: AttributeReference) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(attr),
-resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(attr),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(cast),
-resultId = NamedExpression.newExprId),
-  attr.name + "_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(cast),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case _ => Seq(alias)
-}
-  case namedExpr: NamedExpression => Seq(namedExpr)
+aExp.foreach {
+  case alias: Alias =>
+validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
+  case namedExpr: NamedExpression => 
validExpressionsMap.put(namedExpr.name, namedExpr)
 }
-aggregate.copy(aggregateExpressions = 
newExpressions.asInstanceOf[Seq[NamedExpression]])
+aggregate
+  .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
--- End diff --

move this to previous line


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-26 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r153068856
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
@@ -797,33 +798,59 @@ object CarbonPreInsertionCasts extends 
Rule[LogicalPlan] {
* @return
*/
   private def transformAggregatePlan(logicalPlan: LogicalPlan): 
LogicalPlan = {
+val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
 logicalPlan transform {
   case aggregate@Aggregate(_, aExp, _) =>
-val newExpressions = aExp.flatMap {
-  case alias@Alias(attrExpression: AggregateExpression, _) =>
-attrExpression.aggregateFunction match {
-  case Average(attr: AttributeReference) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(attr),
-resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(attr),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case Average(cast@Cast(attr: AttributeReference, _)) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(cast),
-resultId = NamedExpression.newExprId),
-  attr.name + "_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(cast),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case _ => Seq(alias)
-}
-  case namedExpr: NamedExpression => Seq(namedExpr)
+aExp.foreach {
+  case alias: Alias =>
+validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
--- End diff --

Yes, the validateAggregateFunctionAndGetAlias function returns a 
Seq((columnName_aggFunction, Alias)), which is added to a map to remove any 
duplicate Alias entries.


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-26 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r153068582
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
@@ -797,33 +798,59 @@ object CarbonPreInsertionCasts extends 
Rule[LogicalPlan] {
* @return
*/
   private def transformAggregatePlan(logicalPlan: LogicalPlan): 
LogicalPlan = {
+val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
 logicalPlan transform {
   case aggregate@Aggregate(_, aExp, _) =>
-val newExpressions = aExp.flatMap {
-  case alias@Alias(attrExpression: AggregateExpression, _) =>
-attrExpression.aggregateFunction match {
-  case Average(attr: AttributeReference) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(attr),
-resultId = NamedExpression.newExprId), attr.name + 
"_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(attr),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case Average(cast@Cast(attr: AttributeReference, _)) =>
-Seq(Alias(attrExpression
-  .copy(aggregateFunction = Sum(cast),
-resultId = NamedExpression.newExprId),
-  attr.name + "_sum")(),
-  Alias(attrExpression
-.copy(aggregateFunction = Count(cast),
-  resultId = NamedExpression.newExprId), attr.name + 
"_count")())
-  case _ => Seq(alias)
-}
-  case namedExpr: NamedExpression => Seq(namedExpr)
+aExp.foreach {
+  case alias: Alias =>
+validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
--- End diff --

Will duplicate columns be removed from the aggregate expressions? For example, 
if the agg table is created with sum(col1) and avg(col1), then the aggregation table 
should be created with sum(col1) and count(col1) only; sum(col1) should not be 
duplicated. Is this handled here?


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-21 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r152474076
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 ---
@@ -489,4 +489,44 @@ object PreAggregateUtil {
 }
 updatedPlan
   }
+
--- End diff --

Moved the method to the rules class (CarbonPreAggregateRules).


---


[GitHub] carbondata pull request #1542: [CARBONDATA-1757] [PreAgg] Fix for wrong avg ...

2017-11-21 Thread kumarvishal09
Github user kumarvishal09 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1542#discussion_r152357935
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 ---
@@ -489,4 +489,44 @@ object PreAggregateUtil {
 }
 updatedPlan
   }
+
--- End diff --

This code is specific to inserting data into the aggregate table; it would be 
better to move it into the rules class.


---