[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/12836


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67266001
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified 
by grouping
+#' column of the SparkDataFrame. The function `func` takes as 
argument
+#' a key - grouping columns and a data frame - a local R 
data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
--- End diff --

Yeah, that's fine. Also, in the example below where we construct `schema`, you 
can add a comment line that looks like `Here our output contains 2 columns, 
the key which is an integer and the mean which is a double`. 





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67265581
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified 
by grouping
+#' column of the SparkDataFrame. The function `func` takes as 
argument
+#' a key - grouping columns and a data frame - a local R 
data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
--- End diff --

I could put something like this in the documentation: "The schema has to 
correspond to the output SparkDataFrame. It has to be defined for each output 
column with the preferred output column name and the corresponding data type."
How does this sound?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67264756
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified 
by grouping
+#' column of the SparkDataFrame. The function `func` takes as 
argument
+#' a key - grouping columns and a data frame - a local R 
data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
+#'   It must match the output of func.
+#' @family SparkDataFrame functions
+#' @rdname gapply
+#' @name gapply
+#' @export
+#' @examples
+#' 
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the 
average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 
0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#' schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#'
+#' Fits linear models on iris dataset by grouping on the 'Species' column 
and
+#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#' 
+#' df <- createDataFrame (iris)
+#' schema <- structType(structField("(Intercept)", "double"),
--- End diff --

The names do not have to match; we can give any name we want. Instead of 
"(Intercept)" I could have "(MyIntercept)". The data type is what matters.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67264555
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified 
by grouping
+#' column of the SparkDataFrame. The function `func` takes as 
argument
+#' a key - grouping columns and a data frame - a local R 
data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
--- End diff --

The output schema is based purely on the output data frame: if the key is 
included in the output, then we need to include the key in the schema.
Basically, the schema has to match what we want to output. If we want to 
output only the average in the above example, we could have: 
 schema <- structType(structField("avg", "double"))
What really matters is the data type: it has to be double in the above 
example; it cannot be string or character. The name doesn't matter either. 
I could have "hello" instead of "avg".
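
The rule described in this comment can be sketched with a toy model in plain Scala. This is only an illustration under stated assumptions, not Spark's actual validation logic: `Field`, `typeOf`, and `matches` are hypothetical names invented here, and type tags are plain strings. It shows that a row is accepted when the column count and the types agree position by position, while the column names are never consulted.

```scala
object SchemaMatchSketch {
  // Hypothetical mini-model of a schema field: a column name plus a type tag.
  final case class Field(name: String, dataType: String)

  // Type tag of a value in this toy model.
  def typeOf(value: Any): String = value match {
    case _: Int    => "integer"
    case _: Double => "double"
    case _: String => "string"
    case _         => "unknown"
  }

  // A row matches when it has one value per field and the types agree
  // position by position; field names are never consulted.
  def matches(schema: Seq[Field], row: Seq[Any]): Boolean =
    schema.length == row.length &&
      schema.zip(row).forall { case (field, value) => field.dataType == typeOf(value) }

  def main(args: Array[String]): Unit = {
    val keyAndAvg = Seq[Any](1, "1", 1.5) // key columns a and c, then the mean of b
    val avgOnly   = Seq[Any](1.5)         // func returns only the mean

    val full    = Seq(Field("a", "integer"), Field("c", "string"), Field("avg", "double"))
    val renamed = Seq(Field("hello", "double")) // any name works for the single column
    val wrong   = Seq(Field("avg", "string"))   // type disagrees with the double output

    println(matches(full, keyAndAvg))  // true
    println(matches(renamed, avgOnly)) // true
    println(matches(wrong, avgOnly))   // false
  }
}
```

In this model, dropping the key from the output of `func` simply means dropping the key fields from the schema as well.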





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67261006
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified 
by grouping
+#' column of the SparkDataFrame. The function `func` takes as 
argument
+#' a key - grouping columns and a data frame - a local R 
data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
+#'   It must match the output of func.
+#' @family SparkDataFrame functions
+#' @rdname gapply
+#' @name gapply
+#' @export
+#' @examples
+#' 
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the 
average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 
0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#' schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#'
+#' Fits linear models on iris dataset by grouping on the 'Species' column 
and
+#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#' 
+#' df <- createDataFrame (iris)
+#' schema <- structType(structField("(Intercept)", "double"),
--- End diff --

Similar to above, do the column names also have to match? I.e., is 
`(Intercept)` important here, or would `Intercept` work as well?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67260862
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified 
by grouping
+#' column of the SparkDataFrame. The function `func` takes as 
argument
+#' a key - grouping columns and a data frame - a local R 
data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
--- End diff --

Minor comment: it would be good to clarify how this schema can be 
constructed, i.e. something like `The output schema is usually the schema 
for the key along with the schema of the output R data frame.` We can also 
highlight this in the programming guide.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67197168
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -79,75 +127,72 @@ if (numBroadcastVars > 0) {
 
 # Timing broadcast
 broadcastElap <- elapsedSecs()
+# Initial input timing
+inputElap <- broadcastElap
 
 # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the 
int
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
 }
 
 isEmpty <- SparkR:::readInt(inputCon)
+computeInputElapsDiff <- 0
+outputComputeElapsDiff <- 0
 
 if (isEmpty != 0) {
-
   if (numPartitions == -1) {
 if (deserializer == "byte") {
   # Now read as many characters as described in funcLen
   data <- SparkR:::readDeserialize(inputCon)
 } else if (deserializer == "string") {
   data <- as.list(readLines(inputCon))
-} else if (deserializer == "row") {
+} else if (deserializer == "row" && mode == 2) {
+  dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+  keys <- dataWithKeys$keys
+  data <- dataWithKeys$data
+} else if (deserializer == "row"){
   data <- SparkR:::readMultipleObjects(inputCon)
 }
-# Timing reading input data for execution
-inputElap <- elapsedSecs()
 
-if (isDataFrame) {
-  if (deserializer == "row") {
-# Transform the list of rows into a data.frame
-# Note that the optional argument stringsAsFactors for rbind is
-# available since R 3.2.4. So we set the global option here.
-oldOpt <- getOption("stringsAsFactors")
-options(stringsAsFactors = FALSE)
-data <- do.call(rbind.data.frame, data)
-options(stringsAsFactors = oldOpt)
-
-names(data) <- colNames
-  } else {
-# Check to see if data is a valid data.frame
-stopifnot(deserializer == "byte")
-stopifnot(class(data) == "data.frame")
-  }
-  output <- computeFunc(data)
-  if (serializer == "row") {
-# Transform the result data.frame back to a list of rows
-output <- split(output, seq(nrow(output)))
-  } else {
-# Serialize the ouput to a byte array
-stopifnot(serializer == "byte")
+inputElap <- elapsedSecs()
+if (mode > 0) {
+  if (mode == 1) {
+# Timing reading input data for execution
+output <- compute(mode, partition, serializer, deserializer, NULL,
+colNames, computeFunc, outputCon, data)
+   } else {
+# gapply mode
+for (i in 1:length(data)) {
+  # Timing reading input data for execution
+  inputElap <- elapsedSecs()
+  output <- compute(mode, partition, serializer, deserializer, 
keys[[i]],
+  colNames, computeFunc, outputCon, data[[i]])
+  computeElap <- elapsedSecs()
+  outputResult(serializer, output, outputCon)
+  outputElap <- elapsedSecs()
+  computeInputElapsDiff <-  computeInputElapsDiff + (computeElap - 
inputElap)
+  outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - 
computeElap)
+}
   }
 } else {
-  output <- computeFunc(partition, data)
+  # Timing reading input data for execution
--- End diff --

Removed the comment and moved it to line 163:

https://github.com/NarineK/spark/blob/4d1cc6b0fd3dd2839a6a879f43118c5828916733/R/pkg/inst/worker/worker.R#L163





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-15 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r67161527
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -79,75 +127,72 @@ if (numBroadcastVars > 0) {
 
 # Timing broadcast
 broadcastElap <- elapsedSecs()
+# Initial input timing
+inputElap <- broadcastElap
 
 # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the 
int
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
 }
 
 isEmpty <- SparkR:::readInt(inputCon)
+computeInputElapsDiff <- 0
+outputComputeElapsDiff <- 0
 
 if (isEmpty != 0) {
-
   if (numPartitions == -1) {
 if (deserializer == "byte") {
   # Now read as many characters as described in funcLen
   data <- SparkR:::readDeserialize(inputCon)
 } else if (deserializer == "string") {
   data <- as.list(readLines(inputCon))
-} else if (deserializer == "row") {
+} else if (deserializer == "row" && mode == 2) {
+  dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+  keys <- dataWithKeys$keys
+  data <- dataWithKeys$data
+} else if (deserializer == "row"){
   data <- SparkR:::readMultipleObjects(inputCon)
 }
-# Timing reading input data for execution
-inputElap <- elapsedSecs()
 
-if (isDataFrame) {
-  if (deserializer == "row") {
-# Transform the list of rows into a data.frame
-# Note that the optional argument stringsAsFactors for rbind is
-# available since R 3.2.4. So we set the global option here.
-oldOpt <- getOption("stringsAsFactors")
-options(stringsAsFactors = FALSE)
-data <- do.call(rbind.data.frame, data)
-options(stringsAsFactors = oldOpt)
-
-names(data) <- colNames
-  } else {
-# Check to see if data is a valid data.frame
-stopifnot(deserializer == "byte")
-stopifnot(class(data) == "data.frame")
-  }
-  output <- computeFunc(data)
-  if (serializer == "row") {
-# Transform the result data.frame back to a list of rows
-output <- split(output, seq(nrow(output)))
-  } else {
-# Serialize the ouput to a byte array
-stopifnot(serializer == "byte")
+inputElap <- elapsedSecs()
+if (mode > 0) {
+  if (mode == 1) {
+# Timing reading input data for execution
+output <- compute(mode, partition, serializer, deserializer, NULL,
+colNames, computeFunc, outputCon, data)
+   } else {
+# gapply mode
+for (i in 1:length(data)) {
+  # Timing reading input data for execution
+  inputElap <- elapsedSecs()
+  output <- compute(mode, partition, serializer, deserializer, 
keys[[i]],
+  colNames, computeFunc, outputCon, data[[i]])
+  computeElap <- elapsedSecs()
+  outputResult(serializer, output, outputCon)
+  outputElap <- elapsedSecs()
+  computeInputElapsDiff <-  computeInputElapsDiff + (computeElap - 
inputElap)
+  outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - 
computeElap)
+}
   }
 } else {
-  output <- computeFunc(partition, data)
+  # Timing reading input data for execution
--- End diff --

this one?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-13 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66745283
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -325,6 +330,71 @@ case class MapGroupsExec(
 }
 
 /**
+ * Groups the input rows together and calls the R function with each group 
and an iterator
+ * containing all elements in the group.
+ * The result of this function is flattened before being output.
+ */
+case class FlatMapGroupsInRExec(
+func: Array[Byte],
+packageNames: Array[Byte],
+broadcastVars: Array[Broadcast[Object]],
+inputSchema: StructType,
+outputSchema: StructType,
+keyDeserializer: Expression,
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+
+  override def output: Seq[Attribute] = outputObjAttr :: Nil
+  override def producedAttributes: AttributeSet = 
AttributeSet(outputObjAttr)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val isSerializedRData =
+  if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+val serializerForR = if (!isSerializedRData) {
+  SerializationFormats.ROW
+} else {
+  SerializationFormats.BYTE
+}
+
+child.execute().mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, 
groupingAttributes)
+  val getValue = 
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
+  val outputObject = 
ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
+  val runner = new RRunner[Array[Byte]](
+func, SerializationFormats.ROW, serializerForR, packageNames, 
broadcastVars,
+isDataFrame = true, colNames = inputSchema.fieldNames, mode = 2)
+
+  val groupedRBytes = grouped.flatMap { case (key, rowIter) =>
--- End diff --

Hi @sun-rui, I did it similarly to: 
https://github.com/NarineK/spark/blob/d51441f704e2abad7f7a3cc829664cd201b0fcd2/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala#L322

We can also use map here. If there is no nested content, flatMap will 
behave like map. In general, for DataFrame rows, can someone have a row of rows?
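
As a general Scala illustration of the map versus flatMap question raised in this thread, the sketch below uses a toy grouped collection rather than the PR's `GroupedIterator`. The object name and the data are assumptions made for the example. It shows that `map` yields one element per group, `flatMap` concatenates a collection per group, and `flatMap` matches `map` only when each group's result is wrapped in a one-element collection.

```scala
object FlatMapVsMap {
  // Toy stand-in for grouped rows: (key, values in that group).
  val grouped: Seq[(String, Seq[Int])] = Seq(("a", Seq(1, 2)), ("b", Seq(3)))

  // map yields exactly one element per group.
  val onePerGroup: Seq[Int] = grouped.map { case (_, values) => values.sum }

  // flatMap expects a collection per group and concatenates the results.
  val flattened: Seq[Int] = grouped.flatMap { case (_, values) => values.map(_ * 10) }

  // Wrapping the single result in a one-element collection makes flatMap
  // produce the same output as map.
  val viaFlatMap: Seq[Int] = grouped.flatMap { case (_, values) => Seq(values.sum) }

  def main(args: Array[String]): Unit = {
    println(onePerGroup)               // List(3, 3)
    println(flattened)                 // List(10, 20, 30)
    println(viaFlatMap == onePerGroup) // true
  }
}
```

Which of the two fits the PR depends on whether the per-group closure returns a single value or a collection, which this excerpt does not show.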





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66733080
  
--- Diff: core/src/main/scala/org/apache/spark/api/r/RRunner.scala ---
@@ -40,7 +40,8 @@ private[spark] class RRunner[U](
 broadcastVars: Array[Broadcast[Object]],
 numPartitions: Int = -1,
 isDataFrame: Boolean = false,
-colNames: Array[String] = null)
+colNames: Array[String] = null,
+mode: Int = 0)
--- End diff --

It is better to define enumerations for mode instead of hard-coding it. For 
example:
private[sql] object RRunnerModes {
  val RDD = 0
  val DATAFRAME_DAPPLY = 1
  val DATAFRAME_GAPPLY = 2
}
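
A self-contained sketch of how such named constants might be used follows. It assumes the three values quoted in the worker.R comment (0 for RDD, 1 for dapply, 2 for gapply). The `describe` helper and the demo object are hypothetical additions for illustration, and the `private[sql]` modifier is left out so the snippet compiles outside a package.

```scala
// Named constants for the worker mode, following the suggestion above.
object RRunnerModes {
  val RDD = 0
  val DATAFRAME_DAPPLY = 1
  val DATAFRAME_GAPPLY = 2

  // Hypothetical helper, not part of the PR: turn a mode value into a label.
  // Upper-case names are matched as constants, not bound as new variables.
  def describe(mode: Int): String = mode match {
    case RDD              => "RDD"
    case DATAFRAME_DAPPLY => "dapply"
    case DATAFRAME_GAPPLY => "gapply"
    case other            => s"unknown mode $other"
  }
}

object RRunnerModesDemo {
  def main(args: Array[String]): Unit = {
    // Call sites read as intent instead of a bare literal such as `mode = 2`.
    println(RRunnerModes.describe(RRunnerModes.DATAFRAME_GAPPLY)) // gapply
  }
}
```

With constants like these, a call such as `mode = 2` in the quoted diff could read `mode = RRunnerModes.DATAFRAME_GAPPLY` instead.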





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66732763
  
--- Diff: R/pkg/R/group.R ---
@@ -142,3 +142,58 @@ createMethods <- function() {
 }
 
 createMethods()
+
+#' gapply
+#'
+#' Applies a R function to each group in the input GroupedData
+#'
+#' @param x a GroupedData
+#' @param func A function to be applied to each group partition specified 
by GroupedData.
+#' The function `func` takes as argument a key - grouping 
columns and
+#' a data frame - a local R data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
+#'   It must match the output of func.
+#' @return a SparkDataFrame
+#' @rdname gapply
+#' @name gapply
+#' @examples
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the 
average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 
0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#' schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#' }
+setMethod("gapply",
+  signature(x = "GroupedData"),
+  function(x, func, schema) {
+packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+ connection = NULL)
+broadcastArr <- lapply(ls(.broadcastNames),
+  function(name) { get(name, .broadcastNames) 
})
+sdf <- callJMethod(x@sgd, "flatMapGroupsInR",
+ serialize(cleanClosure(func), connection = NULL),
+ packageNamesArr,
+ broadcastArr,
+ if (is.null(schema)) { schema } else { schema$jobj })
--- End diff --

Thanks, I added an assertion. We cannot do it exactly like dapply by forcing 
the schema, because gapply for GroupedData is slightly different from 
DataFrame's gapply.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -325,6 +330,71 @@ case class MapGroupsExec(
 }
 
 /**
+ * Groups the input rows together and calls the R function with each group 
and an iterator
+ * containing all elements in the group.
+ * The result of this function is flattened before being output.
+ */
+case class FlatMapGroupsInRExec(
+func: Array[Byte],
+packageNames: Array[Byte],
+broadcastVars: Array[Broadcast[Object]],
+inputSchema: StructType,
+outputSchema: StructType,
+keyDeserializer: Expression,
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+
+  override def output: Seq[Attribute] = outputObjAttr :: Nil
+  override def producedAttributes: AttributeSet = 
AttributeSet(outputObjAttr)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val isSerializedRData =
+  if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+val serializerForR = if (!isSerializedRData) {
+  SerializationFormats.ROW
+} else {
+  SerializationFormats.BYTE
+}
+
+child.execute().mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, 
groupingAttributes)
+  val getValue = 
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
+  val outputObject = 
ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
+  val runner = new RRunner[Array[Byte]](
+func, SerializationFormats.ROW, serializerForR, packageNames, 
broadcastVars,
+isDataFrame = true, colNames = inputSchema.fieldNames, mode = 2)
+
+  val groupedRBytes = grouped.flatMap { case (key, rowIter) =>
--- End diff --

Why flatMap here? map() should be OK. Is there something I'm missing?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721643
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -79,75 +127,72 @@ if (numBroadcastVars > 0) {
 
 # Timing broadcast
 broadcastElap <- elapsedSecs()
+# Initial input timing
+inputElap <- broadcastElap
 
 # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the 
int
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
 }
 
 isEmpty <- SparkR:::readInt(inputCon)
+computeInputElapsDiff <- 0
+outputComputeElapsDiff <- 0
 
 if (isEmpty != 0) {
-
   if (numPartitions == -1) {
 if (deserializer == "byte") {
   # Now read as many characters as described in funcLen
   data <- SparkR:::readDeserialize(inputCon)
 } else if (deserializer == "string") {
   data <- as.list(readLines(inputCon))
-} else if (deserializer == "row") {
+} else if (deserializer == "row" && mode == 2) {
+  dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+  keys <- dataWithKeys$keys
+  data <- dataWithKeys$data
+} else if (deserializer == "row"){
   data <- SparkR:::readMultipleObjects(inputCon)
 }
-# Timing reading input data for execution
-inputElap <- elapsedSecs()
 
-if (isDataFrame) {
-  if (deserializer == "row") {
-# Transform the list of rows into a data.frame
-# Note that the optional argument stringsAsFactors for rbind is
-# available since R 3.2.4. So we set the global option here.
-oldOpt <- getOption("stringsAsFactors")
-options(stringsAsFactors = FALSE)
-data <- do.call(rbind.data.frame, data)
-options(stringsAsFactors = oldOpt)
-
-names(data) <- colNames
-  } else {
-# Check to see if data is a valid data.frame
-stopifnot(deserializer == "byte")
-stopifnot(class(data) == "data.frame")
-  }
-  output <- computeFunc(data)
-  if (serializer == "row") {
-# Transform the result data.frame back to a list of rows
-output <- split(output, seq(nrow(output)))
-  } else {
-# Serialize the ouput to a byte array
-stopifnot(serializer == "byte")
+inputElap <- elapsedSecs()
+if (mode > 0) {
+  if (mode == 1) {
+# Timing reading input data for execution
+output <- compute(mode, partition, serializer, deserializer, NULL,
+colNames, computeFunc, outputCon, data)
+   } else {
+# gapply mode
+for (i in 1:length(data)) {
+  # Timing reading input data for execution
+  inputElap <- elapsedSecs()
+  output <- compute(mode, partition, serializer, deserializer, 
keys[[i]],
+  colNames, computeFunc, outputCon, data[[i]])
+  computeElap <- elapsedSecs()
+  outputResult(serializer, output, outputCon)
+  outputElap <- elapsedSecs()
+  computeInputElapsDiff <-  computeInputElapsDiff + (computeElap - 
inputElap)
+  outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - 
computeElap)
+}
   }
 } else {
-  output <- computeFunc(partition, data)
+  # Timing reading input data for execution
--- End diff --

Should this comment line be placed before line 163?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721611
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -79,75 +127,72 @@ if (numBroadcastVars > 0) {
 
 # Timing broadcast
 broadcastElap <- elapsedSecs()
+# Initial input timing
+inputElap <- broadcastElap
 
 # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the 
int
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
 }
 
 isEmpty <- SparkR:::readInt(inputCon)
+computeInputElapsDiff <- 0
+outputComputeElapsDiff <- 0
 
 if (isEmpty != 0) {
-
   if (numPartitions == -1) {
 if (deserializer == "byte") {
   # Now read as many characters as described in funcLen
   data <- SparkR:::readDeserialize(inputCon)
 } else if (deserializer == "string") {
   data <- as.list(readLines(inputCon))
-} else if (deserializer == "row") {
+} else if (deserializer == "row" && mode == 2) {
+  dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+  keys <- dataWithKeys$keys
+  data <- dataWithKeys$data
+} else if (deserializer == "row"){
--- End diff --

Style: add a space before the brace: (deserializer == "row") {





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721551
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -27,6 +27,54 @@ elapsedSecs <- function() {
   proc.time()[3]
 }
 
+compute <- function(mode, partition, serializer, deserializer, key,
--- End diff --

Suggested signature:
`function(computeFunc, mode, partition, key, inputData, serializer, deserializer, colNames)`
Remove the unused argument and re-order the arguments.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721485
  
--- Diff: R/pkg/R/group.R ---
@@ -142,3 +142,58 @@ createMethods <- function() {
 }
 
 createMethods()
+
+#' gapply
+#'
+#' Applies a R function to each group in the input GroupedData
+#'
+#' @param x a GroupedData
+#' @param func A function to be applied to each group partition specified 
by GroupedData.
+#' The function `func` takes as argument a key - grouping 
columns and
+#' a data frame - a local R data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
+#'   It must match the output of func.
+#' @return a SparkDataFrame
+#' @rdname gapply
+#' @name gapply
+#' @examples
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the 
average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 
0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#' schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#' }
+setMethod("gapply",
+  signature(x = "GroupedData"),
+  function(x, func, schema) {
+packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+ connection = NULL)
+broadcastArr <- lapply(ls(.broadcastNames),
+  function(name) { get(name, .broadcastNames) 
})
+sdf <- callJMethod(x@sgd, "flatMapGroupsInR",
+ serialize(cleanClosure(func), connection = NULL),
+ packageNamesArr,
+ broadcastArr,
+ if (is.null(schema)) { schema } else { schema$jobj })
--- End diff --

Same as dapply, we don't support a NULL schema for now; fails when schema is 
NULL.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721436
  
--- Diff: R/pkg/R/group.R ---
@@ -142,3 +142,58 @@ createMethods <- function() {
 }
 
 createMethods()
+
+#' gapply
+#'
+#' Applies a R function to each group in the input GroupedData
+#'
+#' @param x a GroupedData
+#' @param func A function to be applied to each group partition specified 
by GroupedData.
+#' The function `func` takes as argument a key - grouping 
columns and
+#' a data frame - a local R data.frame.
+#' The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the 
function is applied.
+#'   It must match the output of func.
+#' @return a SparkDataFrame
+#' @rdname gapply
+#' @name gapply
+#' @examples
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the 
average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 
0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#' schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#' }
+setMethod("gapply",
+  signature(x = "GroupedData"),
+  function(x, func, schema) {
+packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+ connection = NULL)
+broadcastArr <- lapply(ls(.broadcastNames),
+  function(name) { get(name, .broadcastNames) 
})
+sdf <- callJMethod(x@sgd, "flatMapGroupsInR",
--- End diff --

As discussed above, call a helper function in SQLUtils instead of calling 
the GroupedData function directly.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721354
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

Yeah, I think we need gapplyCollect().





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66721347
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

@NarineK, you can add a helper function like the one at
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L143.
It will pass in the schema for serialized R data when the user-specified schema is 
NULL.
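
The fallback described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `Field`, `Schema`, `SerializedRDataSchema` and `resolveOutputSchema` are hypothetical stand-ins written in plain Scala, not the actual SQLUtils helper or Spark's `StructType`, and the single binary column is an assumed shape for the serialized R data schema.

```scala
object SchemaFallbackSketch {
  // Hypothetical stand-ins for Spark's StructField / StructType.
  final case class Field(name: String, dataType: String)
  final case class Schema(fields: Seq[Field])

  // Assumed shape of the fallback: one binary column holding serialized R data.
  val SerializedRDataSchema: Schema = Schema(Seq(Field("R", "binary")))

  // A caller on the R side may pass null. Option(null) evaluates to None,
  // so the fallback schema is used only when no schema was supplied.
  def resolveOutputSchema(userSchema: Schema): Schema =
    Option(userSchema).getOrElse(SerializedRDataSchema)
}
```

With a helper of this kind the null check lives in one place, and the rest of the Scala code can assume a non-null schema.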





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-12 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66717543
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

I see, the first option with assert is easy to do.

Didn't we want to add gapplyCollect too? :)

https://docs.google.com/presentation/d/1oj17N5JaE8JDjT2as_DUI6LKutLcEHNZB29HsRGL_dM/edit#slide=id.p12






[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-11 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66713462
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

If schema should not be null, can we assert that it is not null on the R side 
and always pass in a non-null value? I think for `dapply` we wanted to 
support `collect` on the result of the UDF, which could work even without a 
schema. 

The other nice way to handle this would be to construct the binary 
schema that we fall back on from the R side and pass that in (i.e. keeping all 
input validation in R and just the logic in Scala).





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-11 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66712035
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

@liancheng, thank you for the review comments.

Those are good suggestions, however:

Case 1: using Option[StructType] ... - I gave it a try, but since this 
method is called from the R side we would need to somehow instantiate the 
"scala.Option" class, and this doesn't seem to be trivial to do in R. 
From the R side we would basically call the following method:
`org.apache.spark.sql.Dataset flatMapGroupsInR 
(byte[] f, byte[] packageNames, java.lang.Object[] broadcastVars, 
scala.Option outputSchema)`

Case 2: Similar to dapply, gapply requires the schema in its signature, so a 
default value doesn't really work here.

But I can make the changes if that is preferred.






[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66697390
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -325,6 +330,71 @@ case class MapGroupsExec(
 }
 
 /**
+ * Groups the input rows together and calls the R function with each group 
and an iterator
+ * containing all elements in the group.
+ * The result of this function is flattened before being output.
+ */
+case class FlatMapGroupsInRExec(
+func: Array[Byte],
+packageNames: Array[Byte],
+broadcastVars: Array[Broadcast[Object]],
+inputSchema: StructType,
+outputSchema: StructType,
+keyDeserializer: Expression,
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+
+  override def output: Seq[Attribute] = outputObjAttr :: Nil
+  override def producedAttributes: AttributeSet = 
AttributeSet(outputObjAttr)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val isSerializedRData =
+  if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+val serializerForR = if (!isSerializedRData) {
+  SerializationFormats.ROW
+} else {
+  SerializationFormats.BYTE
+}
+
+child.execute().mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, 
groupingAttributes)
+  val getValue = 
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
+  val outputObject = 
ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
+  val runner = new RRunner[Array[Byte]](
+func, SerializationFormats.ROW, serializerForR, packageNames, 
broadcastVars,
+isDataFrame = true, colNames = inputSchema.fieldNames, mode = 2)
+
+  val groupedRBytes = grouped.flatMap { case (key, rowIter) =>
+val deserializedIter = rowIter.map(getValue)
+val newIter =
+  deserializedIter.asInstanceOf[Iterator[Row]].map {row => 
rowToRBytes(row)}
--- End diff --

Nit: Space after `{` and before `}`.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66697263
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -243,6 +243,55 @@ case class MapGroups(
 outputObjAttr: Attribute,
 child: LogicalPlan) extends UnaryNode with ObjectProducer
 
+/** Factory for constructing new `FlatMapGroupsInR` nodes. */
+object FlatMapGroupsInR {
+  def apply(
+  func: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Broadcast[Object]],
+  schema: StructType,
+  encoder: Expression,
+  keyEncoder: Expression,
+  rowEncoder: ExpressionEncoder[Row],
--- End diff --

Actually how about just using the same names as constructor parameters of 
case class `FlatMapGroupsInR` below?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66691737
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -243,6 +243,55 @@ case class MapGroups(
 outputObjAttr: Attribute,
 child: LogicalPlan) extends UnaryNode with ObjectProducer
 
+/** Factory for constructing new `FlatMapGroupsInR` nodes. */
+object FlatMapGroupsInR {
+  def apply(
+  func: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Broadcast[Object]],
+  schema: StructType,
+  encoder: Expression,
+  keyEncoder: Expression,
+  rowEncoder: ExpressionEncoder[Row],
--- End diff --

The names of the above 3 parameters are quite confusing. The first two are 
actually deserializer expressions rather than encoders, so I guess 
`dataDeserializer` and `keyDeserializer` would be better names. The 3rd one is an 
encoder, but it seems that all we actually care about is its schema.

Basically, in most cases you can simply treat an encoder as a pair of 
serializer and deserializer expressions.
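
To make the "pair of serializer and deserializer" view concrete, here is a minimal sketch. It assumes plain Scala functions in place of Catalyst expressions; `SimpleEncoder`, `KeyValue` and `roundTrip` are hypothetical names for illustration and are not Spark's `ExpressionEncoder` API.

```scala
object EncoderPairSketch {
  // Hypothetical stand-in: an encoder modelled as two functions, one turning
  // an object into a row of values and one turning a row back into the object.
  final case class SimpleEncoder[T](
      serializer: T => Seq[Any],
      deserializer: Seq[Any] => T)

  final case class KeyValue(key: Int, value: Double)

  val keyValueEncoder: SimpleEncoder[KeyValue] = SimpleEncoder[KeyValue](
    kv => Seq(kv.key, kv.value),
    row => KeyValue(row(0).asInstanceOf[Int], row(1).asInstanceOf[Double]))

  // Serializing and then deserializing should give back an equal object.
  def roundTrip[T](enc: SimpleEncoder[T], value: T): T =
    enc.deserializer(enc.serializer(value))
}
```

Seen this way, a parameter that is only ever used to read rows back into objects is a deserializer, which is the motivation for the suggested renaming.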





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66690948
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -243,6 +243,55 @@ case class MapGroups(
 outputObjAttr: Attribute,
 child: LogicalPlan) extends UnaryNode with ObjectProducer
 
+/** Factory for constructing new `FlatMapGroupsInR` nodes. */
+object FlatMapGroupsInR {
+  def apply(
+  func: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Broadcast[Object]],
+  schema: StructType,
+  encoder: Expression,
+  keyEncoder: Expression,
+  rowEncoder: ExpressionEncoder[Row],
+  groupingAttributes: Seq[Attribute],
+  dataAttributes: Seq[Attribute],
+  child: LogicalPlan): LogicalPlan = {
+ val mapped = FlatMapGroupsInR(
+   func,
+   packageNames,
+   broadcastVars,
+   rowEncoder.schema,
+   schema,
+   UnresolvedDeserializer(keyEncoder, groupingAttributes),
+   UnresolvedDeserializer(encoder, dataAttributes),
+   groupingAttributes,
+   dataAttributes,
+   CatalystSerde.generateObjAttr(RowEncoder(schema)),
+   child)
+ CatalystSerde.serialize(mapped)(RowEncoder(schema))
--- End diff --

Function body indentation is off (3 spaces).





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66688912
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

Or maybe in this case just add a default value for `outputSchema`.
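
A minimal sketch of the default-value idea, assuming stand-in types: `Schema`, 
`SerializedRDataSchema` and `DefaultSchemaSketch` are hypothetical and only 
mirror the shape of `StructType` and `SERIALIZED_R_DATA_SCHEMA` from the diff.

```scala
// Hypothetical stand-in for Spark's StructType, to keep the sketch self-contained.
final case class Schema(fieldNames: Seq[String])

object DefaultSchemaSketch {
  // Hypothetical stand-in for SERIALIZED_R_DATA_SCHEMA.
  val SerializedRDataSchema: Schema = Schema(Seq("R"))

  // A default argument replaces the `if (outputSchema == null) ... else ...` check.
  def flatMapGroupsInR(outputSchema: Schema = SerializedRDataSchema): Schema =
    outputSchema
}
```

One caveat: Scala default arguments are only convenient for Scala callers. 
Code that reaches the method from Java or through reflection still has to pass 
every argument, so whether this fits depends on how the method is invoked.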





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66688844
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
 pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. 
For each unique group,
+   * the function will be passed the group key and an iterator that 
contains all of the elements in
+   * the group. The function can return an iterator containing elements of 
an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an 
aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+  f: Array[Byte],
+  packageNames: Array[Byte],
+  broadcastVars: Array[Object],
+  outputSchema: StructType): DataFrame = {
+  val broadcastVarObj = 
broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+  val groupingNamedExpressions = groupingExprs.map(alias)
+  val groupingCols = groupingNamedExpressions.map(Column(_))
+  val groupingDataFrame = df.select(groupingCols : _*)
+  val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+  val realOutputSchema = if (outputSchema == null) 
SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

Shall we use `Option[StructType]` for `outputSchema` since it's nullable?
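
A minimal sketch of what that could look like, assuming stand-in types: 
`FieldSchema`, `SerializedRDataSchema` and `OptionSchemaSketch` are 
hypothetical names that mirror `StructType` and `SERIALIZED_R_DATA_SCHEMA`.

```scala
// Hypothetical stand-in for Spark's StructType, to keep the sketch self-contained.
final case class FieldSchema(fieldNames: Seq[String])

object OptionSchemaSketch {
  // Hypothetical stand-in for SERIALIZED_R_DATA_SCHEMA.
  val SerializedRDataSchema: FieldSchema = FieldSchema(Seq("R"))

  // `None` means "no schema supplied"; the fallback is made explicit with getOrElse.
  def resolveOutputSchema(outputSchema: Option[FieldSchema]): FieldSchema =
    outputSchema.getOrElse(SerializedRDataSchema)

  // At a boundary that may still hand over null, Option(...) maps null to None.
  def fromNullable(outputSchema: FieldSchema): FieldSchema =
    resolveOutputSchema(Option(outputSchema))
}
```

The signature then documents that the schema may be absent, and the null check 
is confined to the single place where a nullable value enters.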





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66673292
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
 child: LogicalPlan) extends UnaryNode with ObjectProducer{
 
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
--- End diff --

We can do that in a separate JIRA and PR. That way it will be easier to track 
later on why we changed it.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66672823
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
 child: LogicalPlan) extends UnaryNode with ObjectProducer{
 
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
--- End diff --

I reverted it because I noticed that I already had a separate branch for that 
fix.

https://github.com/apache/spark/commit/939dbd5a17e63171ef2c18d5b23874daa75dbfcc

But if you think it is better to have it here, I can bring it back.
Thanks!





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66671272
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
 child: LogicalPlan) extends UnaryNode with ObjectProducer{
 
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
--- End diff --

Ah, I see. Sorry, I think I was looking at the diff from a single commit 
there. Also, it looks like that commit reverted the override change to the 
`MapPartitionsInR` class? I think that's useful to have. Do you want to do 
that in a separate PR?





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r66670797
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
 child: LogicalPlan) extends UnaryNode with ObjectProducer{
 
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
--- End diff --

Thanks, @shivaram, I've added groupingAttributes and dataAttributes in my 
last commit: 
https://github.com/apache/spark/pull/12836/commits/0ca74fddc91a94d8b5c69c2e510afeee8531c0e2





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-10 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r9908
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
 child: LogicalPlan) extends UnaryNode with ObjectProducer{
 
   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, 
outputSchema,
+outputObjAttr, child)
--- End diff --

Can we add `groupingAttributes` and `dataAttributes` here as well? From what 
I can see, they shouldn't contain R serialized data.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-04 Thread NarineK
Github user NarineK commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65807864
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -27,6 +27,58 @@ elapsedSecs <- function() {
   proc.time()[3]
 }
 
+computeHelper <- function(mode, partition, serializer, deserializer, key,
--- End diff --

Hi @sun-rui, this is good. However, accumulating the times might lead to very 
large numbers and run into numeric type limitations.
I was thinking of accumulating the differences instead, such as:
`inputComputeDiff += compute - input` and 
`computeOutputDiff += output - compute`.

What do you think?
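
A minimal sketch of that accumulation, written in Scala rather than R purely 
for illustration. `GroupTimes`, `accumulate` and `ElapsedSketch` are 
hypothetical names, and the three timestamps per group are assumed to be taken 
after reading input, after compute, and after writing output.

```scala
object ElapsedSketch {
  // Hypothetical per-group timestamps, in seconds: when the input was read,
  // when compute finished, and when the output was written.
  final case class GroupTimes(input: Double, compute: Double, output: Double)

  // Returns (inputComputeDiff, computeOutputDiff) summed over all groups.
  def accumulate(groups: Seq[GroupTimes]): (Double, Double) =
    groups.foldLeft((0.0, 0.0)) { case ((computeAcc, outputAcc), g) =>
      (computeAcc + (g.compute - g.input), outputAcc + (g.output - g.compute))
    }
}
```

Only the per-group differences are summed, so the totals grow with the time 
actually spent in each phase rather than with the absolute clock values.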





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65563636
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -84,68 +136,51 @@ broadcastElap <- elapsedSecs()
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
+  if (mode == 2) {
--- End diff --

The grouping column names are not necessary here.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65563441
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -27,6 +27,58 @@ elapsedSecs <- function() {
   proc.time()[3]
 }
 
+computeHelper <- function(mode, partition, serializer, deserializer, key,
+   colNames, computeFunc, outputCon, inputData) {
--- End diff --

Coding style: indentation alignment.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65563298
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -1266,6 +1266,83 @@ setMethod("dapplyCollect",
 ldf
   })
 
+#' gapply
+#'
+#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' group. The group is defined by input grouping columns.
--- End diff --

the second sentence is not necessary.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65563183
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -84,68 +136,51 @@ broadcastElap <- elapsedSecs()
 # as number of partitions to create.
 numPartitions <- SparkR:::readInt(inputCon)
 
-isDataFrame <- as.logical(SparkR:::readInt(inputCon))
+# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
+mode <- SparkR:::readInt(inputCon)
 
-# If isDataFrame, then read column names
-if (isDataFrame) {
+if (mode > 0) {
   colNames <- SparkR:::readObject(inputCon)
+  if (mode == 2) {
+key <- SparkR:::readObject(inputCon)
+  }
 }
 
 isEmpty <- SparkR:::readInt(inputCon)
 
 if (isEmpty != 0) {
-
   if (numPartitions == -1) {
 if (deserializer == "byte") {
   # Now read as many characters as described in funcLen
   data <- SparkR:::readDeserialize(inputCon)
 } else if (deserializer == "string") {
   data <- as.list(readLines(inputCon))
-} else if (deserializer == "row") {
+} else if (deserializer == "row" && mode == 2) {
+  data <- SparkR:::readMultipleObjectsWithKeys(inputCon)
+} else if (deserializer == "row"){
   data <- SparkR:::readMultipleObjects(inputCon)
 }
+
 # Timing reading input data for execution
 inputElap <- elapsedSecs()
-
-if (isDataFrame) {
-  if (deserializer == "row") {
-# Transform the list of rows into a data.frame
-# Note that the optional argument stringsAsFactors for rbind is
-# available since R 3.2.4. So we set the global option here.
-oldOpt <- getOption("stringsAsFactors")
-options(stringsAsFactors = FALSE)
-data <- do.call(rbind.data.frame, data)
-options(stringsAsFactors = oldOpt)
-
-names(data) <- colNames
+if (mode > 0) {
+  if (mode == 1) {
+computeElap <- computeHelper(mode, partition, serializer, 
deserializer, key,
+ colNames, computeFunc, outputCon, data)
   } else {
-# Check to see if data is a valid data.frame
-stopifnot(deserializer == "byte")
-stopifnot(class(data) == "data.frame")
-  }
-  output <- computeFunc(data)
-  if (serializer == "row") {
-# Transform the result data.frame back to a list of rows
-output <- split(output, seq(nrow(output)))
-  } else {
-# Serialize the ouput to a byte array
-stopifnot(serializer == "byte")
+# gapply mode
+for (i in 1:length(data)) {
+  # TODO compute the diff between outputElap and computeElap
+  # Currently, only `computeElap` for the last iteration will be 
used.
+  computeElap <- computeHelper(mode, partition, serializer, 
deserializer, key,
--- End diff --

Make sure the correct key is passed in.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65562991
  
--- Diff: R/pkg/R/deserialize.R ---
@@ -197,6 +197,32 @@ readMultipleObjects <- function(inputCon) {
   data # this is a list of named lists now
 }
 
+readMultipleObjectsWithKeys <- function(inputCon) {
+  # readMultipleObjectsWithKeys will read multiple continuous objects from
+  # a DataOutputStream. There is no preceding field telling the count
+  # of the objects, so the number of objects varies, we try to read
+  # all objects in a loop until the end of the stream. This function
+  # is for use by gapply. Each group of rows is followed by the grouping
+  # key for this group which is then followed by next group.
+  data <- list()
+  subData <- list()
+  while (TRUE) {
+# If reaching the end of the stream, type returned should be "".
+type <- readType(inputCon)
+if (type == "") {
+  break
+} else if (type == "r") {
+  # A grouping boundary detected
+  readTypedObject(inputCon, type)
--- End diff --

This should save the key to a variable that will be passed as the first 
argument to the user-specified R function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65562697
  
--- Diff: R/pkg/inst/worker/worker.R ---
@@ -27,6 +27,58 @@ elapsedSecs <- function() {
   proc.time()[3]
 }
 
+computeHelper <- function(mode, partition, serializer, deserializer, key,
--- End diff --

I propose splitting this function into two functions: compute() and 
outputResult(). The compute elapsed time and the output elapsed time can then 
be calculated within these two functions.
Something like the following:
```
  computeElapsedTime <- 0
  outputElapsedTime <- 0
  for (group in groups) {
    # compute() returns the result together with the time it took
    res <- compute(group)
    computeElapsedTime <- computeElapsedTime + res$elapsed
    # outputResult() returns the time spent writing the output
    outputElapsedTime <- outputElapsedTime + outputResult(res$output)
  }
```





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65561139
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -325,6 +330,77 @@ case class MapGroupsExec(
 }
 
 /**
+ * Groups the input rows together and calls the R function with each group 
and an iterator
+ * containing all elements in the group.
+ * The result of this function is flattened before being output.
+ */
+case class FlatMapGroupsInRExec(
+func: Array[Byte],
+packageNames: Array[Byte],
+broadcastVars: Array[Broadcast[Object]],
+inputSchema: StructType,
+outputSchema: StructType,
+keyDeserializer: Expression,
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+
+  override def output: Seq[Attribute] = outputObjAttr :: Nil
+  override def producedAttributes: AttributeSet = 
AttributeSet(outputObjAttr)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val groupNames = groupingAttributes.map(_.name).toArray
+val isDeserializedRData =
+  if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+val serializerForR = if (!isDeserializedRData) {
+  SerializationFormats.ROW
+} else {
+  SerializationFormats.BYTE
+}
+
+child.execute().mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, 
groupingAttributes)
+  val getValue = 
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
+  val outputObject = 
ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
+  val runner = new RRunner[Array[Byte]](
+func, SerializationFormats.ROW, serializerForR, packageNames, 
broadcastVars,
+isDataFrame = true, colNames = inputSchema.fieldNames, key = 
groupNames)
+
+  val hasGroups = grouped.hasNext
--- End diff --

I propose refactoring the code as follows. There is no need for special 
handling of an empty iterator; that should be handled within 
RRunner.compute().
```
val groupedRBytes = grouped.map { case (key, rowIter) =>
  (rowToRBytes(getKey(key).asInstanceOf[Row]),
   rowIter.map(getValue).asInstanceOf[Iterator[Row]].map { row => rowToRBytes(row) })
}
```






[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65561420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
 ---
@@ -25,6 +25,21 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
 
 /**
+ * A function wrapper that applies the given R function to each partition 
of each group.
--- End diff --

Please revert change to MapPartitionsRWrapper.scala. No change is needed.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65560547
  
--- Diff: core/src/main/scala/org/apache/spark/api/r/RRunner.scala ---
@@ -40,7 +40,8 @@ private[spark] class RRunner[U](
 broadcastVars: Array[Broadcast[Object]],
 numPartitions: Int = -1,
 isDataFrame: Boolean = false,
-colNames: Array[String] = null)
+colNames: Array[String] = null,
+key: Array[String] = null)
--- End diff --

Remove this unused parameter. There is no need to pass the grouping column 
names to the R worker.





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65560574
  
--- Diff: core/src/main/scala/org/apache/spark/api/r/RRunner.scala ---
@@ -149,12 +150,24 @@ private[spark] class RRunner[U](
 
   dataOut.writeInt(numPartitions)
 
-  dataOut.writeInt(if (isDataFrame) 1 else 0)
+  val mode = if (isDataFrame && key != null) {
+2 // gapply
+  } else if (isDataFrame) {
+1 // dapply
+  } else {
+0 // RDD
+  }
+  dataOut.writeInt(mode)
 
   if (isDataFrame) {
 SerDe.writeObject(dataOut, colNames)
   }
 
+  // Write key - the grouping columns for gapply mode
--- End diff --

remove this piece of code





[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...

2016-06-02 Thread sun-rui
Github user sun-rui commented on a diff in the pull request:

https://github.com/apache/spark/pull/12836#discussion_r65560429
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -325,6 +330,77 @@ case class MapGroupsExec(
 }
 
 /**
+ * Groups the input rows together and calls the R function with each group 
and an iterator
+ * containing all elements in the group.
+ * The result of this function is flattened before being output.
+ */
+case class FlatMapGroupsInRExec(
+func: Array[Byte],
+packageNames: Array[Byte],
+broadcastVars: Array[Broadcast[Object]],
+inputSchema: StructType,
+outputSchema: StructType,
+keyDeserializer: Expression,
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+
+  override def output: Seq[Attribute] = outputObjAttr :: Nil
+  override def producedAttributes: AttributeSet = 
AttributeSet(outputObjAttr)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val groupNames = groupingAttributes.map(_.name).toArray
+val isDeserializedRData =
--- End diff --

rename the val to "isSerializedRData"
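
A minimal sketch of the renamed flag, assuming stand-in values: `OutSchema`, 
`RowFormat`, `ByteFormat` and `SerializerChoiceSketch` are hypothetical and 
only mirror `StructType`, `SerializationFormats.ROW` and 
`SerializationFormats.BYTE` from the diff.

```scala
object SerializerChoiceSketch {
  // Hypothetical stand-ins for SerializationFormats.ROW and SerializationFormats.BYTE.
  val RowFormat = "row"
  val ByteFormat = "byte"

  // Hypothetical stand-ins for StructType and SERIALIZED_R_DATA_SCHEMA.
  final case class OutSchema(fieldNames: Seq[String])
  val SerializedRDataSchema: OutSchema = OutSchema(Seq("R"))

  def serializerForR(outputSchema: OutSchema): String = {
    // The comparison already yields a Boolean, so `if (...) true else false` is not needed.
    val isSerializedRData = outputSchema == SerializedRDataSchema
    if (isSerializedRData) ByteFormat else RowFormat
  }
}
```

With the flag named after what it tests, the branch reads without a negation: 
serialized R data goes out as bytes, anything else as rows.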

