[GitHub] spark issue #17359: [SPARK-20028][SQL] Add aggregate expression nGrams

2017-10-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17359
  
@gatorsmile I will try to take a look again. 


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19404
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19404
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82433/
Test FAILed.


---




[GitHub] spark pull request #18861: [SPARK-19426][SQL] Custom coalescer for Dataset

2017-10-03 Thread maropu
Github user maropu closed the pull request at:

https://github.com/apache/spark/pull/18861


---




[GitHub] spark issue #18861: [SPARK-19426][SQL] Custom coalescer for Dataset

2017-10-03 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/18861
  
ok, thanks!


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19404
  
**[Test build #82433 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82433/testReport)** for PR 19404 at commit [`9cd3ee6`](https://github.com/apache/spark/commit/9cd3ee69b9ddd678b77d8b78feec51ea8c55e377).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19083
  
**[Test build #82435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82435/testReport)** for PR 19083 at commit [`dfde49b`](https://github.com/apache/spark/commit/dfde49bcc487ecbc0135cd301e8d9c3ad17921be).


---




[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142556519
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
--- End diff --

Add a config constant, now that you're using this in more places?
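
For illustration, the kind of constant being suggested might look roughly like this, assuming Spark's internal `ConfigBuilder` API (the object and constant names below are made up, and this would only compile inside Spark's own code base, since `ConfigBuilder` is package-private):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical location for the constant; defining the key once lets
// SparkSubmit and the shells refer to it instead of repeating the raw string.
object UIConfig {
  val SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
    .doc("Whether to show the progress bar in the console.")
    .booleanConf
    .createWithDefault(false)
}
```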


---




[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...

2017-10-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19061#discussion_r142556541
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging {
   }
 }
 
+// In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
+if (isShell(args.primaryResource)) {
+  if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+    sysProps("spark.ui.showConsoleProgress") = "true"
+  }
+} else {
+  sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

This is not needed, right?


---




[GitHub] spark issue #18861: [SPARK-19426][SQL] Custom coalescer for Dataset

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18861
  
Yeah, maybe close it first. Thanks!


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142554065
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
--- End diff --

Could you split the whole test case into multiple independent, smaller unit test cases?
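
For example, the combined case could be split along format lines, roughly as follows (test names are only illustrative):

```scala
test("SPARK-21786: spark.sql.parquet.compression.codec takes effect on hive table writing") {
  // parquet-only table setup and codec assertions
}

test("SPARK-21786: spark.sql.orc.compression.codec takes effect on hive table writing") {
  // orc-only table setup and codec assertions
}
```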


---




[GitHub] spark pull request #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfigurati...

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19413


---




[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...

2017-10-03 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19413
  
(FYI, didn't merge to 2.0.)


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553845
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
+
+val hadoopConf = spark.sessionState.newHadoopConf()
+
+val partitionStr = "p=1"
+
+case class TableCompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[TableCompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else ""}
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${if (isPartitioned) s"partition ($partitionStr)" else ""}
+  |SELECT * from table_source
+""".stripMargin)
+  }
+}
+
+def getTableCompressionCodec(path: String, format: String): String = {
+  val codecs = format match {
+case "parquet" => for {
+  footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+  block <- footer.getParquetMetadata.getBlocks.asScala
+  column <- block.getColumns.asScala
+} yield column.getCodec.name()
+case "orc" => new File(path).listFiles().filter{ file =>
+  file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+}.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+}.toSeq
+  }
+
+  assert(codecs.distinct.length == 1)
+  codecs.head
+}
+
+def checkCompressionCodecForTable(format: String, isPartitioned: 
Boolean,
+  compressionConf: Option[TableCompressionConf])(assertion: String => 
Unit): Unit = {
+  val table = TableDefine(s"tbl_$format${isPartitioned}",
+isPartitioned, format, compressionConf)
+  withTempDir { tmpDir =>
+withTable(table.tableName) {
+  table.createTable(tmpDir)
+  table.insertOverwriteTable()
+  val partition = if (table.isPartitioned) partitionStr else ""
+  val path = 
s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition"
+  assertion(getTableCompressionCodec(path, table.format))
+}
+  }
+}
+
+def getConvertMetastoreConfName(format: String): String = format match 
{
+  case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+  case "orc" => "spark.sql.hive.convertMetastoreOrc"
+}
+
+def getSparkCompressionConfName(format: String): String = format match 
{
+  case "parquet" => "spark.sql.parquet.compression.codec"
+  case "orc" => "spark.sql.orc.compression.codec"
+}
+
+def checkTableCompressionCodecForCodecs(format: String, isPartitioned: 
Boolean,
+  convertMetastore: Boolean, compressionCodecs: List[String],
+  tableCompressionConf: List[TableCompressionConf])
--- End diff --

Could you update the indentation for all of them in this PR? See the link: 
https://github.com/databricks/scala-style-guide#indent
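
Per that guide, a parameter list that does not fit on one line gets a 4-space continuation indent; applied to one of the definitions quoted above, it would look roughly like this (signature abbreviated for illustration):

```scala
def checkTableCompressionCodecForCodecs(
    format: String,
    isPartitioned: Boolean,
    convertMetastore: Boolean,
    compressionCodecs: List[String],
    tableCompressionConf: List[TableCompressionConf]): Unit = {
  // body unchanged
}
```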


---




[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...

2017-10-03 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19413
  
LGTM. Merging to master and back to 2.0 unless I hit a conflict.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553648
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
+
+val hadoopConf = spark.sessionState.newHadoopConf()
+
+val partitionStr = "p=1"
+
+case class TableCompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
--- End diff --

Use a function?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553517
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
+
+val hadoopConf = spark.sessionState.newHadoopConf()
+
+val partitionStr = "p=1"
+
+case class TableCompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[TableCompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
--- End diff --

Please do not embed it. Just create a parameter above this?
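
A minimal sketch of that suggestion, reusing the names from the quoted test code:

```scala
// Compute the optional clauses before building the SQL string instead of
// embedding if-expressions inside the interpolation.
val partitionedByClause = if (isPartitioned) "PARTITIONED BY (p int)" else ""
val tblPropertiesClause =
  compressionConf.map(cf => s"TBLPROPERTIES('${cf.name}'='${cf.codeC}')").getOrElse("")

sql(
  s"""
     |CREATE TABLE $tableName(a int)
     |$partitionedByClause
     |STORED AS $format
     |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
     |$tblPropertiesClause
   """.stripMargin)
```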


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553387
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, 
compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(fileSinkConf, 
compressionConf,
+  sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
--- End diff --

Why are ORC and Parquet different?


---




[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19422
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19422
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82432/
Test PASSed.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553277
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, 
compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(fileSinkConf, 
compressionConf,
+  sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
+  case _@x => x
--- End diff --

`case x => x`?


---




[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19422
  
**[Test build #82432 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82432/testReport)** for PR 19422 at commit [`b66f5bb`](https://github.com/apache/spark/commit/b66f5bb43622ee85bc0b534e0cfaa6d52179d0e7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553192
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, 
compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
--- End diff --

```Scala
 val compressionCodec = getCompressionByPriority(
fileSinkConf, 
compressionConf = "parquet.compression",
default = sparkSession.sessionState.conf.parquetCompressionCodec) match 
{
```


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142552754
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
--- End diff --

Is it case sensitive? Should we convert it to lower case (or upper case) before the string comparison?
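
If the casing can in fact vary (an assumption, not something established in this thread), normalizing once before the comparison is a small change, e.g.:

```scala
import java.util.Locale

// Lower-case the class name once, then match case-insensitively.
def isParquetOutputFormat(outputFormatClassName: String): Boolean =
  outputFormatClassName.toLowerCase(Locale.ROOT).endsWith("parquetoutputformat")
```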


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142552658
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   options = Map.empty)
   }
 
+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+compressionConf: String, default: String): String = {
--- End diff --

Could you add a description to explain the priority sequence?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142552413
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   options = Map.empty)
   }
 
+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+compressionConf: String, default: String): String = {
--- End diff --

```Scala
private def getCompressionByPriority(
fileSinkConf: FileSinkDesc,
compressionConf: String,
default: String): String = {
```


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142547222
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
+});
+
+$.extend( $.fn.dataTable.ext.type.order, {
+"file-size-pre": ConvertDurationString,
+
+"file-size-asc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+},
+
+"file-size-desc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+}
+} );
+
+function createTemplateURI(appId) {
+var words = document.baseURI.split('/');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + 
'/static/stagespage-template.html';
+return baseURI;
+}
+ind = words.indexOf("history");
+if(ind > 0) {
+var baseURI = words.slice(0, ind).join('/') + 
'/static/stagespage-template.html';
+return baseURI;
+}
+return location.origin + "/static/stagespage-template.html";
+}
+
+// This function will only parse the URL under certain formate
+// e.g. 
https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0
+function StageEndPoint(appId) {
+var words = document.baseURI.split('/');
+var words2 = document.baseURI.split('?');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var appId = words[ind + 1];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind + 2).join('/');
+return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + 
stageId;
+}
+ind = words.indexOf("history");
+if (ind > 0) {
+var appId = words[ind + 1];
+var attemptId = words[ind + 2];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind).join('/');
+if (isNaN(attemptId) || attemptId == "0") {
+return newBaseURI + "/api/v1/applications/" + appId + 
"/stages/" + stageId;
+} else {
+return newBaseURI + "/api/v1/applications/" + appId + "/" + 
attemptId + "/stages/" + stageId;
+}
+}
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+return location.origin + "/api/v1/applications/" + appId + "/stages/" 
+ stageId;
+}
+
+function sortNumber(a,b) {
+return a - b;
+}
+
+function quantile(array, percentile) {
+index = percentile/100. * (array.length-1);
+if (Math.floor(index) == index) {
+   result = array[index];
+} else {
+var i = Math.floor(index);
+fraction = index - i;
+result = array[i];
+}
+return result;
+}
+
+$(document).ready(function () {
+$.extend($.fn.dataTable.defaults, {
--- End diff --

this might also be worth moving to `utils.js` since we set the same 
defaults on all our pages


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142546728
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
+});
+
+$.extend( $.fn.dataTable.ext.type.order, {
+"file-size-pre": ConvertDurationString,
+
+"file-size-asc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+},
+
+"file-size-desc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+}
+} );
+
+function createTemplateURI(appId) {
+var words = document.baseURI.split('/');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + 
'/static/stagespage-template.html';
+return baseURI;
+}
+ind = words.indexOf("history");
+if(ind > 0) {
+var baseURI = words.slice(0, ind).join('/') + 
'/static/stagespage-template.html';
+return baseURI;
+}
+return location.origin + "/static/stagespage-template.html";
+}
+
+// This function will only parse the URL under certain formate
+// e.g. 
https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0
+function StageEndPoint(appId) {
+var words = document.baseURI.split('/');
+var words2 = document.baseURI.split('?');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var appId = words[ind + 1];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind + 2).join('/');
+return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + 
stageId;
+}
+ind = words.indexOf("history");
+if (ind > 0) {
+var appId = words[ind + 1];
+var attemptId = words[ind + 2];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind).join('/');
+if (isNaN(attemptId) || attemptId == "0") {
+return newBaseURI + "/api/v1/applications/" + appId + 
"/stages/" + stageId;
+} else {
+return newBaseURI + "/api/v1/applications/" + appId + "/" + 
attemptId + "/stages/" + stageId;
+}
+}
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
--- End diff --

IIUC this doesn't account for anchors in the URL (`#tasks-section`); I'd look into some URL-parsing JS libraries, I've seen some good ones before.


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142539880
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala ---
@@ -138,21 +155,61 @@ private[v1] object AllStagesResource {
 }
   }
 
-  def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): 
TaskData = {
+  private def getGettingResultTime(info: TaskInfo, currentTime: Long): 
Long = {
--- End diff --

`currentTime` is never used


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142546939
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
+});
+
+$.extend( $.fn.dataTable.ext.type.order, {
+"file-size-pre": ConvertDurationString,
+
+"file-size-asc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+},
+
+"file-size-desc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+}
+} );
+
+function createTemplateURI(appId) {
+var words = document.baseURI.split('/');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + 
'/static/stagespage-template.html';
+return baseURI;
+}
+ind = words.indexOf("history");
+if(ind > 0) {
+var baseURI = words.slice(0, ind).join('/') + 
'/static/stagespage-template.html';
+return baseURI;
+}
+return location.origin + "/static/stagespage-template.html";
+}
+
+// This function will only parse the URL under certain formate
+// e.g. 
https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0
+function StageEndPoint(appId) {
+var words = document.baseURI.split('/');
+var words2 = document.baseURI.split('?');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var appId = words[ind + 1];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind + 2).join('/');
+return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + 
stageId;
+}
+ind = words.indexOf("history");
+if (ind > 0) {
+var appId = words[ind + 1];
+var attemptId = words[ind + 2];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind).join('/');
+if (isNaN(attemptId) || attemptId == "0") {
+return newBaseURI + "/api/v1/applications/" + appId + 
"/stages/" + stageId;
+} else {
+return newBaseURI + "/api/v1/applications/" + appId + "/" + 
attemptId + "/stages/" + stageId;
+}
+}
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
--- End diff --

Also I'm not quite sure what the `3`s are doing here


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142545431
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
+});
+
+$.extend( $.fn.dataTable.ext.type.order, {
+"file-size-pre": ConvertDurationString,
+
+"file-size-asc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+},
+
+"file-size-desc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+}
+} );
+
+function createTemplateURI(appId) {
+var words = document.baseURI.split('/');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + 
'/static/stagespage-template.html';
+return baseURI;
+}
+ind = words.indexOf("history");
+if(ind > 0) {
+var baseURI = words.slice(0, ind).join('/') + 
'/static/stagespage-template.html';
+return baseURI;
+}
+return location.origin + "/static/stagespage-template.html";
+}
+
+// This function will only parse the URL under certain formate
+// e.g. 
https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0
+function StageEndPoint(appId) {
--- End diff --

style: `stageEndPoint`


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142547431
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
+});
+
+$.extend( $.fn.dataTable.ext.type.order, {
+"file-size-pre": ConvertDurationString,
+
+"file-size-asc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+},
+
+"file-size-desc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+}
+} );
+
+function createTemplateURI(appId) {
+var words = document.baseURI.split('/');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + 
'/static/stagespage-template.html';
+return baseURI;
+}
+ind = words.indexOf("history");
+if(ind > 0) {
+var baseURI = words.slice(0, ind).join('/') + 
'/static/stagespage-template.html';
+return baseURI;
+}
+return location.origin + "/static/stagespage-template.html";
+}
+
+// This function will only parse the URL under certain formate
+// e.g. 
https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0
+function StageEndPoint(appId) {
+var words = document.baseURI.split('/');
+var words2 = document.baseURI.split('?');
+var ind = words.indexOf("proxy");
+if (ind > 0) {
+var appId = words[ind + 1];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind + 2).join('/');
+return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + 
stageId;
+}
+ind = words.indexOf("history");
+if (ind > 0) {
+var appId = words[ind + 1];
+var attemptId = words[ind + 2];
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+var newBaseURI = words.slice(0, ind).join('/');
+if (isNaN(attemptId) || attemptId == "0") {
+return newBaseURI + "/api/v1/applications/" + appId + 
"/stages/" + stageId;
+} else {
+return newBaseURI + "/api/v1/applications/" + appId + "/" + 
attemptId + "/stages/" + stageId;
+}
+}
+var stageIdLen = words2[1].indexOf('&');
+var stageId = words2[1].substr(3, stageIdLen - 3);
+return location.origin + "/api/v1/applications/" + appId + "/stages/" 
+ stageId;
+}
+
+function sortNumber(a,b) {
+return a - b;
+}
+
+function quantile(array, percentile) {
+index = percentile/100. * (array.length-1);
+if (Math.floor(index) == index) {
+   result = array[index];
+} else {
+var i = Math.floor(index);
+fraction = index - i;
+result = array[i];
+}
+return result;
+}
+
+$(document).ready(function () {
+$.extend($.fn.dataTable.defaults, {
+stateSave: true,
+lengthMenu: [[20, 40, 60, 100, -1], [20, 40, 60, 100, "All"]],
+pageLength: 20
+});
+
+$("#showAdditionalMetrics").append(
--- End diff --

If this is just adding static html we can probably leave it in 
`StagePage.scala`


---



[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142543852
  
--- Diff: core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala ---
@@ -346,7 +346,7 @@ class UISeleniumSuite extends SparkFunSuite with 
WebBrowser with Matchers with B
 
   for {
 stageId <- 0 to 1
-attemptId <- 0 to 1
+attemptId <- 1 to 0
--- End diff --

Why is this changed?


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142540750
  
--- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala 
---
@@ -67,14 +68,13 @@ class ExecutorsListener(storageStatusListener: 
StorageStatusListener, conf: Spar
 extends SparkListener {
   val executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
   var executorEvents = new ListBuffer[SparkListenerEvent]()
+  val executorIdToAddress = mutable.HashMap[String, String]()
 
   private val maxTimelineExecutors = 
conf.getInt("spark.ui.timeline.executors.maximum", 1000)
   private val retainedDeadExecutors = 
conf.getInt("spark.ui.retainedDeadExecutors", 100)
 
   def activeStorageStatusList: Seq[StorageStatus] = 
storageStatusListener.storageStatusList
-
   def deadStorageStatusList: Seq[StorageStatus] = 
storageStatusListener.deadStorageStatusList
-
--- End diff --

Why remove these lines? They don't seem to be an issue


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142544701
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
--- End diff --

This is more commonly referred to as the Stage Page than the Tasks Page


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142543153
  
--- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala 
---
@@ -170,6 +170,17 @@ class ExecutorsListener(storageStatusListener: 
StorageStatusListener, conf: Spar
 execTaskSummary.isBlacklisted = isBlacklisted
   }
 
+  def getExecutorHost(eid: String): String = {
--- End diff --

Is this how `getExecutorHost` works in other classes? How did you decide to 
implement it this way?


---




[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...

2017-10-03 Thread ajbozarth
Github user ajbozarth commented on a diff in the pull request:

https://github.com/apache/spark/pull/19270#discussion_r142545284
  
--- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js 
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ajaxStop($.unblockUI);
+$(document).ajaxStart(function () {
+$.blockUI({message: 'Loading Tasks Page...'});
+});
+
+$.extend( $.fn.dataTable.ext.type.order, {
+"file-size-pre": ConvertDurationString,
+
+"file-size-asc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? -1 : ((a > b) ? 1 : 0));
+},
+
+"file-size-desc": function ( a, b ) {
+a = ConvertDurationString( a );
+b = ConvertDurationString( b );
+return ((a < b) ? 1 : ((a > b) ? -1 : 0));
+}
+} );
+
+function createTemplateURI(appId) {
--- End diff --

I think this function can be abstracted out (with the template file name added as a param) and moved to `utils.js`.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82431/
Test FAILed.


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82431 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82431/testReport)** for PR 18732 at commit [`1ea2b71`](https://github.com/apache/spark/commit/1ea2b71801784842d1797863456a07c7fcfc2531).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19404
  
**[Test build #82434 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82434/testReport)** for PR 19404 at commit [`f945f39`](https://github.com/apache/spark/commit/f945f39438c6a1cabff3f18489dd2082c4cba24c).


---




[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...

2017-10-03 Thread skonto
Github user skonto commented on the issue:

https://github.com/apache/spark/pull/18098
  
@ArtRand +1 for the pluggable interface... in general we are planning to optimize resource allocation and locality in another PR (Fenzo from Netflix is something we would like to test; it is already used by others such as Flink). 


---




[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19413
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82429/
Test PASSed.


---




[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19413
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19413
  
**[Test build #82429 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82429/testReport)** for PR 19413 at commit [`079a4e2`](https://github.com/apache/spark/commit/079a4e26ec900745411fdd60e4cd242b9a43320b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread rekhajoshm
Github user rekhajoshm commented on the issue:

https://github.com/apache/spark/pull/19422
  
@srowen got it, you mean just the pull request; I didn't think of it then. Thanks.


---




[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19404
  
**[Test build #82433 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82433/testReport)** for PR 19404 at commit [`9cd3ee6`](https://github.com/apache/spark/commit/9cd3ee69b9ddd678b77d8b78feec51ea8c55e377).


---




[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r142536858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+case _: InnerLike => left.output ++ right.output
+case LeftOuter => left.output ++ 
right.output.map(_.withNullability(true))
+case RightOuter => left.output.map(_.withNullability(true)) ++ 
right.output
+case _ =>
+  throwBadJoinTypeException()
+  Seq()
+  }
 
   override def outputPartitioning: Partitioning = joinType match {
 case _: InnerLike =>
   PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
+case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning))
--- End diff --

Resolved offline. LeftOuterJoin should produce left.outputPartitioning
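
A sketch of what that resolution implies for the quoted override (mirroring the diff above; not necessarily the exact code that was merged):

```scala
override def outputPartitioning: Partitioning = joinType match {
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
  case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
  case _ => throw new IllegalArgumentException(s"Unsupported join type $joinType")
}
```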


---




[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82428/
Test PASSed.


---




[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r142533513
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
---
@@ -425,6 +426,10 @@ class StreamingJoinSuite extends StreamTest with 
StateStoreMetricsTest with Befo
 
 // Test static comparisons
 assert(watermarkFrom("cast(leftTime AS LONG) > 10") === Some(1))
+
+// Test non-positive results
+assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) 
- 10") === Some(0))
+assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) 
- 100") === Some(-9))
--- End diff --

It does not build a physical plan; it simply uses the Spark SQL QueryExecution to pass the expression through the analyzer and optimizer, to get the conditions in a form similar to what a real query would produce. Instead we can simply use SimpleAnalyzer and SimpleTestOptimizer:
```
val analyzedPlan = SimpleAnalyzer.execute(plan)
val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan)
```



---




[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82428 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82428/testReport)** for PR 19327 at commit [`b8ba1e2`](https://github.com/apache/spark/commit/b8ba1e2c34748c47d843c147c595bf92df4c41c7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142533141
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, 
vectorized=False):
 self._name = name or (
 func.__name__ if hasattr(func, '__name__')
 else func.__class__.__name__)
-self._vectorized = vectorized
+self.vectorized = vectorized
--- End diff --

I kind of dislike the inconsistency between `UserDefinedFunction` and its wrapped function. I think they are just the same thing, except that the wrapped function has a doc string. For peace of mind, I think we should make them either both private or both public. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19422
  
OK, don't bother with a JIRA for items like this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...

2017-10-03 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/18098
  
Hello @gpang, after thinking about this a lot I'm glad that you ended up 
merging this. However, I think it's worth considering the implications of 
changing the offer evaluation logic in the driver. My main concern with the 
method you've proposed is that in a cluster with many frameworks and concurrent 
Spark jobs (potentially with heterogeneous locality wait times) this solution 
may not be effective. Further, I believe that your algorithm doesn't account 
for adding an executor on an agent that already contains an executor 
(https://github.com/apache/spark/pull/18098/files#diff-387c5d0c916278495fc28420571adf9eR534),
 which may be what you want in some situations (because there is no other way 
to increase the cores for an executor already placed on an agent). I realize 
this is an edge case, and that the old behavior (essentially random 
placement) wouldn't afford better performance. 

With this in mind I'd like to propose that we make offer evaluation in 
Spark on Mesos a pluggable interface. Right now, for example, there is no way to 
easily spread executors across agents, pack executors onto agents, or implement 
other context-specific placement behaviors that a user may want. I think one of 
Spark's strengths is its tunability, and we should expose this to users who wish 
to use it. 
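
To make the idea concrete, here is a minimal sketch of what such a pluggable evaluator could look like. All of the names below (`OfferEvaluator`, `SpreadEvaluator`, `PackEvaluator`) are hypothetical and not part of Spark; this only illustrates the shape of the interface, not a proposed implementation.

```
// Hypothetical hook for deciding where the Mesos scheduler places the next executor.
trait OfferEvaluator {
  /** Given each agent's free cores, return the agent to place an executor on, or None. */
  def choosePlacement(freeCoresByAgent: Map[String, Int], coresPerExecutor: Int): Option[String]
}

// Spread executors across as many agents as possible.
class SpreadEvaluator extends OfferEvaluator {
  private val placed = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
  override def choosePlacement(
      freeCoresByAgent: Map[String, Int], coresPerExecutor: Int): Option[String] = {
    val candidates = freeCoresByAgent.filter(_._2 >= coresPerExecutor).keys.toSeq
    val pick = candidates.sortBy(placed).headOption  // prefer the least-used agent
    pick.foreach(agent => placed(agent) += 1)
    pick
  }
}

// Pack executors onto as few agents as possible.
class PackEvaluator extends OfferEvaluator {
  override def choosePlacement(
      freeCoresByAgent: Map[String, Int], coresPerExecutor: Int): Option[String] = {
    freeCoresByAgent.filter(_._2 >= coresPerExecutor)
      .toSeq.sortBy { case (_, freeCores) => -freeCores }  // prefer the fullest-fitting agent
      .headOption.map { case (agent, _) => agent }
  }
}
```

A strategy like this could then be selected through a configuration property (again a hypothetical name, e.g. `spark.mesos.offerEvaluator`), leaving the current behavior as the default.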



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r142530623
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
---
@@ -470,3 +475,222 @@ class StreamingJoinSuite extends StreamTest with 
StateStoreMetricsTest with Befo
 }
   }
 }
+
+class StreamingOuterJoinSuite extends StreamTest with 
StateStoreMetricsTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.functions._
+
+  before {
+SparkSession.setActiveSession(spark) // set this before force 
initializing 'joinExec'
+spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+  }
+
+  after {
+StateStore.stop()
+  }
+
+  private def setupStream(prefix: String, multiplier: Int): 
(MemoryStream[Int], DataFrame) = {
+val input = MemoryStream[Int]
+val df = input.toDF
+  .select(
+'value as "key",
+'value.cast("timestamp") as s"${prefix}Time",
+('value * multiplier) as s"${prefix}Value")
+  .withWatermark(s"${prefix}Time", "10 seconds")
+
+return (input, df)
+  }
+
+  private def setupWindowedJoin(joinType: String):
+  (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+val (input1, df1) = setupStream("left", 2)
+val (input2, df2) = setupStream("right", 3)
+val windowed1 = df1.select('key, window('leftTime, "10 second"), 
'leftValue)
+val windowed2 = df2.select('key, window('rightTime, "10 second"), 
'rightValue)
+val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+  .select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+
+(input1, input2, joined)
+  }
+
+  test("windowed left outer join") {
+val (leftInput, rightInput, joined) = setupWindowedJoin("left_outer")
+
+testStream(joined)(
+  // Test inner part of the join.
+  AddData(leftInput, 1, 2, 3, 4, 5),
+  AddData(rightInput, 3, 4, 5, 6, 7),
+  CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+  // Old state doesn't get dropped until the batch *after* it gets 
introduced, so the
+  // nulls won't show up until the next batch after the watermark 
advances.
+  AddData(leftInput, 21),
+  AddData(rightInput, 22),
+  CheckLastBatch(),
+  assertNumStateRows(total = 12, updated = 2),
+  AddData(leftInput, 22),
+  CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 
4, null)),
+  assertNumStateRows(total = 3, updated = 1)
+)
+  }
+
+  test("windowed right outer join") {
+val (leftInput, rightInput, joined) = setupWindowedJoin("right_outer")
+
+testStream(joined)(
+  // Test inner part of the join.
+  AddData(leftInput, 1, 2, 3, 4, 5),
+  AddData(rightInput, 3, 4, 5, 6, 7),
+  CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+  // Old state doesn't get dropped until the batch *after* it gets 
introduced, so the
+  // nulls won't show up until the next batch after the watermark 
advances.
+  AddData(leftInput, 21),
+  AddData(rightInput, 22),
+  CheckLastBatch(),
+  assertNumStateRows(total = 12, updated = 2),
+  AddData(leftInput, 22),
+  CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, 
null, 21)),
+  assertNumStateRows(total = 3, updated = 1)
+)
+  }
+
+  Seq(
+("left_outer", Row(3, null, 5, null)),
+("right_outer", Row(null, 2, null, 5))
+  ).foreach { case (joinType: String, outerResult) =>
+test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range 
condition") {
+  import org.apache.spark.sql.functions._
+
+  val leftInput = MemoryStream[(Int, Int)]
+  val rightInput = MemoryStream[(Int, Int)]
+
+  val df1 = leftInput.toDF.toDF("leftKey", "time")
+.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey 
* 2) as "leftValue")
+.withWatermark("leftTime", "10 seconds")
+
+  val df2 = rightInput.toDF.toDF("rightKey", "time")
+.select('rightKey, 'time.cast("timestamp") as "rightTime", 
('rightKey * 3) as "rightValue")
+.withWatermark("rightTime", "10 seconds")
+
+  val joined =
+df1.join(
+  df2,
+  expr("leftKey = rightKey AND " +
+"leftTime BETWEEN rightTime - interval 5 seconds AND rightTime 
+ interval 5 seconds"),
+  joinType)
+  .select('leftKey, 'rightKey, 'leftTime.cast("int"), 
'rightTime.cast("int"))
+  testStream(joined)(
+AddData(leftInput, (1, 5), 

[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r142530435
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
---
@@ -470,3 +475,222 @@ class StreamingJoinSuite extends StreamTest with 
StateStoreMetricsTest with Befo
 }
   }
 }
+
+class StreamingOuterJoinSuite extends StreamTest with 
StateStoreMetricsTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.functions._
+
+  before {
+SparkSession.setActiveSession(spark) // set this before force 
initializing 'joinExec'
+spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+  }
+
+  after {
+StateStore.stop()
+  }
+
+  private def setupStream(prefix: String, multiplier: Int): 
(MemoryStream[Int], DataFrame) = {
+val input = MemoryStream[Int]
+val df = input.toDF
+  .select(
+'value as "key",
+'value.cast("timestamp") as s"${prefix}Time",
+('value * multiplier) as s"${prefix}Value")
+  .withWatermark(s"${prefix}Time", "10 seconds")
+
+return (input, df)
+  }
+
+  private def setupWindowedJoin(joinType: String):
+  (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+val (input1, df1) = setupStream("left", 2)
+val (input2, df2) = setupStream("right", 3)
+val windowed1 = df1.select('key, window('leftTime, "10 second"), 
'leftValue)
+val windowed2 = df2.select('key, window('rightTime, "10 second"), 
'rightValue)
+val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+  .select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+
+(input1, input2, joined)
+  }
+
+  test("windowed left outer join") {
+val (leftInput, rightInput, joined) = setupWindowedJoin("left_outer")
+
+testStream(joined)(
+  // Test inner part of the join.
+  AddData(leftInput, 1, 2, 3, 4, 5),
+  AddData(rightInput, 3, 4, 5, 6, 7),
+  CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+  // Old state doesn't get dropped until the batch *after* it gets 
introduced, so the
+  // nulls won't show up until the next batch after the watermark 
advances.
+  AddData(leftInput, 21),
+  AddData(rightInput, 22),
+  CheckLastBatch(),
+  assertNumStateRows(total = 12, updated = 2),
+  AddData(leftInput, 22),
+  CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 
4, null)),
+  assertNumStateRows(total = 3, updated = 1)
+)
+  }
+
+  test("windowed right outer join") {
+val (leftInput, rightInput, joined) = setupWindowedJoin("right_outer")
+
+testStream(joined)(
+  // Test inner part of the join.
+  AddData(leftInput, 1, 2, 3, 4, 5),
+  AddData(rightInput, 3, 4, 5, 6, 7),
+  CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+  // Old state doesn't get dropped until the batch *after* it gets 
introduced, so the
+  // nulls won't show up until the next batch after the watermark 
advances.
+  AddData(leftInput, 21),
+  AddData(rightInput, 22),
+  CheckLastBatch(),
+  assertNumStateRows(total = 12, updated = 2),
+  AddData(leftInput, 22),
+  CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, 
null, 21)),
+  assertNumStateRows(total = 3, updated = 1)
+)
+  }
+
+  Seq(
+("left_outer", Row(3, null, 5, null)),
+("right_outer", Row(null, 2, null, 5))
+  ).foreach { case (joinType: String, outerResult) =>
+test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range 
condition") {
+  import org.apache.spark.sql.functions._
+
+  val leftInput = MemoryStream[(Int, Int)]
+  val rightInput = MemoryStream[(Int, Int)]
+
+  val df1 = leftInput.toDF.toDF("leftKey", "time")
+.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey 
* 2) as "leftValue")
+.withWatermark("leftTime", "10 seconds")
+
+  val df2 = rightInput.toDF.toDF("rightKey", "time")
+.select('rightKey, 'time.cast("timestamp") as "rightTime", 
('rightKey * 3) as "rightValue")
+.withWatermark("rightTime", "10 seconds")
+
+  val joined =
+df1.join(
+  df2,
+  expr("leftKey = rightKey AND " +
+"leftTime BETWEEN rightTime - interval 5 seconds AND rightTime 
+ interval 5 seconds"),
+  joinType)
+  .select('leftKey, 'rightKey, 'leftTime.cast("int"), 
'rightTime.cast("int"))
+  testStream(joined)(
+AddData(leftInput, (1, 5), 

[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...

2017-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r142530086
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
---
@@ -470,3 +475,222 @@ class StreamingJoinSuite extends StreamTest with 
StateStoreMetricsTest with Befo
 }
   }
 }
+
+class StreamingOuterJoinSuite extends StreamTest with 
StateStoreMetricsTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.functions._
+
+  before {
+SparkSession.setActiveSession(spark) // set this before force 
initializing 'joinExec'
+spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+  }
+
+  after {
+StateStore.stop()
+  }
+
+  private def setupStream(prefix: String, multiplier: Int): 
(MemoryStream[Int], DataFrame) = {
+val input = MemoryStream[Int]
+val df = input.toDF
+  .select(
+'value as "key",
+'value.cast("timestamp") as s"${prefix}Time",
+('value * multiplier) as s"${prefix}Value")
+  .withWatermark(s"${prefix}Time", "10 seconds")
+
+return (input, df)
+  }
+
+  private def setupWindowedJoin(joinType: String):
+  (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+val (input1, df1) = setupStream("left", 2)
+val (input2, df2) = setupStream("right", 3)
+val windowed1 = df1.select('key, window('leftTime, "10 second"), 
'leftValue)
+val windowed2 = df2.select('key, window('rightTime, "10 second"), 
'rightValue)
+val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+  .select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+
+(input1, input2, joined)
+  }
+
+  test("windowed left outer join") {
+val (leftInput, rightInput, joined) = setupWindowedJoin("left_outer")
+
+testStream(joined)(
+  // Test inner part of the join.
+  AddData(leftInput, 1, 2, 3, 4, 5),
+  AddData(rightInput, 3, 4, 5, 6, 7),
+  CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+  // Old state doesn't get dropped until the batch *after* it gets 
introduced, so the
+  // nulls won't show up until the next batch after the watermark 
advances.
+  AddData(leftInput, 21),
+  AddData(rightInput, 22),
+  CheckLastBatch(),
+  assertNumStateRows(total = 12, updated = 2),
+  AddData(leftInput, 22),
+  CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 
4, null)),
+  assertNumStateRows(total = 3, updated = 1)
+)
+  }
+
+  test("windowed right outer join") {
+val (leftInput, rightInput, joined) = setupWindowedJoin("right_outer")
+
+testStream(joined)(
+  // Test inner part of the join.
+  AddData(leftInput, 1, 2, 3, 4, 5),
+  AddData(rightInput, 3, 4, 5, 6, 7),
+  CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+  // Old state doesn't get dropped until the batch *after* it gets 
introduced, so the
+  // nulls won't show up until the next batch after the watermark 
advances.
+  AddData(leftInput, 21),
+  AddData(rightInput, 22),
+  CheckLastBatch(),
+  assertNumStateRows(total = 12, updated = 2),
+  AddData(leftInput, 22),
+  CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, 
null, 21)),
+  assertNumStateRows(total = 3, updated = 1)
+)
+  }
+
+  Seq(
+("left_outer", Row(3, null, 5, null)),
+("right_outer", Row(null, 2, null, 5))
+  ).foreach { case (joinType: String, outerResult) =>
+test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range 
condition") {
+  import org.apache.spark.sql.functions._
+
+  val leftInput = MemoryStream[(Int, Int)]
+  val rightInput = MemoryStream[(Int, Int)]
+
+  val df1 = leftInput.toDF.toDF("leftKey", "time")
+.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey 
* 2) as "leftValue")
+.withWatermark("leftTime", "10 seconds")
+
+  val df2 = rightInput.toDF.toDF("rightKey", "time")
+.select('rightKey, 'time.cast("timestamp") as "rightTime", 
('rightKey * 3) as "rightValue")
+.withWatermark("rightTime", "10 seconds")
+
+  val joined =
+df1.join(
+  df2,
+  expr("leftKey = rightKey AND " +
+"leftTime BETWEEN rightTime - interval 5 seconds AND rightTime 
+ interval 5 seconds"),
+  joinType)
+  .select('leftKey, 'rightKey, 'leftTime.cast("int"), 
'rightTime.cast("int"))
+  testStream(joined)(
+AddData(leftInput, (1, 5), 

[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142529852
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, 
vectorized=False):
 self._name = name or (
 func.__name__ if hasattr(func, '__name__')
 else func.__class__.__name__)
-self._vectorized = vectorized
+self.vectorized = vectorized
--- End diff --

Oh sorry, I should have been clearer. This should stay 
`self._vectorized` since it is a private variable of the class; it's only 
`wrapped.vectorized` (which you already changed below) that isn't being used as 
private, so it shouldn't have an underscore.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142527524
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1020,10 +1006,14 @@ abstract class CodeGenerator[InType <: AnyRef, 
OutType <: AnyRef] extends Loggin
 }
 
 object CodeGenerator extends Logging {
+
+  // This is the value of HugeMethodLimit in the OpenJDK JVM settings
+  val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
+
   /**
* Compile the Java source code into a Java class, using Janino.
*/
-  def compile(code: CodeAndComment): GeneratedClass = try {
+  def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19422
  
**[Test build #82432 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82432/testReport)**
 for PR 19422 at commit 
[`b66f5bb`](https://github.com/apache/spark/commit/b66f5bb43622ee85bc0b534e0cfaa6d52179d0e7).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19422: [SPARK-22193][SQL] Minor typo fix

2017-10-03 Thread rekhajoshm
GitHub user rekhajoshm opened a pull request:

https://github.com/apache/spark/pull/19422

[SPARK-22193][SQL] Minor typo fix

## What changes were proposed in this pull request?

[SPARK-22193][SQL] Minor typo fix

## How was this patch tested?
existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/spark SPARK-22193

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19422


commit e3677c9fa9697e0d34f9df52442085a6a481c9e9
Author: Rekha Joshi 
Date:   2015-05-05T23:10:08Z

Merge pull request #1 from apache/master

Pulling functionality from apache spark

commit 106fd8eee8f6a6f7c67cfc64f57c1161f76d8f75
Author: Rekha Joshi 
Date:   2015-05-08T21:49:09Z

Merge pull request #2 from apache/master

pull latest from apache spark

commit 0be142d6becba7c09c6eba0b8ea1efe83d649e8c
Author: Rekha Joshi 
Date:   2015-06-22T00:08:08Z

Merge pull request #3 from apache/master

Pulling functionality from apache spark

commit 6c6ee12fd733e3f9902e10faf92ccb78211245e3
Author: Rekha Joshi 
Date:   2015-09-17T01:03:09Z

Merge pull request #4 from apache/master

Pulling functionality from apache spark

commit b123c601e459d1ad17511fd91dd304032154882a
Author: Rekha Joshi 
Date:   2015-11-25T18:50:32Z

Merge pull request #5 from apache/master

pull request from apache/master

commit c73c32aadd6066e631956923725a48d98a18777e
Author: Rekha Joshi 
Date:   2016-03-18T19:13:51Z

Merge pull request #6 from apache/master

pull latest from apache spark

commit 7dbf7320057978526635bed09dabc8cf8657a28a
Author: Rekha Joshi 
Date:   2016-04-05T20:26:40Z

Merge pull request #8 from apache/master

pull latest from apache spark

commit 5e9d71827f8e2e4d07027281b80e4e073e7fecd1
Author: Rekha Joshi 
Date:   2017-05-01T23:00:30Z

Merge pull request #9 from apache/master

Pull apache spark

commit 63d99b3ce5f222d7126133170a373591f0ac67dd
Author: Rekha Joshi 
Date:   2017-09-30T22:26:44Z

Merge pull request #10 from apache/master

pull latest apache spark

commit b66f5bb43622ee85bc0b534e0cfaa6d52179d0e7
Author: rjoshi2 
Date:   2017-10-03T21:14:29Z

[SPARK-22193][SQL] Minor typo fix




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142524349
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +63,22 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+
+val batchIter = if (batchSize > 0) {
+  new BatchIterator(iter, batchSize)
+} else {
+  Iterator(iter)
+}
--- End diff --

This could be done on one line, though I'm not sure if others agree with that style.
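
For what it's worth, a self-contained sketch of that one-line style, using `Iterator.grouped` as a stand-in (so this only illustrates the style, not the PR's actual `BatchIterator` semantics):

```
// Batch an iterator when batchSize > 0, otherwise pass it through as a single batch.
// Note: grouped() materializes each batch eagerly, unlike a lazy BatchIterator.
def batched[T](iter: Iterator[T], batchSize: Int): Iterator[Iterator[T]] =
  if (batchSize > 0) iter.grouped(batchSize).map(_.iterator) else Iterator(iter)
```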


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142523671
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType
  */
 @InterfaceStability.Stable
 class RelationalGroupedDataset protected[sql](
-df: DataFrame,
+val df: DataFrame,
--- End diff --

I don't think this needs to be a `val`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142523354
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

I think we need to think a little more about how we handle different 
formats of arrow data. 

Currently, the input of the arrow serializer is a list of (pd.Series, 
DataType); I feel it's cleaner if this class doesn't deal with type coercion and 
handles just serialization. It could take a `pyarrow.Table`, for instance, and let 
the caller construct the `pyarrow.Table`.

Another thing to think about is whether the data we are passing is not 
purely `pd.Series` and `pd.DataFrame`. What if, for instance, we want to 
serialize a (pd.Series, pd.DataFrame) tuple or a (scalar value, pd.DataFrame) 
tuple? Maybe making the serializer composable is more flexible, i.e. one class 
knows how to serialize `pd.Series`, another knows how to serialize 
`pd.DataFrame`, and if we want to serialize a (pd.Series, pd.DataFrame) tuple 
we can compose them.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142522405
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+import pyarrow as pa
+
+arrow_return_types = list(to_arrow_type(field.dataType) for field 
in return_type)
--- End diff --

I prefer list comprehensions, but it's up to you


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142521502
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+import pyarrow as pa
--- End diff --

import not used


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r14252
  
--- Diff: python/pyspark/worker.py ---
@@ -32,8 +32,9 @@
 from pyspark.serializers import write_with_length, write_int, read_long, \
 write_long, read_int, SpecialLengths, PythonEvalType, 
UTF8Deserializer, PickleSerializer, \
 BatchedSerializer, ArrowStreamPandasSerializer
-from pyspark.sql.types import toArrowType
+from pyspark.sql.types import to_arrow_type
 from pyspark import shuffle
+from pyspark.sql.types import StructType
--- End diff --

combine this with `to_arrow_type` import


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142520328
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+foo_udf = pandas_udf(
+foo,
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v=df.v + 1)
+return ret
+
+@pandas_udf(StructType([StructField('id', LongType()), 
StructField('v', DoubleType())]))
+def foo(df):
--- End diff --

Isn't this a redeclaration of `foo`? Maybe some of these could be lambdas?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142519825
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+foo_udf = pandas_udf(
+foo,
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
--- End diff --

same simplification as above


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142519186
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,133 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
--- End diff --

could you simplify to
```
return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142518730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private object BatchIterator {
+  class InnerIterator[T](iter: Iterator[T], batchSize: Int) extends 
Iterator[T] {
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82431 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82431/testReport)**
 for PR 18732 at commit 
[`1ea2b71`](https://github.com/apache/spark/commit/1ea2b71801784842d1797863456a07c7fcfc2531).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142517310
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

What do you all think about making the `ArrowPandasSerializer` also able to 
serialize pandas.DataFrames?  Then it wouldn't require this extra wrapping and 
I think it could be useful for other things in the future as well.  
@HyukjinKwon @ueshin @viirya ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparision should respect case-...

2017-10-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/18460
  
Hi, @gatorsmile .
Could you review this `case-(in)sensitive type comparison` PR, too?
Unfortunately, this PR hasn't received any feedback in the last three months.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19041
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19041
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82427/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19041
  
**[Test build #82427 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82427/testReport)**
 for PR 19041 at commit 
[`95d5eb5`](https://github.com/apache/spark/commit/95d5eb5c792f51903db139972129d24710e2738c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142514594
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private object BatchIterator {
+  class InnerIterator[T](iter: Iterator[T], batchSize: Int) extends 
Iterator[T] {
--- End diff --

could you just make `InnerIterator` a nested class in `BatchIterator` 
instead of here in the companion object?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82430 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82430/testReport)**
 for PR 18732 at commit 
[`d37a9e6`](https://github.com/apache/spark/commit/d37a9e6a19e3f2b5bef796ba20cdb5bc46817f62).
 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18732
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82430/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82430 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82430/testReport)**
 for PR 18732 at commit 
[`d37a9e6`](https://github.com/apache/spark/commit/d37a9e6a19e3f2b5bef796ba20cdb5bc46817f62).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18704
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-10-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18704
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82426/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18704
  
**[Test build #82426 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82426/testReport)**
 for PR 18704 at commit 
[`c16230d`](https://github.com/apache/spark/commit/c16230d34472e0337b87ce858289fec9a1d88ab4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PassThroughSuite extends SparkFunSuite `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19083
  
@maropu Thanks for working on it. LGTM except for two minor comments. 

cc @rednaxelafx @kiszk @viirya @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142503474
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -32,7 +32,7 @@ import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, 
JaninoRuntimeException, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
 
-import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
--- End diff --

revert it back?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19083#discussion_r142503203
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1020,10 +1006,14 @@ abstract class CodeGenerator[InType <: AnyRef, 
OutType <: AnyRef] extends Loggin
 }
 
 object CodeGenerator extends Logging {
+
+  // This is the value of HugeMethodLimit in the OpenJDK JVM settings
+  val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
+
   /**
* Compile the Java source code into a Java class, using Janino.
*/
-  def compile(code: CodeAndComment): GeneratedClass = try {
+  def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
--- End diff --

Please add `@return` to explain what the returned values are.
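
For illustration, the doc comment could look roughly like the sketch below. The wording of the `@return` description is an assumption based on this discussion of bytecode sizes, and the simplified signature and stub body are only there so the snippet stands on its own:

```
object CompileDocSketch {
  /**
   * Compile the Java source code into a Java class, using Janino.
   *
   * @return a pair of the generated class and the maximum bytecode size of the
   *         compiled methods.
   */
  def compile(code: String): (Class[_], Int) = (classOf[Object], 0) // placeholder body
}
```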


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142501134
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2181,31 +2186,69 @@ def udf(f=None, returnType=StringType()):
 @since(2.3)
 def pandas_udf(f=None, returnType=StringType()):
 """
-Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
-`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+Creates a :class:`Column` expression representing a vectorized user 
defined function (UDF).
+
+The user-defined function can define one of the following 
transformations:
+1. One or more `pandas.Series` -> A `pandas.Series`
+
+   This udf is used with `DataFrame.withColumn` and `DataFrame.select`.
+   The returnType should be a primitive data type, e.g., DoubleType()
+
+   Example:
+
+   >>> from pyspark.sql.types import IntegerType, StringType
+   >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+   >>> @pandas_udf(returnType=StringType())
+   ... def to_upper(s):
+   ... return s.str.upper()
+   ...
+   >>> @pandas_udf(returnType="integer")
+   ... def add_one(x):
+   ... return x + 1
+   ...
+   >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", 
"name", "age"))
+   >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
+   ... .show()  # doctest: +SKIP
+   +--+--++
+   |slen(name)|to_upper(name)|add_one(age)|
+   +--+--++
+   | 8|  JOHN DOE|  22|
+   +--+--++
+
+2. A `pandas.DataFrame` -> A `pandas.DataFrame`
+
+   This udf is used with `GroupedData.apply`
+   The returnType should be a StructType describing the schema of the 
returned
+   `pandas.DataFrame`.
+
+   Example:
+
+   >>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 
4.0)], ("id", "v"))
+   >>> @pandas_udf(returnType=df.schema)
+   ... def normalize(df):
+   ... v = df.v
+   ... ret = df.assign(v=(v - v.mean()) / v.std())
--- End diff --

Yup, it was an oversight. You are correct.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19405: [SPARK-22178] [SQL] Refresh Persistent Views by R...

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19405


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19395: [SPARK-22171] [SQL] Describe Table Extended Failed when ...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19395
  
cc @cloud-fan @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18851: [SPARK-21644][SQL] LocalLimit.maxRows is defined ...

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18851


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19393: [SPARK-21644][SQL] LocalLimit.maxRows is defined ...

2017-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19393


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19405: [SPARK-22178] [SQL] Refresh Persistent Views by REFRESH ...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19405
  
Thanks! Merged to master/2.2


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19393: [SPARK-21644][SQL] LocalLimit.maxRows is defined incorre...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19393
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142500980
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized):
 def _udf(f, returnType=StringType(), vectorized=vectorized):
 if vectorized:
 import inspect
-if len(inspect.getargspec(f).args) == 0:
-raise NotImplementedError("0-parameter pandas_udfs are not 
currently supported")
+argspec = inspect.getargspec(f)
+if len(argspec.args) == 0 and argspec.varargs is None:
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142499712
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized):
 def _udf(f, returnType=StringType(), vectorized=vectorized):
 if vectorized:
 import inspect
-if len(inspect.getargspec(f).args) == 0:
-raise NotImplementedError("0-parameter pandas_udfs are not 
currently supported")
+argspec = inspect.getargspec(f)
+if len(argspec.args) == 0 and argspec.varargs is None:
--- End diff --

I think `varargs` are fine. I will add the test.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...

2017-10-03 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19413
  
**[Test build #82429 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82429/testReport)**
 for PR 19413 at commit 
[`079a4e2`](https://github.com/apache/spark/commit/079a4e26ec900745411fdd60e4cd242b9a43320b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfigurati...

2017-10-03 Thread sahilTakiar
Github user sahilTakiar commented on a diff in the pull request:

https://github.com/apache/spark/pull/19413#discussion_r142499531
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -157,20 +157,23 @@ class HadoopRDD[K, V](
   if (conf.isInstanceOf[JobConf]) {
 logDebug("Re-using user-broadcasted JobConf")
 conf.asInstanceOf[JobConf]
-  } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-logDebug("Re-using cached JobConf")
-HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
   } else {
-// Create a JobConf that will be cached and used across this RDD's 
getJobConf() calls in the
-// local process. The local cache is accessed through 
HadoopRDD.putCachedMetadata().
-// The caching helps minimize GC, since a JobConf can contain 
~10KB of temporary objects.
-// Synchronize to prevent ConcurrentModificationException 
(SPARK-1097, HADOOP-10456).
-HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-  logDebug("Creating new JobConf and caching it for later re-use")
-  val newJobConf = new JobConf(conf)
-  initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
-  HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-  newJobConf
+val newJobConf = HadoopRDD.getCachedMetadata(jobConfCacheKey)
--- End diff --

Thanks for the tip. Updated the patch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


