[GitHub] spark pull request #19858: [SPARK-22489][DOC][FOLLOWUP] Update broadcast beh...

2017-12-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19858


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r154576646
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+
+/**
+ * Defines APIs used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  /**
+   * Given an expression, returns all the necessary parameters to evaluate it, so the generated
+   * code of this expression can be split into a function.
+   * The 1st element of the returned tuple is the parameter strings used to call the function.
+   * The 2nd element of the returned tuple is the parameter strings used to declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val paramsFromSubExprs = prepareFunctionParams(ctx, subExprs, 
subExprCodes)
--- End diff --

do we support subexpression elimination when splitting code into methods in the non-whole-stage-codegen path?
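
(For readers of this thread: a conceptual sketch of what subexpression elimination means, in plain Scala rather than Spark's generated code — the names are illustrative.)

```scala
// Without elimination: the common subexpression (a + b) is evaluated twice.
def withoutElimination(a: Int, b: Int): Int = (a + b) * (a + b)

// With elimination: it is evaluated once and the result is reused, similar to
// how ctx.subExprEliminationExprs caches a result per eliminated expression.
def withElimination(a: Int, b: Int): Int = {
  val sub = a + b
  sub * sub
}
```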


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154576368
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -573,94 +574,84 @@ case class HashAggregateExec(
   enableTwoLevelHashMap(ctx)
 } else {
   
sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) 
match {
-case "true" => logWarning("Two level hashmap is disabled but 
vectorized hashmap is " +
-  "enabled.")
-case null | "" | "false" => None
+case "true" =>
+  logWarning("Two level hashmap is disabled but vectorized hashmap 
is enabled.")
+case _ =>
   }
 }
-fastHashMapTerm = ctx.freshName("fastHashMap")
-val fastHashMapClassName = ctx.freshName("FastHashMap")
-val fastHashMapGenerator =
-  if (isVectorizedHashMapEnabled) {
-new VectorizedHashMapGenerator(ctx, aggregateExpressions,
-  fastHashMapClassName, groupingKeySchema, bufferSchema)
-  } else {
-new RowBasedHashMapGenerator(ctx, aggregateExpressions,
-  fastHashMapClassName, groupingKeySchema, bufferSchema)
-  }
 
 val thisPlan = ctx.addReferenceObj("plan", this)
 
-// Create a name for iterator from vectorized HashMap
+// Create a name for the iterator from the fast hash map.
 val iterTermForFastHashMap = ctx.freshName("fastHashMapIter")
 if (isFastHashMapEnabled) {
+  // Generates the fast hash map class and creates the fast hash map term.
+  fastHashMapTerm = ctx.freshName("fastHashMap")
+  val fastHashMapClassName = ctx.freshName("FastHashMap")
   if (isVectorizedHashMapEnabled) {
+val generatedMap = new VectorizedHashMapGenerator(ctx, 
aggregateExpressions,
+  fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+ctx.addInnerClass(generatedMap)
+
 ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
   s"$fastHashMapTerm = new $fastHashMapClassName();")
 ctx.addMutableState(
-  
"java.util.Iterator",
+  classOf[java.util.Iterator[ColumnarRow]].getName,
--- End diff --

Is this the same as before?

```scala
scala> classOf[java.util.Iterator[Int]].getName
res2: String = java.util.Iterator
```
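
(Side note: generic type arguments are erased at runtime, so `getName` is identical for any element type — a quick check, with illustrative result numbering:)

```scala
scala> classOf[java.util.Iterator[Int]].getName == classOf[java.util.Iterator[String]].getName
res3: Boolean = true
```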




---




[GitHub] spark issue #19858: [SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior c...

2017-12-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19858
  
Thanks! Merged to master.


---




[GitHub] spark issue #19858: [SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior c...

2017-12-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19858
  
LGTM


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r154575849
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+
+/**
+ * Defines APIs used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  /**
+   * Given an expression, returns all the necessary parameters to evaluate it, so the generated
+   * code of this expression can be split into a function.
+   * The 1st element of the returned tuple is the parameter strings used to call the function.
+   * The 2nd element of the returned tuple is the parameter strings used to declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val paramsFromSubExprs = prepareFunctionParams(ctx, subExprs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs, subExprs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val stat = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = stat.value, isNull = stat.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+child.flatMap {
+  // An expression directly evaluates on current input row.
+  case BoundReference(ordinal, _, _) if ctx.currentVars == null ||
+  ctx.currentVars(ordinal) == null =>
+Seq(ctx.INPUT_ROW)
+
+  // An expression which is 

[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

2017-12-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19841#discussion_r154575693
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -104,14 +105,61 @@ case class InsertIntoHiveTable(
 val partitionColumns = 
fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
 val partitionColumnNames = 
Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
 
-// By this time, the partition map must match the table's partition 
columns
-if (partitionColumnNames.toSet != partition.keySet) {
-  throw new SparkException(
-s"""Requested partitioning does not match the 
${table.identifier.table} table:
-   |Requested partitions: ${partition.keys.mkString(",")}
-   |Table partitions: 
${table.partitionColumnNames.mkString(",")}""".stripMargin)
+try {
+  // By this time, the partition map must match the table's partition 
columns
+  if (partitionColumnNames.toSet != partition.keySet) {
+throw new SparkException(
+  s"""Requested partitioning does not match the 
${table.identifier.table} table:
+ |Requested partitions: ${partition.keys.mkString(",")}
+ |Table partitions: 
${table.partitionColumnNames.mkString(",")}""".stripMargin)
+  }
+
+  validatePartitionSpec(hadoopConf, numDynamicPartitions, 
numStaticPartitions,
+partitionSpec, partitionColumnNames)
+
+  validateBucketSpec(hadoopConf)
+
+  val partitionAttributes = 
partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+query.resolve(name :: Nil, 
sparkSession.sessionState.analyzer.resolver).getOrElse {
+  throw new AnalysisException(
+s"Unable to resolve $name given 
[${query.output.map(_.name).mkString(", ")}]")
+}.asInstanceOf[Attribute]
+  }
+
--- End diff --

```Scala
  try {
```

should start from this line, right?

No need to create `validatePartitionSpec` and `validateBucketSpec` in this PR. We want to minimize the code changes.


---




[GitHub] spark issue #19874: [SPARK-22675] [SQL] Refactoring PropagateTypes in TypeCo...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19874
  
**[Test build #84418 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84418/testReport)**
 for PR 19874 at commit 
[`657cf88`](https://github.com/apache/spark/commit/657cf88bbf4259d1a823f93f16eaccc2fbe78667).


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r154575588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -108,7 +108,10 @@ private[sql] trait ColumnarBatchScan extends 
CodegenSupport {
  |}""".stripMargin)
 
 ctx.currentVars = null
+// `rowIdx` isn't in `ctx.currentVars`. If the expressions are split later, we can't track it.
+// So we make it a global variable.
--- End diff --

can we add `rowId` to `ExprCode.inputVars` in `genCodeColumnVector`?


---




[GitHub] spark pull request #17176: [SPARK-19833][SQL]remove SQLConf.HIVE_VERIFY_PART...

2017-12-03 Thread barrenlake
Github user barrenlake commented on a diff in the pull request:

https://github.com/apache/spark/pull/17176#discussion_r154575331
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -159,36 +159,11 @@ class HadoopTableReader(
 def verifyPartitionPath(
 partitionToDeserializer: Map[HivePartition, Class[_ <: 
Deserializer]]):
 Map[HivePartition, Class[_ <: Deserializer]] = {
-  if (!sparkSession.sessionState.conf.verifyPartitionPath) {
-partitionToDeserializer
-  } else {
-var existPathSet = collection.mutable.Set[String]()
-var pathPatternSet = collection.mutable.Set[String]()
-partitionToDeserializer.filter {
-  case (partition, partDeserializer) =>
-def updateExistPathSetByPathPattern(pathPatternStr: String) {
-  val pathPattern = new Path(pathPatternStr)
-  val fs = pathPattern.getFileSystem(hadoopConf)
-  val matches = fs.globStatus(pathPattern)
-  matches.foreach(fileStatus => existPathSet += 
fileStatus.getPath.toString)
-}
-// convert  /demo/data/year/month/day  to  /demo/data/*/*/*/
-def getPathPatternByPath(parNum: Int, tempPath: Path): String 
= {
-  var path = tempPath
-  for (i <- (1 to parNum)) path = path.getParent
-  val tails = (1 to parNum).map(_ => "*").mkString("/", "/", 
"/")
-  path.toString + tails
-}
-
-val partPath = partition.getDataLocation
-val partNum = 
Utilities.getPartitionDesc(partition).getPartSpec.size();
-var pathPatternStr = getPathPatternByPath(partNum, partPath)
-if (!pathPatternSet.contains(pathPatternStr)) {
-  pathPatternSet += pathPatternStr
-  updateExistPathSetByPathPattern(pathPatternStr)
-}
-existPathSet.contains(partPath.toString)
-}
+  partitionToDeserializer.filter {
+case (partition, partDeserializer) =>
+  val partPath = partition.getDataLocation
+  val fs = partPath.getFileSystem(hadoopConf)
+  fs.exists(partPath)
--- End diff --

Sending an RPC request to the NameNode for each partition can result in poor performance.
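
(A rough sketch of the trade-off this comment points at — not the code under review; the helper names are hypothetical and assume a Hadoop `FileSystem` is available:)

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object PartitionPathCheckSketch {
  // One NameNode RPC per partition path.
  def perPartitionExists(fs: FileSystem, partPaths: Seq[Path]): Seq[Path] =
    partPaths.filter(p => fs.exists(p))

  // One globStatus call per directory pattern (e.g. /demo/data/*/*/*),
  // then cheap membership checks against the cached listing.
  def globThenFilter(fs: FileSystem, pattern: Path, partPaths: Seq[Path]): Seq[Path] = {
    val existing = Option(fs.globStatus(pattern))
      .getOrElse(Array.empty[FileStatus])
      .map(_.getPath.toString)
      .toSet
    partPaths.filter(p => existing.contains(p.toString))
  }
}
```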


---




[GitHub] spark pull request #19874: [SPARK-22675] [SQL] Refactoring PropagateTypes in...

2017-12-03 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/19874

[SPARK-22675] [SQL] Refactoring PropagateTypes in TypeCoercion

## What changes were proposed in this pull request?
PropagateTypes is called twice in TypeCoercion. We do not need to call it twice. Instead, we should call it after each change to the types.

## How was this patch tested?
The existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark deduplicatePropagateTypes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19874


commit fc3a24e039c905fdd263d2ece2088ce152887fc7
Author: gatorsmile 
Date:   2017-12-03T05:55:00Z

clean

commit 657cf88bbf4259d1a823f93f16eaccc2fbe78667
Author: gatorsmile 
Date:   2017-12-04T07:40:26Z

refactor




---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19869
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19869
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84416/
Test FAILed.


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19869
  
**[Test build #84416 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84416/testReport)**
 for PR 19869 at commit 
[`479fd8d`](https://github.com/apache/spark/commit/479fd8d4cb6ba6f0126d996362125746dfeac8f2).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-03 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r154574087
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+
+/**
+ * Defines APIs used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  /**
+   * Given an expression, returns all the necessary parameters to evaluate it, so the generated
+   * code of this expression can be split into a function.
+   * The 1st element of the returned tuple is the parameter strings used to call the function.
+   * The 2nd element of the returned tuple is the parameter strings used to declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val paramsFromSubExprs = prepareFunctionParams(ctx, subExprs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs, subExprs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val stat = ctx.subExprEliminationExprs(subExpr)
--- End diff --

nit: what does `stat` mean? In general, it suggests `statistics`. Would it be better to use another variable name?


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r154573755
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1001,16 +1017,25 @@ class CodegenContext {
 commonExprs.foreach { e =>
   val expr = e.head
   val fnName = freshName("evalExpr")
-  val isNull = s"${fnName}IsNull"
+  val isNull = if (expr.nullable) {
+s"${fnName}IsNull"
+  } else {
+""
+  }
   val value = s"${fnName}Value"
 
   // Generate the code for this expression tree and wrap it in a 
function.
   val eval = expr.genCode(this)
+  val nullValue = if (expr.nullable) {
+s"$isNull = ${eval.isNull};"
--- End diff --

nit:
```
val setIsNull = if (expr.nullable) {
  s"${fnName}IsNull = ${eval.isNull};"
} else ""
```


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154572754
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind " +
+  "address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be " +
+  "managed via a Kubernetes service.")
+
+val preferredServiceName = 
s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+
+val driverPort = submissionSparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT)
+val driverBlockManagerPort = submissionSparkConf.getInt(
+org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, 
DEFAULT_BLOCKMANAGER_PORT)
+val driverService = new ServiceBuilder()
+  .withNewMetadata()
+.withName(resolvedServiceName)
+.endMetadata()
+  .withNewSpec()
+.withClusterIP("None")
+.withSelector(driverLabels.asJava)
+.addNewPort()
+  .withName(DRIVER_PORT_NAME)
+  .withPort(driverPort)
+  .withNewTargetPort(driverPort)
+  .endPort()
+.addNewPort()
+  .withName(BLOCK_MANAGER_PORT_NAME)
+  .withPort(driverBlockManagerPort)
+  .withNewTargetPort(driverBlockManagerPort)
+  .endPort()
+.endSpec()
+  .build()
+
+val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+  .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, 
driverHostname)
--- End diff --

I meant isn't `org.apache.spark.internal.config.DRIVER_HOST_ADDRESS` just 
`DRIVER_HOST_KEY`?


---


[GitHub] spark issue #19873: [SPARK-20392][SQL] Set barrier to prevent re-entering a ...

2017-12-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19873
  
cc @cloud-fan @hvanhovell Basically this is the same changes in #17770.


---




[GitHub] spark issue #19873: [SPARK-20392][SQL] Set barrier to prevent re-entering a ...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19873
  
**[Test build #84417 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84417/testReport)**
 for PR 19873 at commit 
[`136fd30`](https://github.com/apache/spark/commit/136fd30cb98609e648a8b689a28853c1bae67bf7).


---




[GitHub] spark pull request #19873: [SPARK-20392][SQL] Set barrier to prevent re-ente...

2017-12-03 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/19873

[SPARK-20392][SQL] Set barrier to prevent re-entering a tree

## What changes were proposed in this pull request?

It is reported that there is a performance regression when applying an ML pipeline to a dataset with many columns but few rows.

A big part of the regression comes from operations (e.g., `select`) on DataFrame/Dataset that re-create a new DataFrame/Dataset with a new `LogicalPlan`. In normal SQL usage this cost is negligible.

However, it's not rare to chain dozens of pipeline stages in ML. When the query plan grows incrementally while running those stages, the total cost spent on re-creating DataFrames grows too. In particular, the `Analyzer` will walk the whole large query plan even though most of it has already been analyzed.

By eliminating part of this cost, the time to run the example code locally is reduced from about 1 min to about 30 secs.

In particular, the time spent applying the pipeline locally is mostly in the transform calls of the 137 `Bucketizer`s. Before the change, each call to `Bucketizer`'s transform could cost about 0.4 sec, so the total time spent in all `Bucketizer`s' transforms is about 50 secs. After the change, each call only costs about 0.1 sec.
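
(A minimal sketch of the kind of workload described — assuming a local `SparkSession`; the column names, splits, and data are illustrative, not the benchmark from the JIRA. Each `transform` wraps the previous Dataset's plan in a new one, so without a barrier the analyzer re-walks an ever-growing plan at every stage:)

```scala
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.{DataFrame, SparkSession}

object ChainedBucketizersSketch {
  def run(spark: SparkSession, numStages: Int = 137): DataFrame = {
    import spark.implicits._
    var df: DataFrame = Seq((0.1, 0.7), (0.5, 0.9)).toDF("c0", "c1")
    val splits = Array(Double.NegativeInfinity, 0.3, 0.6, Double.PositiveInfinity)
    (0 until numStages).foreach { i =>
      val bucketizer = new Bucketizer()
        .setInputCol("c0")
        .setOutputCol(s"bucket_$i")
        .setSplits(splits)
      // Each transform builds a new DataFrame whose plan contains the previous one.
      df = bucketizer.transform(df)
    }
    df
  }
}
```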

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-20392-reopen

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19873


commit 136fd30cb98609e648a8b689a28853c1bae67bf7
Author: Liang-Chi Hsieh 
Date:   2017-12-04T07:19:35Z

Add analysis barrier around analyzed plans.




---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r154571348
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -115,9 +120,35 @@ abstract class Expression extends TreeNode[Expression] 
{
 }
   }
 
+  /**
+   * Returns the input variables to this expression.
+   */
+  private def findInputVars(ctx: CodegenContext, eval: ExprCode): 
Seq[ExprInputVar] = {
+if (ctx.currentVars != null) {
+  val boundRefs = this.collect {
+case b @ BoundReference(ordinal, _, _) if ctx.currentVars(ordinal) 
!= null => (ordinal, b)
--- End diff --

shall we add an assert here to guarantee that, 
`ctx.currentVars(ordinal).code` is empty?


---




[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

2017-12-03 Thread zuotingbing
Github user zuotingbing commented on the issue:

https://github.com/apache/spark/pull/19841
  
I extracted a separate function, but it has too many parameters. Could I extract several separate functions?


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19869
  
**[Test build #84416 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84416/testReport)**
 for PR 19869 at commit 
[`479fd8d`](https://github.com/apache/spark/commit/479fd8d4cb6ba6f0126d996362125746dfeac8f2).


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19869
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84410/
Test PASSed.


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19869
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19869
  
**[Test build #84410 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84410/testReport)**
 for PR 19869 at commit 
[`9b8ae3d`](https://github.com/apache/spark/commit/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r154569899
  
--- Diff: python/pyspark/sql/group.py ---
@@ -89,8 +89,15 @@ def agg(self, *exprs):
 else:
 # Columns
 assert all(isinstance(c, Column) for c in exprs), "all exprs 
should be Column"
-jdf = self._jgd.agg(exprs[0]._jc,
-_to_seq(self.sql_ctx._sc, [c._jc for c in 
exprs[1:]]))
+if isinstance(exprs[0], UDFColumn):
+assert all(isinstance(c, UDFColumn) for c in exprs)
--- End diff --

An informative error message would be better.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r154569953
  
--- Diff: python/pyspark/sql/group.py ---
@@ -89,8 +89,15 @@ def agg(self, *exprs):
 else:
 # Columns
 assert all(isinstance(c, Column) for c in exprs), "all exprs 
should be Column"
-jdf = self._jgd.agg(exprs[0]._jc,
-_to_seq(self.sql_ctx._sc, [c._jc for c in 
exprs[1:]]))
+if isinstance(exprs[0], UDFColumn):
+assert all(isinstance(c, UDFColumn) for c in exprs)
--- End diff --

Like `"all exprs should be UDFColumn"`.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r154569884
  
--- Diff: python/pyspark/sql/group.py ---
@@ -89,8 +89,15 @@ def agg(self, *exprs):
 else:
 # Columns
 assert all(isinstance(c, Column) for c in exprs), "all exprs 
should be Column"
-jdf = self._jgd.agg(exprs[0]._jc,
-_to_seq(self.sql_ctx._sc, [c._jc for c in 
exprs[1:]]))
+if isinstance(exprs[0], UDFColumn):
+assert all(isinstance(c, UDFColumn) for c in exprs)
+jdf = self._jgd.aggInPandas(
+_to_seq(self.sql_ctx._sc, [c._jc for c in exprs]))
+else:
+jdf = self._jgd.agg(exprs[0]._jc,
--- End diff --

What if `exprs[n]` (n > 0) is a `UDFColumn`? I think we should make sure that if any column is a `UDFColumn`, all columns are `UDFColumn`s.


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154569858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
 
 // generate hash code for key
 val hashExpr = Murmur3Hash(groupingExpressions, 42)
-ctx.currentVars = input
 val hashEval = BindReferences.bindReference(hashExpr, 
child.output).genCode(ctx)
 
-val inputAttr = aggregateBufferAttributes ++ child.output
-ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
 val (checkFallbackForGeneratedHashMap, 
checkFallbackForBytesToBytesMap, resetCounter,
 incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

ok, thanks


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154569741
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
 
 // generate hash code for key
 val hashExpr = Murmur3Hash(groupingExpressions, 42)
-ctx.currentVars = input
 val hashEval = BindReferences.bindReference(hashExpr, 
child.output).genCode(ctx)
 
-val inputAttr = aggregateBufferAttributes ++ child.output
-ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
 val (checkFallbackForGeneratedHashMap, 
checkFallbackForBytesToBytesMap, resetCounter,
 incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

I'll have another PR for this part, will reformat it at that time


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19872
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84415/
Test FAILed.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19872
  
**[Test build #84415 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84415/testReport)**
 for PR 19872 at commit 
[`a1058b8`](https://github.com/apache/spark/commit/a1058b8f91bc1093ef231bf41d6553d045788abc).
 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19872
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19872
  
**[Test build #84415 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84415/testReport)**
 for PR 19872 at commit 
[`a1058b8`](https://github.com/apache/spark/commit/a1058b8f91bc1093ef231bf41d6553d045788abc).


---




[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...

2017-12-03 Thread yaooqinn
Github user yaooqinn commented on the issue:

https://github.com/apache/spark/pull/19840
  
@ueshin case 8 should be client deploy mode; excuse the copy mistake, it has been fixed.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r154569177
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -113,6 +113,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with 
PredicateHelper {
   def apply(plan: SparkPlan): SparkPlan = plan transformUp {
 // FlatMapGroupsInPandas can be evaluated directly in python worker
 // Therefore we don't need to extract the UDFs
--- End diff --

`FlatMapGroupsInPandas` and `AggregateInPandasExec` can be...


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154569020
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 ---
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -x
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154568773
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +604,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

Actually k8s does not yet support `kill` and `status`, nor does it support `spark.cores.max`. Updated `validateKillArguments` and `validateStatusRequestArguments`.


---




[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...

2017-12-03 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/19840
  
@yaooqinn What's the difference between case 7 and 8? They look like the same configuration but with different results?


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154568554
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
--- End diff --

When `hasNext` is called, won't `sortedIterator` report no elements, since we haven't added any rows into it yet?


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154568281
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
+  while (inputIterator.hasNext()) {
+insertRow(inputIterator.next());
+  }
+  alreadyCalculated = true;
+}
 sortedIterator.loadNext();
--- End diff --

`sortedIterator` is already assigned at L143. When you insert rows the first time `next` is called, can `sortedIterator` still correctly return the sorted elements?


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19872
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19872
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84414/
Test FAILed.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19872
  
**[Test build #84414 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84414/testReport)**
 for PR 19872 at commit 
[`4cfaf0e`](https://github.com/apache/spark/commit/4cfaf0e9723bcfbb74dfd1b9d1f5e30682bf072f).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/19872
  
cc @HyukjinKwon @holdenk @ueshin 

Passing some basic tests. I will work on this more next week to clean up 
and add more testing.


---




[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19872
  
**[Test build #84414 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84414/testReport)**
 for PR 19872 at commit 
[`4cfaf0e`](https://github.com/apache/spark/commit/4cfaf0e9723bcfbb74dfd1b9d1f5e30682bf072f).


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154567885
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

I think if we eventually decide not to have default docker images, we should make these options `--param` ones. I'm not sure we want to make that call and do it in this PR, though. Can we defer this to a later time when we are clearer on how we publish and maintain the images?


---




[GitHub] spark issue #18995: [SPARK-21787][SPARK-22672][SQL] Support for pushing down...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/18995
  
BTW, @cloud-fan, do you mean literally `move`? I'm wondering if I'm thinking about it in a different way.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-03 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/19872

WIP: [SPARK-22274][PySpark] User-defined aggregation functions with pandas 
udf

## What changes were proposed in this pull request?

Add support for pandas_udf in groupby().agg()

## How was this patch tested?

GroupbyAggTests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark SPARK-22274-groupby-agg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19872.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19872


commit f71575782be3f9c41184eeafa275b5ba1cb5fb83
Author: Li Jin 
Date:   2017-12-01T17:26:26Z

Initial commit: wip

commit 2e03eec8de2ed6d38e807428c18f2500a8717b32
Author: Li Jin 
Date:   2017-12-01T22:54:02Z

Test working. Need clean up

commit 456c4a8adf646ee46b00f8ce51d4e9e8279abc3e
Author: Li Jin 
Date:   2017-12-04T06:34:16Z

Add tests

commit 35ff548ac942d210ccd99fb2a60b95e2d4a28e2a
Author: Li Jin 
Date:   2017-12-04T06:36:03Z

Clean up




---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567693
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
   bufferedMatches.clear()
   false
 } else {
+  // To make sure vars like bufferedRow are set
+  advancedBufferedIterRes
--- End diff --

Add a comment like 
https://github.com/apache/spark/pull/19862/files#r154567168.



---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19871
  
**[Test build #84413 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84413/testReport)**
 for PR 19871 at commit 
[`2e498f9`](https://github.com/apache/spark/commit/2e498f9c1a748bdb648705c16305f9f34e638ff8).


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567585
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -182,18 +183,14 @@ public UnsafeRow next() {
 }
   };
 } catch (IOException e) {
-  cleanupResources();
   throw e;
+} finally {
+  // Since we won't ever call next() on an empty iterator, we need to 
clean up resources
+  // here in order to prevent memory leaks.
+  cleanupResources();
--- End diff --

This makes the resources get cleaned up when we return the iterator, too.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567319
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
 new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This 
is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = 
advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

`This is made lazy to run the initialization only once when accessing it.`


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154567195
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala 
---
@@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with 
SharedSQLContext {
   .load().schema == StructType(Seq(StructField("stringType", 
StringType, nullable = false
   }
 
-  test("should fail to load ORC without Hive Support") {
-val e = intercept[AnalysisException] {
-  spark.read.format("orc").load()
+  test("should fail to load ORC only if spark.sql.orc.enabled=false and 
without Hive Support") {
--- End diff --

Oh, I confused it with [SQLQuerySuite.scala](https://github.com/apache/spark/pull/19871/files#diff-1ea02a6fab84e938582f7f87cc4d9ea1R2157) in hive. Sorry, I'll remove this.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567168
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Then add a comment like `Initialization at the first time reaching here`.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154567054
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt && \
+readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp 
"$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY 
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS
--- End diff --

I think this is debatable. They have some env variables with similar purposes, but they also have role-specific arguments/properties that probably warrant separate images.


---




[GitHub] spark issue #18995: [SPARK-21787][SPARK-22672][SQL] Support for pushing down...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/18995
  
@cloud-fan .

For the other part, I'm restructuring to remove redundancy in the same manner as `InMemoryCatalogedDDLSuite`, `HiveCatalogedDDLSuite`, and `DDLSuite`.

This one cannot be part of that restructuring, because the test function signatures are different due to `PredicateLeaf` and `SearchArgument`.

Could you review this first?


---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19871
  
**[Test build #84412 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84412/testReport)**
 for PR 19871 at commit 
[`8b7e88a`](https://github.com/apache/spark/commit/8b7e88a833adf1d3138ce426142b6bb6abe057df).


---




[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...

2017-12-03 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19869
  
LGTM except for super minor comments...


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154566562
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Oh. I see. `advancedBufferedIterRes` is a lazy val.
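
For context, the behaviour being relied on is standard `lazy val` semantics: the initializer runs at most once, on first access, and later accesses reuse the cached result. A minimal sketch with illustrative names (not the Spark code):

```
object LazyValOnce {
  var calls = 0
  // the right-hand side runs only on the first access
  lazy val advancedOnce: Boolean = { calls += 1; true }

  def main(args: Array[String]): Unit = {
    advancedOnce; advancedOnce; advancedOnce
    println(s"initializer ran $calls time(s)")   // prints: initializer ran 1 time(s)
  }
}
```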


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566501
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
+
+  private[spark] val FILES_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+  .doc("Location to download files to in the driver and executors. 
When using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pods.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-files")
--- End diff --

See comment above.


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154566445
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -882,45 +851,65 @@ case class HashAggregateExec(
  |${evaluateVariables(unsafeRowBufferEvals)}
  |// update unsafe row buffer
  |${updateUnsafeRowBuffer.mkString("\n").trim}
-   """.stripMargin
+   """.stripMargin
 }
 
+val updateRowInHashMap: String = {
+  if (isFastHashMapEnabled) {
+ctx.INPUT_ROW = fastRowBuffer
+val boundUpdateExpr = 
updateExpr.map(BindReferences.bindReference(_, inputAttr))
+val subExprs = 
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+val effectiveCodes = subExprs.codes.mkString("\n")
+val fastRowEvals = 
ctx.withSubExprEliminationExprs(subExprs.states) {
+  boundUpdateExpr.map(_.genCode(ctx))
+}
+val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+  val dt = updateExpr(i).dataType
+  ctx.updateColumn(
+fastRowBuffer, dt, i, ev, updateExpr(i).nullable, 
isVectorizedHashMapEnabled)
+}
+
+// If fast hash map is on, we first generate code to update row in 
fast hash map, if the
+// previous loop up hit fast hash map. Otherwise, update row in 
regular hash map.
+s"""
+   |if ($fastRowBuffer != null) {
+   |  // common sub-expressions
+   |  $effectiveCodes
+   |  // evaluate aggregate function
+   |  ${evaluateVariables(fastRowEvals)}
+   |  // update fast row
+   |  ${updateFastRow.mkString("\n").trim}
+   |} else {
+   |  $updateRowInRegularHashMap
+   |}
+   """.stripMargin
+  } else {
+updateRowInRegularHashMap
+  }
+}
+
+val declareFastRowBuffer: String = if (isFastHashMapEnabled) {
+  val rowType = if (isVectorizedHashMapEnabled) {
+classOf[MutableColumnarRow].getName
+  } else {
+"UnsafeRow"
+  }
+  s"$rowType $fastRowBuffer = null;"
+} else ""
 
 // We try to do hash map based in-memory aggregation first. If there 
is not enough memory (the
 // hash map will return null for new key), we spill the hash map to 
disk to free memory, then
 // continue to do in-memory aggregation and spilling until all the 
rows had been processed.
 // Finally, sort the spilled aggregate buffers by key, and merge them 
together for same key.
 s"""
  UnsafeRow $unsafeRowBuffer = null;
--- End diff --

nit: How about this for just consistency?
```
s"""
 $declareRowBuffer

 $findOrInsertHashMap

 $incCounter

 $updateRowInHashMap
 """
```
Then,
```
val declareRowBuffer: String = if (isFastHashMapEnabled) {
  val rowType = if (isVectorizedHashMapEnabled) {
classOf[MutableColumnarRow].getName
  } else {
"UnsafeRow"
  }
  s"""
 |UnsafeRow $unsafeRowBuffer = null;
 |$rowType $fastRowBuffer = null;
   """.stripMargin
} else {
  s"UnsafeRow $unsafeRowBuffer = null;"
}
```


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154565659
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
 
 // generate hash code for key
 val hashExpr = Murmur3Hash(groupingExpressions, 42)
-ctx.currentVars = input
 val hashEval = BindReferences.bindReference(hashExpr, 
child.output).genCode(ctx)
 
-val inputAttr = aggregateBufferAttributes ++ child.output
-ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
 val (checkFallbackForGeneratedHashMap, 
checkFallbackForBytesToBytesMap, resetCounter,
 incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

nit: need some indents in the head?


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154561232
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -621,34 +622,30 @@ case class HashAggregateExec(
 val iterTerm = ctx.freshName("mapIter")
 ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, 
iterTerm)
 
-def generateGenerateCode(): String = {
-  if (isFastHashMapEnabled) {
-if (isVectorizedHashMapEnabled) {
-  s"""
-   | 
${fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()}
-  """.stripMargin
-} else {
-  s"""
-   | 
${fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()}
-  """.stripMargin
-}
-  } else ""
+if (isFastHashMapEnabled) {
+  val generatedMap = if (isVectorizedHashMapEnabled) {
+
fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()
+  } else {
+
fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()
+  }
+  ctx.addInnerClass(generatedMap)
 }
-ctx.addInnerClass(generateGenerateCode())
 
 val doAgg = ctx.freshName("doAggregateWithKeys")
 val peakMemory = metricTerm(ctx, "peakMemory")
 val spillSize = metricTerm(ctx, "spillSize")
 val avgHashProbe = metricTerm(ctx, "avgHashProbe")
+val finishFashHashMap = if (isFastHashMapEnabled) {
--- End diff --

nit: can we merge this branch with the branch above (line 625) for the hash map stuff?


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154562413
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
 
 def outputFromRowBasedMap: String = {
   s"""
-   while ($iterTermForFastHashMap.next()) {
- $numOutput.add(1);
- UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
- UnsafeRow $bufferTerm = (UnsafeRow) 
$iterTermForFastHashMap.getValue();
- $outputFunc($keyTerm, $bufferTerm);
-
- if (shouldStop()) return;
-   }
-   $fastHashMapTerm.close();
- """
+ |while ($iterTermForFastHashMap.next()) {
--- End diff --

super nit: can we also drop unnecessary spaces in the head from this file? 
e.g., 
```
  s"""
 | private void $doAgg() throws java.io.IOException {
```
```
  s"""
 |private void $doAgg() throws java.io.IOException {
```

https://github.com/cloud-fan/spark/blob/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L233


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154566463
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -700,38 +701,43 @@ private[joins] class SortMergeJoinScanner(
   bufferedMatches.clear()
   false
 } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
+  // The new streamed row has the same join key as the previous row, 
so return the same
+  // matches.
--- End diff --

Unnecessary change.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154566374
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Once we advance both the streamed and buffered iterators and call `bufferMatchingRows` in the last turn, it advances the buffered iterator until `bufferedRow` no longer matches the current `streamedRowKey`.

In the next turn, the call to `advancedBufferedIterRes` here will advance the buffered iterator again, so the `bufferedRow` will be missed.
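
A generic illustration of that worry, detached from the SortMergeJoin code (names are made up): if an advance call has already positioned the cursor, advancing again before reading skips the element it was positioned on.

```
object SkipIllustration {
  def main(args: Array[String]): Unit = {
    val it = Iterator(1, 2, 3)
    var current: Option[Int] = None
    def advance(): Boolean = {
      current = if (it.hasNext) Some(it.next()) else None
      current.isDefined
    }

    advance()          // a previous turn already positioned the cursor on 1
    advance()          // advancing again before reading: 1 is skipped
    println(current)   // Some(2)
  }
}
```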


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566357
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and 
Kubernetes 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566070
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile 
.
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt && \
+readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then 
SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" 
-Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY 
-Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
$SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores 
$SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname 
$SPARK_EXECUTOR_POD_IP
--- End diff --

`SPARK_EXECUTOR_PORT` is no longer set as `spark.executor.port` is no 
longer used by Spark. Removed `SPARK_EXECUTOR_PORT`. `SPARK_MOUNTED_CLASSPATH` 
is set in `DependencyResolutionStep`.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566094
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind " +
+  "address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be " +
+  "managed via a Kubernetes service.")
+
+val preferredServiceName = 
s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+
+val driverPort = submissionSparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT)
+val driverBlockManagerPort = submissionSparkConf.getInt(
+org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, 
DEFAULT_BLOCKMANAGER_PORT)
+val driverService = new ServiceBuilder()
+  .withNewMetadata()
+.withName(resolvedServiceName)
+.endMetadata()
+  .withNewSpec()
+.withClusterIP("None")
+.withSelector(driverLabels.asJava)
+.addNewPort()
+  .withName(DRIVER_PORT_NAME)
+  .withPort(driverPort)
+  .withNewTargetPort(driverPort)
+  .endPort()
+.addNewPort()
+  .withName(BLOCK_MANAGER_PORT_NAME)
+  .withPort(driverBlockManagerPort)
+  .withNewTargetPort(driverBlockManagerPort)
+  .endPort()
+.endSpec()
+  .build()
+
+val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+  .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, 
driverHostname)
--- End diff --

What did you mean here?


---


[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154565988
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -568,8 +571,12 @@ object DataSource extends Logging {
 "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(conf: SQLConf, provider: String): Class[_] = {
+var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+if (Seq("orc").contains(provider1.toLowerCase) && 
conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
--- End diff --

Oh. Yep.
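
For readers of the archive, a rough standalone sketch of the substitution the diff is doing (the flag name comes from the quoted snippet; the target class name and the rest are assumptions, not the final code): when the provider resolves to ORC and the flag is on, redirect it to the new sql/core implementation before the usual class lookup.

```
object LookupDataSourceSketch {
  // stands in for backwardCompatibilityMap in DataSource
  val backwardCompatibilityMap: Map[String, String] = Map.empty
  // stands in for conf.getConf(SQLConf.ORC_USE_NEW_VERSION)
  val orcUseNewVersion: Boolean = true

  def resolveProviderName(provider: String): String = {
    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
    if (Seq("orc").contains(provider1.toLowerCase) && orcUseNewVersion) {
      // hypothetical fully-qualified name of the new ORC format in sql/core
      provider1 = "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
    }
    provider1
  }

  def main(args: Array[String]): Unit =
    println(resolveProviderName("ORC"))   // redirected when the flag is on
}
```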


---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19871
  
Thank you for the review, @cloud-fan and @jiangxb1987.
The PR is updated.


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154565883
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala 
---
@@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with 
SharedSQLContext {
   .load().schema == StructType(Seq(StructField("stringType", 
StringType, nullable = false
   }
 
-  test("should fail to load ORC without Hive Support") {
-val e = intercept[AnalysisException] {
-  spark.read.format("orc").load()
+  test("should fail to load ORC only if spark.sql.orc.enabled=false and 
without Hive Support") {
--- End diff --

that test also checks the exception:
https://github.com/apache/spark/pull/19871/files#diff-5a2e7f03d14856c8769fd3ddea8742bdR2790


---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19871
  
**[Test build #84411 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84411/testReport)**
 for PR 19871 at commit 
[`e7beb02`](https://github.com/apache/spark/commit/e7beb02ef8b1f9761c77952fc69d087c7cc92d3f).


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154565755
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -568,8 +571,12 @@ object DataSource extends Logging {
 "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(conf: SQLConf, provider: String): Class[_] = {
+var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+if (Seq("orc").contains(provider1.toLowerCase) && 
conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
--- End diff --

"orc".equalsIgnoreCast(...)


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154565597
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   case (STANDALONE, CLUSTER) if args.isR =>
 printErrorAndExit("Cluster deploy mode is currently not supported 
for R " +
   "applications on standalone clusters.")
+  case (KUBERNETES, CLIENT) =>
+printErrorAndExit("Client mode is currently not supported for 
Kubernetes.")
+  case (KUBERNETES, _) if args.isPython =>
+printErrorAndExit("Python applications are currently not supported 
for Kubernetes.")
+  case (KUBERNETES, _) if args.isR =>
+printErrorAndExit("R applications are currently not supported for 
Kubernetes.")
--- End diff --

Done.


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154565348
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -568,8 +570,13 @@ object DataSource extends Logging {
 "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(sparkSession: SparkSession, provider: String): 
Class[_] = {
+var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+if (Seq("orc", 
"org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) 
&&
+sparkSession.conf.get(SQLConf.ORC_ENABLED)) {
--- End diff --

It's done.


---




[GitHub] spark pull request #19856: [SPARK-22664] The logs about "Connected to Zookee...

2017-12-03 Thread liu-zhaokun
Github user liu-zhaokun closed the pull request at:

https://github.com/apache/spark/pull/19856


---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19871
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84409/
Test PASSed.


---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19871
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19871
  
**[Test build #84409 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84409/testReport)**
 for PR 19871 at commit 
[`37e240c`](https://github.com/apache/spark/commit/37e240cf12ea7463c2e0ea56501b812e12745869).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154563897
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
 new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This 
is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = 
advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

This function should be called (to try to set `BufferedRow`) before `BufferedRow` is checked, and it should be called only once. That is the original requirement of the logic. Given that, I think this is the best way to add this optimization.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154564327
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Actually, this advance function is only called once, so no `bufferedRow` will be missed. Or maybe I didn't understand your meaning?


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154564488
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
+  if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
--- End diff --

I agree with you.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154564066
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
 YARN
   case m if m.startsWith("spark") => STANDALONE
   case m if m.startsWith("mesos") => MESOS
+  case m if m.startsWith("k8s") => KUBERNETES
   case m if m.startsWith("local") => LOCAL
   case _ =>
 printErrorAndExit("Master must either be yarn or start with spark, 
mesos, local")
--- End diff --

Done.


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154563993
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,11 @@ object SQLConf {
 .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
 .createWithDefault("snappy")
 
+  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
+.doc("When true, use OrcFileFormat in sql/core module instead of the 
one in sql/hive module.")
--- End diff --

Yep. I'll elaborate more.


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154563902
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala 
---
@@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with 
SharedSQLContext {
   .load().schema == StructType(Seq(StructField("stringType", 
StringType, nullable = false
   }
 
-  test("should fail to load ORC without Hive Support") {
-val e = intercept[AnalysisException] {
-  spark.read.format("orc").load()
+  test("should fail to load ORC only if spark.sql.orc.enabled=false and 
without Hive Support") {
--- End diff --

Ur, those tests cover different cases.
- In this test: `true` -> Use new OrcFileFormat, `false` -> Throw Exception 
(the existing behavior)
- In that test: `true` -> Use new OrcFileFormat, `false` -> Use old 
OrcFileFormat (the existing behavior).


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154563532
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -568,8 +570,13 @@ object DataSource extends Logging {
 "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(sparkSession: SparkSession, provider: String): 
Class[_] = {
+var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+if (Seq("orc", 
"org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) 
&&
--- End diff --

I see.


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154563501
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -568,8 +570,13 @@ object DataSource extends Logging {
 "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(sparkSession: SparkSession, provider: String): 
Class[_] = {
--- End diff --

Yep.


---




[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...

2017-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19871#discussion_r154563142
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,11 @@ object SQLConf {
 .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
 .createWithDefault("snappy")
 
+  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
--- End diff --

Sure!


---




[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19855
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...

2017-12-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19855
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84407/
Test PASSed.


---




[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...

2017-12-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19855
  
**[Test build #84407 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84407/testReport)**
 for PR 19855 at commit 
[`f9e9d10`](https://github.com/apache/spark/commit/f9e9d10d4b581379c2af966a35362b77f35a7b6f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154556774
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +159,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
+  while (inputIterator.hasNext()) {
+insertRow(inputIterator.next());
+  }
+  alreadyCalculated = true;
--- End diff --

We already clean up resources when we have an empty iterator, at L144. We should still follow that.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154558106
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
+  if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
--- End diff --

This block can be moved out of this else block and kept at its original position. We don't need to advance the buffered rows if this condition is hit.
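
As a self-contained illustration of the suggested ordering (illustrative names only, not the actual `SortMergeJoinScanner`), the "same join key as the previous streamed row" fast path can be tested before anything forces an advance of the buffered side:

```scala
object FastPathFirstSketch {
  // Returns true when the streamed key matches; the buffered side is only advanced
  // when the fast path (same key as the previous streamed row) does not apply.
  def findMatches(
      streamedKey: Int,
      previousMatchKey: Option[Int],
      advanceBuffered: () => Boolean): Boolean = {
    if (previousMatchKey.contains(streamedKey)) {
      // same join key as before: reuse the previously buffered matches, no advance needed
      true
    } else {
      // only now pay the cost of advancing the buffered iterator
      advanceBuffered()
    }
  }

  def main(args: Array[String]): Unit = {
    var advanced = 0
    val advance = () => { advanced += 1; true }
    // streamed key 5 equals the previous match key 5, so the fast path is taken
    findMatches(5, Some(5), advance)
    println(advanced) // 0: the fast path did not touch the buffered side
  }
}
```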


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154560155
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-    if (streamedRowKey.anyNull) {
-      advancedStreamed()
-    } else {
-      assert(!bufferedRowKey.anyNull)
-      comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-      if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-      else if (comp < 0) advancedStreamed()
-    }
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-    // We have either hit the end of one of the iterators, so there can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

If we advance the buffered iterator here, won't we miss the `bufferedRow` that was advanced before?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17770: [SPARK-20392][SQL] Set barrier to prevent re-entering a ...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17770
  
yea, a new PR sounds good, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154560524
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
     new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

Isn't it the same to simply call `advancedBufferedToRowWithNullFreeJoinKey` at the places where it is needed?
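
For what it's worth, a small self-contained sketch (illustrative names only) of the difference: a `lazy val` forces the advance at most once no matter how many call sites reference it, while calling the method directly at each needed place would advance the iterator again every time:

```scala
object LazyAdvanceSketch {
  private val buffered = Iterator(1, 2, 3)

  // calling this directly at every needed place would consume another element each time
  private def advanceOnce(): Boolean = buffered.hasNext && { buffered.next(); true }

  // a lazy val runs the advance exactly once, on first reference
  private lazy val advancedBufferedIterRes: Boolean = advanceOnce()

  def main(args: Array[String]): Unit = {
    advancedBufferedIterRes // first reference: consumes one element
    advancedBufferedIterRes // later references: no further advance
    println(buffered.toList) // List(2, 3): only one element was consumed
  }
}
```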


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154560474
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
   bufferedMatches.clear()
   false
 } else {
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Ditto. We may miss the `bufferedRow` that was advanced before.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


