[GitHub] spark pull request #17062: [SPARK-17495] [SQL] Support date, timestamp and i...

2017-02-27 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/17062#discussion_r103357588
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala ---
@@ -169,6 +171,96 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("-01-01", -719530)
+    checkHiveHashForDateType("-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+  }
+
+  test("hive-hash for timestamp type") {
+    def checkHiveHashForTimestampType(
+        timestamp: String,
+        expected: Long,
+        timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+        TimestampType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
+
+    // with higher precision
+    checkHiveHashForTimestampType("2017-02-24 10:56:29.11", 1353936655)
+
+    // with different timezone
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
+      TimeZone.getTimeZone("US/Pacific"))
+
+    // boundary cases
+    checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
+    checkHiveHashForTimestampType("-01-01 00:00:00", -1081818240)
+
+    // epoch
+    checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
+
+    // before epoch
+    checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
+
+    // Invalid input: bad timestamp string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("55-5-", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))
+
+    // Invalid input: Hive accepts upto 9 decimal place precision but Spark uses upto 6
+    intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.", 0))
+  }
+
+  test("hive-hash for CalendarInterval type") {
+    def checkHiveHashForTimestampType(interval: String, expected: Long): Unit = {
+      checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
+    }
+
+    checkHiveHashForTimestampType("interval 1 day", 3220073)
+    checkHiveHashForTimestampType("interval 6 day 15 hour", 21202073)
+    checkHiveHashForTimestampType("interval -23 day 56 hour -113 minute 9898989 second",
--- End diff --

SELECT HASH ( INTERVAL '-23' DAY + INTERVAL '56' HOUR + INTERVAL '-113' MINUTE + INTERVAL '9898989' SECOND );



[GitHub] spark pull request #17062: [SPARK-17495] [SQL] Support date, timestamp and i...

2017-02-27 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/17062#discussion_r103300592
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala ---
@@ -169,6 +171,96 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("-01-01", -719530)
+    checkHiveHashForDateType("-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+  }
+
+  test("hive-hash for timestamp type") {
+    def checkHiveHashForTimestampType(
+        timestamp: String,
+        expected: Long,
+        timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+        TimestampType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
+
+    // with higher precision
+    checkHiveHashForTimestampType("2017-02-24 10:56:29.11", 1353936655)
+
+    // with different timezone
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
+      TimeZone.getTimeZone("US/Pacific"))
+
+    // boundary cases
+    checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
+    checkHiveHashForTimestampType("-01-01 00:00:00", -1081818240)
+
+    // epoch
+    checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
+
+    // before epoch
+    checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
+
+    // Invalid input: bad timestamp string. Hive returns 0 for such cases
--- End diff --

Same as `Date`: invalid timestamp values are not allowed in Spark, so this will fail. Hive will not fail but falls back to `null` and returns `0` as the hash value.
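
For reference, a minimal sketch of why the tests above expect `NoSuchElementException` (it only relies on what the quoted diff already uses, namely `DateTimeUtils.stringToTimestamp` returning an `Option`):

```
import java.util.TimeZone
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// Malformed input parses to None, so the .get in the test helper throws
// NoSuchElementException; Hive instead hashes the resulting null to 0.
val parsed: Option[Long] = DateTimeUtils.stringToTimestamp(
  UTF8String.fromString("2016-02-30 00:00:00"), TimeZone.getTimeZone("UTC"))
assert(parsed.isEmpty)
```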





[GitHub] spark pull request #16715: [Spark-18080][ML][PYTHON] Python API & Examples f...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/16715#discussion_r103357342
  
--- Diff: python/pyspark/ml/feature.py ---
@@ -120,6 +122,196 @@ def getThreshold(self):
         return self.getOrDefault(self.threshold)
 
 
+class LSHParams(Params):
+    """
+    Mixin for Locality Sensitive Hashing (LSH) algorithm parameters.
+    """
+
+    numHashTables = Param(Params._dummy(), "numHashTables", "number of hash tables, where " +
+                          "increasing number of hash tables lowers the false negative rate, " +
+                          "and decreasing it improves the running performance.",
+                          typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(LSHParams, self).__init__()
+
+    def setNumHashTables(self, value):
+        """
+        Sets the value of :py:attr:`numHashTables`.
+        """
+        return self._set(numHashTables=value)
+
+    def getNumHashTables(self):
+        """
+        Gets the value of numHashTables or its default value.
+        """
+        return self.getOrDefault(self.numHashTables)
+
+
+class LSHModel(JavaModel):
+    """
+    Mixin for Locality Sensitive Hashing (LSH) models.
+    """
+
+    def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, distCol="distCol"):
+        """
+        Given a large dataset and an item, approximately find at most k items which have the
+        closest distance to the item. If the :py:attr:`outputCol` is missing, the method will
+        transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows
+        caching of the transformed data when necessary.
+
+        .. note:: This method is experimental and will likely change behavior in the next release.
+
+        :param dataset: The dataset to search for nearest neighbors of the key.
+        :param key: Feature vector representing the item to search for.
+        :param numNearestNeighbors: The maximum number of nearest neighbors.
+        :param distCol: Output column for storing the distance between each result row and the key.
+                        Use "distCol" as default value if it's not specified.
+        :return: A dataset containing at most k items closest to the key. A column "distCol" is
+                 added to show the distance between each row and the key.
+        """
+        return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors,
+                               distCol)
+
+    def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"):
+        """
+        Join two datasets to approximately find all pairs of rows whose distance are smaller than
+        the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data;
+        if the :py:attr:`outputCol` exists, it will use that. This allows caching of the
+        transformed data when necessary.
+
+        :param datasetA: One of the datasets to join.
+        :param datasetB: Another dataset to join.
+        :param threshold: The threshold for the distance of row pairs.
+        :param distCol: Output column for storing the distance between each pair of rows. Use
+                        "distCol" as default value if it's not specified.
+        :return: A joined dataset containing pairs of rows. The original rows are in columns
+                 "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+                 between each pair.
+        """
+        return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol)
+
+
+@inherit_doc
+class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+                                  JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    LSH class for Euclidean distance metrics.
+    The input is dense or sparse vectors, each of which represents a point in the Euclidean
+    distance space. The output will be vectors of configurable dimension. Hash values in the same
+    dimension are calculated by the same hash function.
+
+    .. seealso:: `Stable Distributions \
+        `_
+    .. seealso:: `Hashing for Similarity Search: A Survey `_
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.sql.functions import col
+    >>> data = [(0, Vectors.dense([-1.0, -1.0 ]),),
+    ...         (1, Vectors.dense([-1.0,

[GitHub] spark pull request #17062: [SPARK-17495] [SQL] Support date, timestamp and i...

2017-02-27 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/17062#discussion_r103281696
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala ---
@@ -169,6 +171,96 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
--- End diff --

Expected values were computed with Hive 1.2 using:

```
SELECT HASH( CAST( "2017-01-01" AS DATE) )
```





[GitHub] spark pull request #17062: [SPARK-17495] [SQL] Support date, timestamp and i...

2017-02-27 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/17062#discussion_r103300013
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala ---
@@ -169,6 +171,96 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("-01-01", -719530)
+    checkHiveHashForDateType("-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
--- End diff --

Spark does not allow creating a `Date` that does not fit its spec and throws an exception instead. Hive will not fail but falls back to `null` and returns `0` as the hash value.





[GitHub] spark pull request #17062: [SPARK-17495] [SQL] Support date, timestamp and i...

2017-02-27 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/17062#discussion_r103357472
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala ---
@@ -169,6 +171,96 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("-01-01", -719530)
+    checkHiveHashForDateType("-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+  }
+
+  test("hive-hash for timestamp type") {
+    def checkHiveHashForTimestampType(
+        timestamp: String,
+        expected: Long,
+        timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+        TimestampType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
+
+    // with higher precision
+    checkHiveHashForTimestampType("2017-02-24 10:56:29.11", 1353936655)
+
+    // with different timezone
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
+      TimeZone.getTimeZone("US/Pacific"))
+
+    // boundary cases
+    checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
+    checkHiveHashForTimestampType("-01-01 00:00:00", -1081818240)
+
+    // epoch
+    checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)
+
+    // before epoch
+    checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)
+
+    // Invalid input: bad timestamp string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("55-5-", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))
+
+    // Invalid input: Hive accepts upto 9 decimal place precision but Spark uses upto 6
+    intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.", 0))
+  }
+
+  test("hive-hash for CalendarInterval type") {
+    def checkHiveHashForTimestampType(interval: String, expected: Long): Unit = {
+      checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
+    }
+
+    checkHiveHashForTimestampType("interval 1 day", 3220073)
+    checkHiveHashForTimestampType("interval 6 day 15 hour", 21202073)
--- End diff --

SELECT HASH ( INTERVAL '1' DAY + INTERVAL '15' HOUR );





[GitHub] spark pull request #17062: [SPARK-17495] [SQL] Support date, timestamp and i...

2017-02-27 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/17062#discussion_r103300293
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala ---
@@ -169,6 +171,96 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     // scalastyle:on nonascii
   }
 
+  test("hive-hash for date type") {
+    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForDateType("2017-01-01", 17167)
+
+    // boundary cases
+    checkHiveHashForDateType("-01-01", -719530)
+    checkHiveHashForDateType("-12-31", 2932896)
+
+    // epoch
+    checkHiveHashForDateType("1970-01-01", 0)
+
+    // before epoch
+    checkHiveHashForDateType("1800-01-01", -62091)
+
+    // Invalid input: bad date string. Hive returns 0 for such cases
+    intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))
+
+    // Invalid input: Empty string. Hive returns 0 for this case
+    intercept[NoSuchElementException](checkHiveHashForDateType("", 0))
+
+    // Invalid input: February 30th for a leap year. Hive supports this but Spark doesn't
+    intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
+  }
+
+  test("hive-hash for timestamp type") {
+    def checkHiveHashForTimestampType(
+        timestamp: String,
+        expected: Long,
+        timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
+      checkHiveHash(
+        DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
+        TimestampType,
+        expected)
+    }
+
+    // basic case
+    checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
--- End diff --

Corresponding Hive query:
```
select HASH(CAST("2017-02-24 10:56:29" AS TIMESTAMP));
```

Note that this is with the system timezone set to UTC (export TZ=/usr/share/zoneinfo/UTC).





[GitHub] spark pull request #17079: [SPARK-19748][SQL]refresh function has a wrong or...

2017-02-27 Thread windpiger
Github user windpiger commented on a diff in the pull request:

https://github.com/apache/spark/pull/17079#discussion_r103357633
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala ---
@@ -178,6 +178,34 @@ class FileIndexSuite extends SharedSQLContext {
       assert(catalog2.allFiles().nonEmpty)
     }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+    withTempDir { dir =>
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      val dirPath = new Path(dir.getAbsolutePath)
+      val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty,
+        None, fileStatusCache) {
+        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+      }
+
+      assert(catalog.leafDirPaths.isEmpty)
+      assert(catalog.leafFilePaths.isEmpty)
+
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      catalog.refresh()
+
+      assert(catalog.leafFilePaths.size == 1)
+      assert(catalog.leafFilePaths.head.toString.stripSuffix("/") ==
+        s"file:${file.getAbsolutePath.stripSuffix("/")}")
--- End diff --

ok, let me modify~ thanks~





[GitHub] spark issue #16774: [SPARK-19357][ML] Adding parallel model evaluation in ML...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16774
  
**[Test build #73545 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73545/testReport)** for PR 16774 at commit [`1c2e391`](https://github.com/apache/spark/commit/1c2e391e74696c60690f477fe4e0b636f0d98318).





[GitHub] spark issue #17080: [SPARK-19739][CORE] propagate S3 session token to cluser

2017-02-27 Thread uncleGen
Github user uncleGen commented on the issue:

https://github.com/apache/spark/pull/17080
  
@steveloughran IMHO, there is no need to use `org.apache.hadoop.fs.s3a.Constants` and `com.amazonaws.SDKGlobalConfiguration`, otherwise we will import `hadoop-aws` and `aws-java-sdk-core` into Spark core.
also cc @srowen
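
For illustration, a minimal sketch of that alternative (the object name below is made up; the point is only that the relevant property names can live as plain string literals inside Spark core, avoiding a compile-time dependency on `hadoop-aws` / `aws-java-sdk-core`):

```
// Hypothetical sketch: keep the S3A/AWS property names as local literals
// instead of referencing Constants / SDKGlobalConfiguration from the
// external hadoop-aws and aws-java-sdk-core modules.
private[spark] object S3CredentialKeys {
  val AccessKey = "fs.s3a.access.key"
  val SecretKey = "fs.s3a.secret.key"
  val SessionToken = "fs.s3a.session.token"

  // Environment variables conventionally read by the AWS SDK.
  val EnvAccessKey = "AWS_ACCESS_KEY_ID"
  val EnvSecretKey = "AWS_SECRET_ACCESS_KEY"
  val EnvSessionToken = "AWS_SESSION_TOKEN"
}
```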





[GitHub] spark issue #17079: [SPARK-19748][SQL]refresh function has a wrong order to ...

2017-02-27 Thread windpiger
Github user windpiger commented on the issue:

https://github.com/apache/spark/pull/17079
  
There is no related test case for InMemoryFileIndex with FileStatusCache.
When I worked on this [PR](https://github.com/apache/spark/pull/17081) and added a fileStatusCache in DataSource, I found this bug.





[GitHub] spark issue #17012: [SPARK-19677][SS] Renaming a file atop an existing one s...

2017-02-27 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/17012
  
What are the proposed semantics of this PR now?
- If the file exists, ignore.
- If the file does not exist, try to rename; if that fails, throw an exception.

Is this right? If yes, the PR title/description should be changed.
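
Put as code, the semantics described above would look roughly like the sketch below (`fs`, `srcPath` and `dstPath` are illustrative names, not the PR's actual code):

```
// Sketch of the proposed semantics: ignore when the destination exists,
// otherwise rename and turn a failed rename into an exception.
def renameIfAbsent(
    fs: org.apache.hadoop.fs.FileSystem,
    srcPath: org.apache.hadoop.fs.Path,
    dstPath: org.apache.hadoop.fs.Path): Unit = {
  if (!fs.exists(dstPath)) {
    if (!fs.rename(srcPath, dstPath)) {
      throw new java.io.IOException(s"Failed to rename $srcPath to $dstPath")
    }
  } // destination already exists: ignore
}
```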






[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...

2017-02-27 Thread marmbrus
Github user marmbrus commented on the issue:

https://github.com/apache/spark/pull/17070
  
/cc @zsxwing 





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103351299
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
   @Since("1.3.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
+  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
--- End diff --

You could rename userFeatures, itemFeatures to be featuresA, featuresB or something to make it clear that there is no ordering here.
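
For illustration, the rename could look like the sketch below (the udf body is elided in the quoted diff, so the plain dot product and null handling here are assumptions, not the PR's actual implementation):

```
import org.apache.spark.sql.functions.udf

// Sketch only: featuresA/featuresB make it explicit that the two factor
// vectors are interchangeable; the real body may use BLAS instead.
private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
  if (featuresA != null && featuresB != null) {
    featuresA.iterator.zip(featuresB.iterator).map { case (a, b) => a * b }.sum
  } else {
    Float.NaN
  }
}
```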





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103352432
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---
@@ -285,6 +285,43 @@ class ALSModel private[ml] (
 
   @Since("1.6.0")
   override def write: MLWriter = new ALSModel.ALSModelWriter(this)
+
+  @Since("2.2.0")
+  def recommendForAllUsers(num: Int): DataFrame = {
+    recommendForAll(userFactors, itemFactors, $(userCol), num)
+  }
+
+  @Since("2.2.0")
+  def recommendForAllItems(num: Int): DataFrame = {
+    recommendForAll(itemFactors, userFactors, $(itemCol), num)
+  }
+
+  /**
+   * Makes recommendations for all users (or items).
+   * @param srcFactors src factors for which to generate recommendations
+   * @param dstFactors dst factors used to make recommendations
+   * @param srcOutputColumn name of the column for the source in the output DataFrame
+   * @param num number of recommendations for each record
+   * @return a DataFrame of (srcOutputColumn: Int, recommendations), where recommendations are
+   *         stored as an array of (dstId: Int, ratingL: Double) tuples.
--- End diff --

ratingL -> rating





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103350750
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---
@@ -285,6 +285,43 @@ class ALSModel private[ml] (
 
   @Since("1.6.0")
   override def write: MLWriter = new ALSModel.ALSModelWriter(this)
+
+  @Since("2.2.0")
+  def recommendForAllUsers(num: Int): DataFrame = {
--- End diff --

Maybe rename to numItems (and numUsers in the other method)





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103353799
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
+  (num: Int, ord: Ordering[(K2, V)])
+  extends Aggregator[(K1, K2, V), BoundedPriorityQueue[(K2, V)], Array[(K2, V)]] {
+
+  override def zero: BoundedPriorityQueue[(K2, V)] = new BoundedPriorityQueue[(K2, V)](num)(ord)
+  override def reduce(
+      q: BoundedPriorityQueue[(K2, V)],
+      a: (K1, K2, V)): BoundedPriorityQueue[(K2, V)] = q += {(a._2, a._3)}
+  override def merge(
+      q1: BoundedPriorityQueue[(K2, V)],
+      q2: BoundedPriorityQueue[(K2, V)]): BoundedPriorityQueue[(K2, V)] = q1 ++= q2
+  override def finish(r: BoundedPriorityQueue[(K2, V)]): Array[(K2, V)] =
+    r.toArray.sorted(ord.reverse)
+  override def bufferEncoder: Encoder[BoundedPriorityQueue[(K2, V)]] =
+    Encoders.kryo[BoundedPriorityQueue[(K2, V)]]
+  override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]
--- End diff --

IntelliJ style complaint: include "()" at end
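
i.e. the last line above would become (a sketch of the small change being asked for):

```
override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
```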





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103354132
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---
@@ -285,6 +286,55 @@ class ALSModel private[ml] (
 
   @Since("1.6.0")
   override def write: MLWriter = new ALSModel.ALSModelWriter(this)
+
+  /**
+   * Returns top `num` items recommended for each user, for all users.
+   * @param num number of recommendations for each user
--- End diff --

number -> max number





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103353184
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---
@@ -285,6 +285,43 @@ class ALSModel private[ml] (
 
   @Since("1.6.0")
   override def write: MLWriter = new ALSModel.ALSModelWriter(this)
+
+  @Since("2.2.0")
+  def recommendForAllUsers(num: Int): DataFrame = {
+    recommendForAll(userFactors, itemFactors, $(userCol), num)
+  }
+
+  @Since("2.2.0")
+  def recommendForAllItems(num: Int): DataFrame = {
+    recommendForAll(itemFactors, userFactors, $(itemCol), num)
+  }
+
+  /**
+   * Makes recommendations for all users (or items).
+   * @param srcFactors src factors for which to generate recommendations
+   * @param dstFactors dst factors used to make recommendations
+   * @param srcOutputColumn name of the column for the source in the output DataFrame
+   * @param num number of recommendations for each record
+   * @return a DataFrame of (srcOutputColumn: Int, recommendations), where recommendations are
+   *         stored as an array of (dstId: Int, ratingL: Double) tuples.
+   */
+  private def recommendForAll(
+      srcFactors: DataFrame,
+      dstFactors: DataFrame,
+      srcOutputColumn: String,
+      num: Int): DataFrame = {
+    import srcFactors.sparkSession.implicits._
+
+    val ratings = srcFactors.crossJoin(dstFactors)
+      .select(
+        srcFactors("id").as("srcId"),
+        dstFactors("id").as("dstId"),
+        predict(srcFactors("features"), dstFactors("features")).as($(predictionCol)))
+    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
+    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
+    ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
--- End diff --

It'd be nice to specify field names for dstId and rating and to document the schema in the recommend methods. That will help users extract recommendations.
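
One way that could look (a sketch with illustrative names, building on the aggregation in the quoted diff; the final API may choose differently):

```
// Sketch: name the key and the aggregated column so the result reads
// (srcOutputColumn, recommendations) rather than the default tuple names.
val recs = ratings.as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn)
  .toDF(srcOutputColumn, "recommendations")
// The array elements are still unnamed (_1, _2) pairs at this point; exposing
// them as (dstId, rating) would additionally need a select/cast to a named
// struct schema, which is the part this comment asks to document.
```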





[GitHub] spark pull request #14273: [SPARK-9140] [ML] Replace TimeTracker by MultiSto...

2017-02-27 Thread MechCoder
Github user MechCoder closed the pull request at:

https://github.com/apache/spark/pull/14273





[GitHub] spark pull request #16959: [SPARK-19631][CORE] OutputCommitCoordinator shoul...

2017-02-27 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16959#discussion_r103354382
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -111,13 +115,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
     java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
     synchronized {
-      authorizedCommittersByStage(stage) = arr
+      stageStates(stage) = new StageState(arr)
--- End diff --

Ah sorry, now that I see this I realize it probably makes sense to initialize `arr` in the `StageState` constructor too (so this line would look like `new StageState(maxPartitionId + 1)`, and the `StageState` constructor just takes in `numPartitions`). Would you mind making that change too?
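
A minimal sketch of the suggested shape (the field name is illustrative; the actual PR may structure `StageState` differently):

```
// Sketch: StageState owns the committer array, sized by numPartitions, so the
// caller can simply write `stageStates(stage) = new StageState(maxPartitionId + 1)`.
private case class StageState(numPartitions: Int) {
  val authorizedCommitters: Array[TaskAttemptNumber] =
    Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
}
```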





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103354299
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression if s.children.nonEmpty => true
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+    e.find{ x =>
+      x.isInstanceOf[Not] && e.find {
+        case In(_, Seq(_: ListQuery)) => true
+        case _ => false
+      }.isDefined
+    }.isDefined
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+    e.transform {
+      case OuterReference(r) => r
+    }
+  }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+    p.transformAllExpressions {
+      case OuterReference(a) => a
+    }
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
+   * TODO: Currently we don't allow deep correlation. Also, we don't allow mixing of
+   * outer references and local references under an aggregate expression.
+   * For example (SQL):
+   * {{{
+   *   SELECT .. FROM p1
+   *   WHERE EXISTS (SELECT ...
+   *     FROM p2
+   *     WHERE EXISTS (SELECT ...
+   *       FROM sq
+   *       WHERE min(p1.a + p2.b) = sq.c))
+   *
+   *

[GitHub] spark issue #17090: [Spark-19535][ML] RecommendForAllUsers RecommendForAllIt...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17090
  
**[Test build #73543 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73543/testReport)** for PR 17090 at commit [`832b066`](https://github.com/apache/spark/commit/832b066f490c212b5a79fd045460525afd9576b9).





[GitHub] spark issue #16959: [SPARK-19631][CORE] OutputCommitCoordinator should not a...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16959
  
**[Test build #73544 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73544/testReport)** for PR 16959 at commit [`20f028a`](https://github.com/apache/spark/commit/20f028ad5e6f746842ca3dd10ea12811a4a699a4).





[GitHub] spark issue #17052: [SPARK-19690][SS] Join a streaming DataFrame with a batc...

2017-02-27 Thread uncleGen
Github user uncleGen commented on the issue:

https://github.com/apache/spark/pull/17052
  
working on unit test failure





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103353840
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] {
       CreateNamedStruct(children.toList)
   }
 }
+
+/**
+ * The aggregate expressions from subquery referencing outer query block are pushed
+ * down to the outer query block for evaluation. This rule below updates such outer references
+ * as AttributeReference referring attributes from the parent/outer query block.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b))
+ * }}}
+ * Plan before the rule.
+ *    Project [a#226]
+ *    +- Filter exists#245 [min(b#227)#249]
+ *       :  +- Project [1 AS 1#247]
+ *       :     +- Filter (d#238 < min(outer(b#227)))   <-
+ *       :        +- SubqueryAlias r
+ *       :           +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *       :              +- LocalRelation [_1#234, _2#235]
+ *       +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *          +- SubqueryAlias l
+ *             +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *                +- LocalRelation [_1#223, _2#224]
+ * Plan after the rule.
+ *    Project [a#226]
+ *    +- Filter exists#245 [min(b#227)#249]
+ *       :  +- Project [1 AS 1#247]
+ *       :     +- Filter (d#238 < outer(min(b#227)#249))   <-
+ *       :        +- SubqueryAlias r
+ *       :           +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *       :              +- LocalRelation [_1#234, _2#235]
+ *       +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *          +- SubqueryAlias l
+ *             +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *                +- LocalRelation [_1#223, _2#224]
+ */
+object UpdateOuterReferences extends Rule[LogicalPlan] {
+  private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child }
+
+  private def updateOuterReferenceInSubquery(
+      plan: LogicalPlan,
+      refExprs: Seq[Expression]): LogicalPlan = {
+    plan transformAllExpressions { case e =>
+      val outerAlias =
+        refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e)))
+      outerAlias match {
+        case Some(a: Alias) => OuterReference(a.toAttribute)
+        case _ => e
+      }
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case f @ Filter(_, a: Aggregate) if f.resolved =>
--- End diff --

@hvanhovell At the time of executing this rule, the aggregate expressions from the subquery plan are already pushed down to the Aggregate operator through ResolveAggregateFunctions. Here we are just updating the outer references in the subquery plan.





[GitHub] spark pull request #17068: [SPARK-19709][SQL] Read empty file with CSV data ...

2017-02-27 Thread wojtek-szymanski
Github user wojtek-szymanski commented on a diff in the pull request:

https://github.com/apache/spark/pull/17068#discussion_r103353729
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala ---
@@ -40,7 +41,19 @@ private[csv] object CSVInferSchema {
       csv: Dataset[String],
       caseSensitive: Boolean,
       options: CSVOptions): StructType = {
-    val firstLine: String = CSVUtils.filterCommentAndEmpty(csv, options).first()
+    val lines = CSVUtils.filterCommentAndEmpty(csv, options)
--- End diff --

You are absolutely right. Relying on exception handling is smelly, while `Option` gives more opportunities. I also see no difference from a performance point of view, since both `first()` and `take(1)` call the same function `head(1)`.
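
For example, a minimal sketch of the `Option`-based shape being discussed (`inferFromFirstLine` is a hypothetical placeholder for the existing inference logic, not a real helper in the file):

```
// Sketch: take(1).headOption models "empty input" explicitly instead of
// letting first() throw on an empty Dataset.
val lines = CSVUtils.filterCommentAndEmpty(csv, options)
lines.take(1).headOption match {
  case Some(firstLine) => inferFromFirstLine(firstLine) // existing schema inference path
  case None => StructType(Nil)                          // empty file: return an empty schema
}
```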





[GitHub] spark issue #16826: [SPARK-19540][SQL] Add ability to clone SparkSession whe...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16826
  
**[Test build #73542 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73542/testReport)** for PR 16826 at commit [`300d3a0`](https://github.com/apache/spark/commit/300d3a05e43dee853b452973eea8a707d486dd61).





[GitHub] spark issue #17091: [SPARK-19757][CORE] Executor with task scheduled could b...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17091
  
Can one of the admins verify this patch?





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103352266
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -17,43 +17,60 @@
 
 package org.apache.spark.sql.internal
 
-import java.io.File
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
  */
-private[sql] class SessionState(sparkSession: SparkSession) {
+private[sql] class SessionState(
+    sparkContext: SparkContext,
+    sharedState: SharedState,
+    val conf: SQLConf,
+    val experimentalMethods: ExperimentalMethods,
+    val functionRegistry: FunctionRegistry,
+    val catalog: SessionCatalog,
+    val sqlParser: ParserInterface,
+    val analyzer: Analyzer,
+    val streamingQueryManager: StreamingQueryManager,
+    val queryExecutionCreator: LogicalPlan => QueryExecution) {
--- End diff --

Changed.





[GitHub] spark pull request #12762: [SPARK-14891][ML] Add schema validation for ALS

2017-02-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/12762#discussion_r103352114
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---
@@ -242,16 +263,19 @@ class ALSModel private[ml] (
       }
     }
     dataset
-      .join(userFactors, dataset($(userCol)) === userFactors("id"), "left")
-      .join(itemFactors, dataset($(itemCol)) === itemFactors("id"), "left")
+      .join(userFactors,
+        checkedCast(dataset($(userCol)).cast(DoubleType)) === userFactors("id"), "left")
--- End diff --

Right, makes sense, thanks!





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17078
  
Merged build finished. Test PASSed.





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17078
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73537/
Test PASSed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307491
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, 
sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: HiveSessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
+
+new Analyzer(catalog, sqlConf) {
+  override val extendedResolutionRules =
+new ResolveHiveSerdeTable(sparkSession) ::
+new FindDataSourceTable(sparkSession) ::
+new FindHiveSerdeTable(sparkSession) ::
+new ResolveSQLOnFile(sparkSession) :: Nil
+
+  override val postHocResolutionRules =
+catalog.ParquetConversions ::
+catalog.OrcConversions ::
+PreprocessTableCreation(sparkSession) ::
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) ::
+HiveAnalysis :: Nil
+
+  override val extendedCheckRules = Seq(PreWriteCheck)
+}
+  }
+
+  def createPlannerCreator(
+  associatedSparkSession: SparkSession,
+  sqlConf: SQLConf,
+  experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
+
--- End diff --

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103338672
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  protected var activeSession: SparkSession = _
+
+  protected def createSession(): Unit = {
+activeSession = SparkSession.builder().master("local").getOrCreate()
+  }
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {
+activeSession.stop()
+  }
+
+  test("fork new session and inherit RuntimeConfig options") {
+val key = "spark-config-clone"
+activeSession.conf.set(key, "active")
+
+// inheritance
+val forkedSession = activeSession.cloneSession()
+assert(forkedSession ne activeSession)
+assert(forkedSession.conf ne activeSession.conf)
+assert(forkedSession.conf.get(key) == "active")
+
+// independence
+forkedSession.conf.set(key, "forked")
+assert(activeSession.conf.get(key) == "active")
+activeSession.conf.set(key, "dontcopyme")
+assert(forkedSession.conf.get(key) == "forked")
+  }
+
+  test("fork new session and inherit function registry and udf") {
+activeSession.udf.register("strlenScala", (_: String).length + (_: 
Int))
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.sessionState.functionRegistry ne
+  activeSession.sessionState.functionRegistry)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+
+// independence
+forkedSession.sessionState.functionRegistry.dropFunction("strlenScala")
+
assert(activeSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+activeSession.udf.register("addone", (_: Int) + 1)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("addone").isEmpty)
+  }
+
+  test("fork new session and inherit experimental methods") {
+object DummyRule1 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+object DummyRule2 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+val optimizations = List(DummyRule1, DummyRule2)
+
+activeSession.experimental.extraOptimizations = optimizations
+
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.experimental ne activeSession.experimental)
+assert(forkedSession.experimental.extraOptimizations.toSet ==
+  activeSession.experimental.extraOptimizations.toSet)
+
+// independence
+forkedSession.experimental.extraOptimizations = List(DummyRule2)
+assert(activeSession.experimental.extraOptimizations == optimizations)
+activeSession.experimental.extraOptimizations = List(DummyRule1)
+assert(forkedSession.experimental.extraOptimizations == 
List(DummyRule2))
+  }
+
+  test("fork new sessions and run query on inherited table") {
+def checkTableExists(sparkSession: SparkSession): Unit = {
+  QueryTest.checkAnswer(sparkSession.sql(
+"""
+  |SELECT x.str, COUNT(*)
+  |FROM df x JOIN df y ON x.str = y.str
+  |GROUP BY x.str
+""".stripMargin),
+Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+}
+

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103305709
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +208,29 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
+val hadoopConf = new Configuration(copyHadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
hadoopConf.set(k, v) }
+hadoopConf
+  }
 
-  /**
-   * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions must be deterministic.
-   */
-  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: SessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
 
-  /**
-   * Logical query plan analyzer for resolving unresolved attributes and 
relations.
-   */
-  lazy val analyzer: Analyzer = {
-new Analyzer(catalog, conf) {
+new Analyzer(catalog, sqlConf) {
   override val extendedResolutionRules =
 new FindDataSourceTable(sparkSession) ::
 new ResolveSQLOnFile(sparkSession) :: Nil
 
   override val postHocResolutionRules =
 PreprocessTableCreation(sparkSession) ::
-PreprocessTableInsertion(conf) ::
-DataSourceAnalysis(conf) :: Nil
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) :: Nil
 
   override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
 }
   }
 
-  /**
-   * Logical query plan optimizer.
-   */
-  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, 
experimentalMethods)
-
-  /**
-   * Parser that extracts expressions, plans, table identifiers etc. from 
SQL texts.
-   */
-  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
-
-  /**
-   * Planner that converts optimized logical plans to physical plans.
-   */
-  def planner: SparkPlanner =
-new SparkPlanner(sparkSession.sparkContext, conf, 
experimentalMethods.extraStrategies)
-
-  /**
-   * An interface to register custom 
[[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
-   */
-  lazy val listenerManager: ExecutionListenerManager = new 
ExecutionListenerManager
-
-  /**
-   * Interface to start and stop [[StreamingQuery]]s.
-   */
-  lazy val streamingQueryManager: StreamingQueryManager = {
-new StreamingQueryManager(sparkSession)
-  }
-
-  private val jarClassLoader: NonClosableMutableURLClassLoader =
-sparkSession.sharedState.jarClassLoader
-
-  // Automatically extract all entries and put it in our SQLConf
-  // We need to call it after all of vals have been initialized.
-  sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
-conf.setConfString(k, v)
-  }
-
-  // --
-  //  Helper methods, partially leftover from pre-2.0 days
-  // --
-
-  def executePlan(plan: LogicalPlan): QueryExecution = new 
QueryExecution(sparkSession, plan)
-
-  def refreshTable(tableName: String): Unit = {
-catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
-  }
-
-  def addJar(path: String): Unit = {
-sparkSession.sparkContext.addJar(path)
-
-val uri = new Path(path).toUri
-val jarURL = if (uri.getScheme == null) {
-  // `path` is a local file path without a URL scheme
-  new File(path).toURI.toURL
-} else {
-  // `path` is a URL with a scheme
-  uri.toURL
-}
-jarClassLoader.addURL(jarURL)
-Thread.currentThread().setContextClassLoader(jarClassLoader)
-  }
-
-  /**
-   * Analyzes the given table in the current database to generate 
statistics, which will be
-   * used in query optimizations.
-   */
-  def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = 
{
--- End diff --

Cool.



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r10330
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  protected var activeSession: SparkSession = _
+
+  protected def createSession(): Unit = {
+activeSession = SparkSession.builder().master("local").getOrCreate()
+  }
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {
+activeSession.stop()
+  }
+
+  test("fork new session and inherit RuntimeConfig options") {
+val key = "spark-config-clone"
+activeSession.conf.set(key, "active")
+
+// inheritance
+val forkedSession = activeSession.cloneSession()
+assert(forkedSession ne activeSession)
+assert(forkedSession.conf ne activeSession.conf)
+assert(forkedSession.conf.get(key) == "active")
+
+// independence
+forkedSession.conf.set(key, "forked")
+assert(activeSession.conf.get(key) == "active")
+activeSession.conf.set(key, "dontcopyme")
+assert(forkedSession.conf.get(key) == "forked")
+  }
+
+  test("fork new session and inherit function registry and udf") {
+activeSession.udf.register("strlenScala", (_: String).length + (_: 
Int))
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.sessionState.functionRegistry ne
+  activeSession.sessionState.functionRegistry)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+
+// independence
+forkedSession.sessionState.functionRegistry.dropFunction("strlenScala")
+
assert(activeSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+activeSession.udf.register("addone", (_: Int) + 1)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("addone").isEmpty)
+  }
+
+  test("fork new session and inherit experimental methods") {
+object DummyRule1 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+object DummyRule2 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+val optimizations = List(DummyRule1, DummyRule2)
+
+activeSession.experimental.extraOptimizations = optimizations
+
--- End diff --

removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103337066
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
@@ -136,6 +139,26 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
 }
 SparkSession.sqlListener.get()
   }
+
+  /*
+   * This belongs here more than in `SessionState`. However, does not seem 
that it can be
--- End diff --

Removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336475
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
--- End diff --

Changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295794
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 ---
@@ -1196,4 +1198,28 @@ class SessionCatalogSuite extends PlanTest {
   catalog.listFunctions("unknown_db", "func*")
 }
   }
+
+  test("copy SessionCatalog") {
--- End diff --

changed





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103347559
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---
@@ -493,6 +493,28 @@ class CatalogSuite
 }
   }
 
+  test("clone Catalog") {
+// need to test tempTables are cloned
+assert(spark.catalog.listTables().collect().isEmpty)
+
+createTempTable("my_temp_table")
+assert(spark.catalog.listTables().collect().map(_.name).toSet == 
Set("my_temp_table"))
+
+// inheritance
+val forkedSession = spark.cloneSession()
+assert(spark ne forkedSession)
+assert(spark.catalog ne forkedSession.catalog)
+assert(forkedSession.catalog.listTables().collect().map(_.name).toSet 
== Set("my_temp_table"))
+
+// independence
+dropTable("my_temp_table") // drop table in original session
+assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
+assert(forkedSession.catalog.listTables().collect().map(_.name).toSet 
== Set("my_temp_table"))
+forkedSession.sessionState.catalog
+  .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = 
true)
+assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
+  }
+
   // TODO: add tests for the rest of them
--- End diff --

After that TODO was added, many more tests were contributed. `Catalog` is a large
interface, so the tests are scattered across this suite as well as
`GlobalTempViewSuite`, `CachedTableSuite`, `PartitionProviderCompatibilitySuite`,
`(Hive)MetadataCacheSuite`, `DDLSuite` and `ParquetQuerySuite`, to name a few.
That said, I do see at least one test for every method in the `Catalog` trait.
Checked with @cloud-fan in person; we should be fine with removing this TODO.





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17078
  
**[Test build #73537 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73537/testReport)** for PR 17078 at commit [`44ee113`](https://github.com/apache/spark/commit/44ee1137efc5e23ffc6a5bfb8dec54d95e7a72e2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103351494
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => 
data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking 
= true)
-  cachedData.remove(dataIndex)
-}
-
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, 
data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
--- End diff --

Can we use a Java collection so that we can remove elements while iterating?
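
For reference, a minimal sketch of the suggested pattern (toy element type, not
CacheManager's actual `CachedData`): a `java.util` collection's iterator supports
removal in the middle of iteration, which mutating a Scala buffer inside a `foreach`
does not.

    import java.util.LinkedList

    object RemoveWhileIterating {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-in for the list of cached plans.
        val cached = new LinkedList[String]()
        cached.add("plan-a")
        cached.add("plan-b-stale")
        cached.add("plan-c")

        val it = cached.iterator()
        while (it.hasNext) {
          if (it.next().endsWith("stale")) {
            // java.util.Iterator#remove drops the current element without
            // invalidating the rest of the iteration.
            it.remove()
          }
        }

        println(cached)  // [plan-a, plan-c]
      }
    }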





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103331408
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -17,89 +17,50 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, 
SparkSqlParser}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]] backed by Hive.
  */
-private[hive] class HiveSessionState(sparkSession: SparkSession)
-  extends SessionState(sparkSession) {
-
-  self =>
-
-  /**
-   * A Hive client used for interacting with the metastore.
-   */
-  lazy val metadataHive: HiveClient =
-
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-new HiveSessionCatalog(
-  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-  sparkSession.sharedState.globalTempViewManager,
-  sparkSession,
-  functionResourceLoader,
-  functionRegistry,
+private[hive] class HiveSessionState(
+sparkContext: SparkContext,
+sharedState: SharedState,
+conf: SQLConf,
+experimentalMethods: ExperimentalMethods,
+functionRegistry: FunctionRegistry,
+override val catalog: HiveSessionCatalog,
+sqlParser: ParserInterface,
+val metadataHive: HiveClient,
+override val analyzer: Analyzer,
--- End diff --

The previous implementation needed it to be that way, but the `override` can be
removed now. Good catch.
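
As a side note, a toy sketch (illustrative classes, not the PR's types) of why the
`override` becomes unnecessary once the base class takes the member as a plain
constructor parameter:

    // The base class declares the member as a constructor val.
    class Base(val analyzer: String)

    // Forwarding the value through the constructor needs no `override`.
    class Forwarding(analyzer: String) extends Base(analyzer)

    // Redeclaring the member in the subclass is what requires `override val`.
    class Redeclaring(override val analyzer: String) extends Base(analyzer)

    object OverrideSketch extends App {
      println(new Forwarding("a").analyzer)   // a
      println(new Redeclaring("b").analyzer)  // b
    }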





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
+val sparkContext = sparkSession.sparkContext
+
+// SQL-specific key-value configurations.
--- End diff --

changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103302329
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
+val sparkContext = sparkSession.sparkContext
+
+// SQL-specific key-value configurations.
+val sqlConf = conf.getOrElse(new SQLConf)
+
+// Automatically extract all entries and put them in our SQLConf
+mergeSparkConf(sqlConf, sparkContext.getConf)
+
+// Internal catalog for managing functions registered by the user.
+val functionRegistry = FunctionRegistry.builtin.clone()
+
+// A class for loading resources specified by a function.
+val functionResourceLoader: FunctionResourceLoader =
+  createFunctionResourceLoader(sparkContext, sparkSession.sharedState)
+
+// Parser that extracts expressions, plans, table identifiers etc. 
from SQL texts.
+val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+
+// Internal catalog for managing table and database states.
+val catalog = new SessionCatalog(
+  sparkSession.sharedState.externalCatalog,
+  sparkSession.sharedState.globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  sqlConf,
+  newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
+  sqlParser)
+
+// Logical query plan analyzer for resolving unresolved attributes and 
relations.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
+
+// Interface to start and stop [[StreamingQuery]]s.
+val streamingQueryManager: StreamingQueryManager = new 
StreamingQueryManager(sparkSession)
+
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(sparkSession, plan)
+
+new SessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  sqlConf,
+  new ExperimentalMethods,
+  functionRegistry,
+  catalog,
+  sqlParser,
+  analyzer,
+  streamingQueryManager,
+  queryExecutionCreator)
+  }
+
+  def createFunctionResourceLoader(
--- End diff --

`createFunctionResourceLoader` is also used in `HiveSessionState.apply`; making it
private would render it inaccessible there.
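
As an aside, Scala's package-qualified modifiers give a middle ground here. A small
sketch with illustrative names (not the PR's final layout): `private[sql]` keeps a
helper callable from `org.apache.spark.sql.hive` while hiding it from code outside
the `sql` package tree.

    package org.apache.spark.sql.internal {
      object SessionHelpers {
        // Visible anywhere under org.apache.spark.sql (including ...sql.hive),
        // hidden from callers outside that package tree.
        private[sql] def loaderLabel(): String = "function-resource-loader"
      }
    }

    package org.apache.spark.sql.hive {
      object HiveSideCaller {
        // Compiles because org.apache.spark.sql.hive sits inside the sql package tree.
        def label: String = org.apache.spark.sql.internal.SessionHelpers.loaderLabel()
      }

      object Demo extends App {
        println(HiveSideCaller.label)
      }
    }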



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1181,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
--- End diff --

Changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307776
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, 
sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: HiveSessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
+
+new Analyzer(catalog, sqlConf) {
+  override val extendedResolutionRules =
+new ResolveHiveSerdeTable(sparkSession) ::
+new FindDataSourceTable(sparkSession) ::
+new FindHiveSerdeTable(sparkSession) ::
+new ResolveSQLOnFile(sparkSession) :: Nil
+
+  override val postHocResolutionRules =
+catalog.ParquetConversions ::
+catalog.OrcConversions ::
+PreprocessTableCreation(sparkSession) ::
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) ::
+HiveAnalysis :: Nil
+
+  override val extendedCheckRules = Seq(PreWriteCheck)
+}
+  }
+
+  def createPlannerCreator(
+  associatedSparkSession: SparkSession,
+  sqlConf: SQLConf,
+  experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
+
+() =>

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103328320
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveSessionStateSuite extends SessionStateSuite
+  with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {}
+
+  override def createSession(): Unit = {
+activeSession = spark.newSession()
+  }
+
--- End diff --

removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103303272
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +203,37 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
--- End diff --

Changed the name. This method is also used in `HiveSessionState.apply`, so it would
be rendered inaccessible if made `private`.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295692
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1181,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
+   * The table relation cache will not be populated.
+   * @note `externalCatalog` and `globalTempViewManager` are from shared 
state, don't need deep
+   * copy. `FunctionResourceLoader` is effectively stateless, also does 
not need deep copy.
+   * All arguments passed in should be associated with a particular 
`SparkSession`.
+   */
+  def clone(
+  conf: CatalystConf,
+  hadoopConf: Configuration,
+  functionRegistry: FunctionRegistry,
+  parser: ParserInterface): SessionCatalog = {
+
+val catalog = new SessionCatalog(
+  externalCatalog,
+  globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  conf,
+  hadoopConf,
+  parser)
+
+synchronized {
+  catalog.currentDb = currentDb
+  // copy over temporary tables
+  tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+}
+
+catalog
+  }
+
--- End diff --

Removed





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103306212
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
@@ -136,6 +139,26 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
 }
 SparkSession.sqlListener.get()
   }
+
+  /*
+   * This belongs here more than in `SessionState`. However, does not seem 
that it can be
+   * removed from `SessionState` and `HiveSessionState` without using 
reflection in
+   * `AddJarCommand`.
+   */
+  def addJar(path: String): Unit = {
+sparkContext.addJar(path)
+
--- End diff --

removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307299
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ---
@@ -212,3 +247,31 @@ private[sql] class HiveSessionCatalog(
 "histogram_numeric"
   )
 }
+
+private[sql] object HiveSessionCatalog {
+
+  def apply(
+  sparkSession: SparkSession,
+  functionResourceLoader: FunctionResourceLoader,
+  functionRegistry: FunctionRegistry,
+  conf: SQLConf,
+  hadoopConf: Configuration,
+  parser: ParserInterface): HiveSessionCatalog = {
+
+// Catalog for handling data source tables. TODO: This really doesn't 
belong here since it is
+// essentially a cache for metastore tables. However, it relies on a 
lot of session-specific
+// things so it would be a lot of work to split its functionality 
between HiveSessionCatalog
+// and HiveCatalog. We should still do it at some point...
+val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
+
+new HiveSessionCatalog(
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+  sparkSession.sharedState.globalTempViewManager,
+  metastoreCatalog,
+  functionResourceLoader: FunctionResourceLoader,
--- End diff --

Removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103329699
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---
@@ -144,11 +145,37 @@ private[hive] class TestHiveSparkSession(
 existingSharedState.getOrElse(new SharedState(sc))
   }
 
-  // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really 
testing the reflection
-  // logic based on the setting of CATALOG_IMPLEMENTATION.
+  private def createHiveSessionState: HiveSessionState = {
--- End diff --

Neat, changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103298123
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala ---
@@ -46,4 +46,10 @@ class ExperimentalMethods private[sql]() {
 
   @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
 
+  override def clone(): ExperimentalMethods = {
--- End diff --

Good point, added a sync block.
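
For reference, a minimal sketch of what such a sync block can look like (a simplified
stand-in, not the PR's final `ExperimentalMethods.clone`):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Simplified stand-in: copy both volatile fields under one lock so the clone
    // observes a consistent pair even if another thread assigns them concurrently.
    class ExperimentalMethodsSketch {
      @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
      @volatile var extraStrategies: Seq[String] = Nil  // the real field holds Seq[Strategy]

      override def clone(): ExperimentalMethodsSketch = {
        val result = new ExperimentalMethodsSketch
        this.synchronized {
          result.extraOptimizations = extraOptimizations
          result.extraStrategies = extraStrategies
        }
        result
      }
    }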





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103297916
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 ---
@@ -1196,4 +1198,28 @@ class SessionCatalogSuite extends PlanTest {
   catalog.listFunctions("unknown_db", "func*")
 }
   }
+
+  test("copy SessionCatalog") {
+val externalCatalog = newEmptyCatalog()
+val original = new SessionCatalog(externalCatalog)
+val tempTable1 = Range(1, 10, 1, 10)
+original.createTempView("copytest1", tempTable1, overrideIfExists = 
false)
+
+// check if tables copied over
+val clone = original.clone(
+  SimpleCatalystConf(caseSensitiveAnalysis = true),
+  new Configuration(),
+  new SimpleFunctionRegistry,
+  CatalystSqlParser)
+assert(original ne clone)
+assert(clone.getTempView("copytest1") == Option(tempTable1))
+
+// check if clone and original independent
+clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = 
false, purge = false)
+assert(original.getTempView("copytest1") == Option(tempTable1))
+
+val tempTable2 = Range(1, 20, 2, 10)
--- End diff --

Added a test for this.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307383
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
--- End diff --

Changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103305545
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +203,37 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
+val hadoopConf = new Configuration(copyHadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
+hadoopConf
+  }
 
-  /**
-   * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions must be deterministic.
-   */
-  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+  def createAnalyzer(
--- End diff --

Added docs, changed to private.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307420
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
--- End diff --

Yes!





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336676
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
 */
-  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
--- End diff --

Removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103298622
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -17,43 +17,60 @@
 
 package org.apache.spark.sql.internal
 
-import java.io.File
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryManager}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
  */
-private[sql] class SessionState(sparkSession: SparkSession) {
+private[sql] class SessionState(
+sparkContext: SparkContext,
+sharedState: SharedState,
+val conf: SQLConf,
+val experimentalMethods: ExperimentalMethods,
+val functionRegistry: FunctionRegistry,
+val catalog: SessionCatalog,
+val sqlParser: ParserInterface,
+val analyzer: Analyzer,
+val streamingQueryManager: StreamingQueryManager,
+val queryExecutionCreator: LogicalPlan => QueryExecution) {
+
+  /**
+   * Interface exposed to the user for registering user-defined functions.
+   * Note that the user-defined functions must be deterministic.
+   */
+  val udf: UDFRegistration = new UDFRegistration(functionRegistry)
 
-  // Note: These are all lazy vals because they depend on each other (e.g. 
conf) and we
-  // want subclasses to override some of the fields. Otherwise, we would 
get a lot of NPEs.
+  // Logical query plan optimizer.
--- End diff --

Changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103308243
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.internal.SQLConf
+
+class HiveSessionCatalogSuite extends SessionCatalogSuite {
--- End diff --

Good point, removed extends.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103308317
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveSessionStateSuite extends SessionStateSuite
+  with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {}
+
+  override def createSession(): Unit = {
+activeSession = spark.newSession()
--- End diff --

Added comment.





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/17078
  
Thanks. Merged into master.





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103350775
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
---
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
   @Since("1.3.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
+  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
+if (userFeatures != null && itemFeatures != null) {
+  blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
--- End diff --

Good point! But since I copy-pasted this block in this PR, maybe it's okay 
to try it out in another PR? At least with what we have here we know it's not a 
regression. Want to make sure we get some version of ALS recommendForAll* in 
2.2. 





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103350410
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
+  (num: Int, ord: Ordering[(K2, V)])
+  extends Aggregator[(K1, K2, V), BoundedPriorityQueue[(K2, V)], Array[(K2, V)]] {
+
+  override def zero: BoundedPriorityQueue[(K2, V)] = new BoundedPriorityQueue[(K2, V)](num)(ord)
+  override def reduce(
--- End diff --

👍





[GitHub] spark pull request #17091: [SPARK-19757][CORE] Executor with task scheduled ...

2017-02-27 Thread jxiang
GitHub user jxiang opened a pull request:

https://github.com/apache/spark/pull/17091

[SPARK-19757][CORE] Executor with task scheduled could be killed due to 
idleness

## What changes were proposed in this pull request?
In makeOffers, check that an executor is alive and schedule a task to it inside a single synchronized block, so that the executor won't be killed in between.
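A minimal, self-contained sketch of the locking pattern described above (all names are made up; this is not the actual CoarseGrainedSchedulerBackend code):

```
import scala.collection.mutable

// Hypothetical sketch: the liveness check and the launch happen under one lock,
// so the idle-timeout path cannot remove the executor in between.
class Backend {
  private val lock = new Object
  private val aliveExecutors = mutable.Set[String]()

  def makeOffers(execId: String, launch: String => Unit): Unit = lock.synchronized {
    // check-and-act atomically while holding the lock
    if (aliveExecutors.contains(execId)) {
      launch(execId)
    }
  }

  // The kill-idle-executor path takes the same lock before removing the executor.
  def killIdleExecutor(execId: String): Unit = lock.synchronized {
    aliveExecutors -= execId
  }
}
```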


## How was this patch tested?
manual tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jxiang/spark spark-19757

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17091


commit 6b57d7b0d7ffb511e0348ef3bdcf6f1061225984
Author: Jimmy Xiang 
Date:   2017-02-28T00:27:45Z

[CORE] Executor with task scheduled could be killed due to idleness







[GitHub] spark issue #17079: [SPARK-19748][SQL]refresh function has a wrong order to ...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17079
  
Good catch! Can you show a real example that fails because of this bug? I'm 
wondering why the existing unit tests didn't expose it...





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17078





[GitHub] spark pull request #17079: [SPARK-19748][SQL]refresh function has a wrong or...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17079#discussion_r103350023
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 ---
@@ -178,6 +178,34 @@ class FileIndexSuite extends SharedSQLContext {
   assert(catalog2.allFiles().nonEmpty)
 }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+withTempDir { dir =>
+  val fileStatusCache = FileStatusCache.getOrCreate(spark)
+  val dirPath = new Path(dir.getAbsolutePath)
+  val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty,
+None, fileStatusCache) {
+def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+  }
+
+  assert(catalog.leafDirPaths.isEmpty)
+  assert(catalog.leafFilePaths.isEmpty)
+
+  val file = new File(dir, "text.txt")
+  stringToFile(file, "text")
+
+  catalog.refresh()
+
+  assert(catalog.leafFilePaths.size == 1)
+  assert(catalog.leafFilePaths.head.toString.stripSuffix("/") ==
+s"file:${file.getAbsolutePath.stripSuffix("/")}")
--- End diff --

this looks hacky, can you turn them into `Path` and compare?
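One possible shape of that suggestion (illustrative only, reusing `catalog` and `file` from the quoted test; how `Path` normalizes trailing slashes would still need to be verified):

```
import org.apache.hadoop.fs.Path

// Compare Path objects instead of massaging strings with stripSuffix.
assert(catalog.leafFilePaths.head == new Path(s"file:${file.getAbsolutePath}"))
```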





[GitHub] spark issue #17090: [Spark-19535][ML] RecommendForAllUsers RecommendForAllIt...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17090
  
**[Test build #73540 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73540/testReport)**
 for PR 17090 at commit 
[`707bc6b`](https://github.com/apache/spark/commit/707bc6b153a7f899fbf3fe2a5675cacba1f95711).





[GitHub] spark issue #17089: [SPARK-19756][SQL] drop the table cache after inserting ...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17089
  
**[Test build #73541 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73541/testReport)**
 for PR 17089 at commit 
[`8bca8d3`](https://github.com/apache/spark/commit/8bca8d35e04e582f73052411e42811a8c90329de).





[GitHub] spark pull request #16959: [SPARK-19631][CORE] OutputCommitCoordinator shoul...

2017-02-27 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16959#discussion_r103348105
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private type StageId = Int
   private type PartitionId = Int
   private type TaskAttemptNumber = Int
+  private case class StageState(
+  authorizedCommitters: Array[TaskAttemptNumber],
+  failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]])
--- End diff --

Why not define `failures` as a member variable (and initialize it there with 
an empty map), rather than forcing the caller to pass in an empty map?
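For illustration, the suggestion could look roughly like this (a hypothetical sketch, not the PR's actual code; the type aliases are inlined as Int):

```
import scala.collection.mutable

// Sketch: `failures` is initialized inside the class, so callers only say how
// many partitions the stage has.
case class StageState(numPartitions: Int) {
  val authorizedCommitters: Array[Int] = Array.fill(numPartitions)(-1) // -1 = no authorized committer yet
  val failures: mutable.Map[Int, mutable.Set[Int]] = mutable.Map.empty
}
```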





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349590
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
---
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
   @Since("1.3.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
+  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
+if (userFeatures != null && itemFeatures != null) {
+  blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
--- End diff --

I wonder how the overhead of converting to an array compares with the 
efficiency of calling sdot -- could be faster to just do the Seqs by hand? is 
it possible to operate on something besides Seq?
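For reference, "doing the Seqs by hand" might look like the rough, unbenchmarked sketch below; whether it beats toArray + sdot depends on the concrete Seq type, since indexed access is only O(1) for array-backed sequences:

```
// Hypothetical alternative to toArray + blas.sdot: a plain while-loop over the Seqs.
def dotByHand(userFeatures: Seq[Float], itemFeatures: Seq[Float]): Float = {
  val n = math.min(userFeatures.length, itemFeatures.length)
  var sum = 0.0f
  var i = 0
  while (i < n) { // while-loop avoids per-element closure allocation
    sum += userFeatures(i) * itemFeatures(i)
    i += 1
  }
  sum
}
```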





[GitHub] spark pull request #17079: [SPARK-19748][SQL]refresh function has a wrong or...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17079#discussion_r103349639
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 ---
@@ -178,6 +178,34 @@ class FileIndexSuite extends SharedSQLContext {
   assert(catalog2.allFiles().nonEmpty)
 }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+withTempDir { dir =>
+  val fileStatusCache = FileStatusCache.getOrCreate(spark)
+  val dirPath = new Path(dir.getAbsolutePath)
+  val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty,
+None, fileStatusCache) {
--- End diff --

nit:
```
val catalog =
  new XXX(...) {
def xxx
  }
```
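Applied to the test under review, the suggested layout would be (same code, only reformatted):

```
val catalog =
  new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) {
    def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
    def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
  }
```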





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349429
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
+  (num: Int, ord: Ordering[(K2, V)])
+  extends Aggregator[(K1, K2, V), BoundedPriorityQueue[(K2, V)], Array[(K2, V)]] {
+
+  override def zero: BoundedPriorityQueue[(K2, V)] = new BoundedPriorityQueue[(K2, V)](num)(ord)
+  override def reduce(
--- End diff --

I think you need to throw some spaces and braces in here to make it a bit 
more readable?





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103349345
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-  cachedData.remove(dataIndex)
-}
-sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
--- End diff --

But we are still modifying it during iteration, after the `filter`. Can you 
be more specific about what the problem is?





[GitHub] spark issue #17075: [SPARK-19727][SQL] Fix for round function that modifies ...

2017-02-27 Thread wojtek-szymanski
Github user wojtek-szymanski commented on the issue:

https://github.com/apache/spark/pull/17075
  
I have just started refactoring `changePrecision` in order to make it immutable.
My idea was to change the signature from:
 `def changePrecision(precision: Int, scale: Int, mode: Int): Boolean`
into 
 `def changePrecision(precision: Int, scale: Int, mode: Int): 
Option[Decimal]`

 Here are my first thoughts:
- `org.apache.spark.sql.types.Decimal` is mutable by definition, so making 
one method immutable makes its contract very inconsistent

- I am afraid of performance degradation in micro-benchmarks since in some 
use cases, an instance needs to be created twice

- `changePrecision` is called 10 times in Scala, 10 times in **code gen** functions, and 3 times in Java **unsafe** writers (`UnsafeArrayWriter`, `UnsafeRowWriter`)

I would be grateful if you could confirm if it's the right way to go.
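For illustration, a call site under the proposed `Option[Decimal]` signature might read as follows (hypothetical; this helper and the Option-returning method do not exist in Spark today):

```
// Hypothetical: changePrecision returns Some(rounded Decimal) on success, or
// None if the value cannot be represented with the requested precision/scale.
def toPrecision(d: Decimal, precision: Int, scale: Int, mode: Int): Decimal =
  d.changePrecision(precision, scale, mode)
    .getOrElse(sys.error(s"cannot represent $d as DECIMAL($precision, $scale)"))
```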





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349139
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
--- End diff --

(It'd need its own unit tests, though not sure if we'll get everything in 
for 2.2)





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
GitHub user sueann opened a pull request:

https://github.com/apache/spark/pull/17090

[Spark-19535][ML] RecommendForAllUsers RecommendForAllItems for ALS on 
Dataframe 

## What changes were proposed in this pull request?

This is a simple implementation of RecommendForAllUsers & RecommendForAllItems 
for the DataFrame version of ALS. It uses DataFrame operations (not a wrapper 
on the RDD implementation). I haven't benchmarked it against a wrapper, but 
the unit test examples do work.
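A usage sketch of the proposed API (method names as proposed in this PR; `ratings` is an assumed DataFrame with user, item and rating columns):

```
val als = new ALS().setUserCol("user").setItemCol("item").setRatingCol("rating")
val model = als.fit(ratings)                      // ratings: DataFrame, assumed to exist
val top10PerUser = model.recommendForAllUsers(10) // top 10 items for every user
val top10PerItem = model.recommendForAllItems(10) // top 10 users for every item
```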

## How was this patch tested?

Unit tests
```
$ build/sbt
> mllib/testOnly *ALSSuite -- -z "recommendFor"
> mllib/testOnly
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sueann/spark SPARK-19535

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17090









[GitHub] spark issue #17089: [SPARK-19756][SQL] drop the table cache after inserting ...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17089
  
cc @gatorsmile 





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349080
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag]
--- End diff --

We may want to put this somewhere more general so it can be reused?





[GitHub] spark pull request #17089: [SPARK-19756][SQL] drop the table cache after ins...

2017-02-27 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/17089

[SPARK-19756][SQL] drop the table cache after inserting into a data source 
table

## What changes were proposed in this pull request?

When inserting into a table, we should uncache it to avoid exposing stale data. 
This is already the behavior for Hive tables (see `InsertIntoHiveTable`); this 
PR fixes the same problem for data source tables.
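A minimal repro sketch of the stale-cache behavior described above (table name made up; the counts are illustrative):

```
spark.range(10).write.saveAsTable("t")             // data source table
spark.table("t").cache()
spark.table("t").count()                           // 10, now served from the cache
spark.sql("INSERT INTO t SELECT id FROM range(5)")
spark.table("t").count()                           // without uncaching on insert, this can still be the stale 10
```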

## How was this patch tested?

new regression test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17089









[GitHub] spark issue #17039: [SPARK-19710][SQL][TESTS] Fix ordering of rows in query ...

2017-02-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17039
  
@hvanhovell Is it possible that the SQL queries are used to verify the 
behavior of ORDER BY? Do you think we should explicitly leave a comment saying 
that SQLQueryTestSuite is not to be used for this purpose?





[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2017-02-27 Thread thunterdb
Github user thunterdb commented on the issue:

https://github.com/apache/spark/pull/15770
  
Note that any of these formats would cause trouble for a graph with a 
high-centrality node (Lady Gaga in the Twitter graph). That said, I do not have 
a strong opinion on which option we pick, in order to move things along.





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103348444
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-  cachedData.remove(dataIndex)
-}
-sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
--- End diff --

This kind of collection can't be modified while we are iterating over it: some 
elements are not visited if we delete or add elements during the iteration.
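A tiny standalone illustration of the hazard (plain Scala collections, not the CacheManager code):

```
import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer(1, 3, 2, 4)
// Removing while foreach walks the buffer by index can skip elements:
buf.foreach { x => if (x % 2 == 1) buf -= x } // unsafe; 3 may survive even though it is odd
// Safer: decide what to drop first, then mutate.
val toDrop = buf.filter(_ % 2 == 1)
toDrop.foreach(buf -= _)
```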





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103347540
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2332,6 +2337,11 @@ class Analyzer(
 override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
   case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
 e.withTimeZone(conf.sessionLocalTimeZone)
+  // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
+  // the types between the value expression and list query expression of IN expression.
+  // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
+  // information for time zone aware expressions.
+  case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan))
--- End diff --

@hvanhovell Thank you.. I will change.





[GitHub] spark issue #14273: [SPARK-9140] [ML] Replace TimeTracker by MultiStopwatch

2017-02-27 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/14273
  
OK apologies @MechCoder for the delay.  I guess we can close this issue, 
and someone else can open up a PR based on yours.





[GitHub] spark issue #17012: [SPARK-19677][SS] Renaming a file atop an existing one s...

2017-02-27 Thread hejix
Github user hejix commented on the issue:

https://github.com/apache/spark/pull/17012
  
Just some feedback: I did some initial regression testing with this pull 
request on a full 4-node YARN (v2.7.3) cluster on GCP, and it appears to have 
fixed the two issues we had. Our structured streaming drivers now restart 
normally, and our complex aggregation driver runs for the first time.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103346762
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

@hvanhovell The code here both validates the correlated references and collects 
them so that the outer plan can be rewritten to record the outer references. 
Are you suggesting moving the "check" portion to checkAnalysis? I will change 
it to use foreachUp. Thanks.





[GitHub] spark issue #14273: [SPARK-9140] [ML] Replace TimeTracker by MultiStopwatch

2017-02-27 Thread sethah
Github user sethah commented on the issue:

https://github.com/apache/spark/pull/14273
  
@jkbradley I do not think @MechCoder is working on Spark for the time being.





[GitHub] spark issue #16959: [SPARK-19631][CORE] OutputCommitCoordinator should not a...

2017-02-27 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/16959
  
Looks ok to me, but let me ping some others @squito @kayousterhout 





[GitHub] spark issue #17081: [SPARK-18726][SQL][FOLLOW-UP]resolveRelation for FileFor...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17081
  
**[Test build #73539 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73539/testReport)**
 for PR 17081 at commit 
[`f1da0a4`](https://github.com/apache/spark/commit/f1da0a4cf457f4efb6128beca3c08ccf95ef37a0).





[GitHub] spark issue #17079: [SPARK-19748][SQL]refresh function has a wrong order to ...

2017-02-27 Thread windpiger
Github user windpiger commented on the issue:

https://github.com/apache/spark/pull/17079
  
cc @cloud-fan @gatorsmile





[GitHub] spark pull request #16965: [SPARK-18450][ML] Scala API Change for LSH AND-am...

2017-02-27 Thread Yunni
Github user Yunni closed the pull request at:

https://github.com/apache/spark/pull/16965





[GitHub] spark issue #16965: [SPARK-18450][ML] Scala API Change for LSH AND-amplifica...

2017-02-27 Thread Yunni
Github user Yunni commented on the issue:

https://github.com/apache/spark/pull/16965
  
Looks like the rebase is making it even worse. I will reopen a PR.





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17078
  
**[Test build #73537 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73537/testReport)** for PR 17078 at commit [`44ee113`](https://github.com/apache/spark/commit/44ee1137efc5e23ffc6a5bfb8dec54d95e7a72e2).





[GitHub] spark issue #16965: [SPARK-18450][ML] Scala API Change for LSH AND-amplifica...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16965
  
**[Test build #73538 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73538/testReport)** for PR 16965 at commit [`0b46461`](https://github.com/apache/spark/commit/0b4646199cf061d1f358a78122ef8bdf164ac839).





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/17078#discussion_r103343872
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -1431,7 +1431,12 @@ private class LogisticAggregator(
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val gradientSumArray = Array.fill[Double](coefficientSize)(0.0D)
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
--- End diff --

Yeah, I'll update it.
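
For context, a minimal sketch of the `@transient lazy val` pattern shown in the diff above, assuming the coefficients are broadcast as an `ml.linalg.Vector` (the class name and the dense-only match arm are illustrative, not the PR's exact code):

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.{DenseVector, Vector}

// Illustrative stand-in for the aggregator that holds the broadcast coefficients.
class LogisticAggregatorSketch(bcCoefficients: Broadcast[Vector]) extends Serializable {

  // @transient + lazy: the unwrapped array is not serialized with the task
  // closure; each deserialized copy materializes it once, on first access,
  // from the broadcast value.
  @transient private lazy val coefficientsArray: Array[Double] =
    bcCoefficients.value match {
      case dv: DenseVector => dv.values
      case v => throw new IllegalArgumentException(
        s"coefficientsArray only supports dense vectors but got ${v.getClass}.")
    }

  // Hot-path access goes straight to a primitive Array[Double].
  def coefficient(i: Int): Double = coefficientsArray(i)
}
```

Presumably this is the "faster indexing" referred to in the PR title: the inner loop reads a primitive `Array[Double]` instead of calling through the `Vector` interface.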





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340921
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
+case In(_, Seq(_: ListQuery)) => true
+case _ => false
+  }.isDefined
+}.isDefined
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+e.transform {
+  case OuterReference(r) => r
+}
+  }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+p.transformAllExpressions {
+  case OuterReference(a) => a
+}
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
+   * TODO: Currently we don't allow deep correlation. Also, we don't allow mixing of
+   * outer references and local references under an aggregate expression.
+   * For example (SQL):
+   * {{{
+   *   SELECT .. FROM p1
+   *   WHERE EXISTS (SELECT ...
+   * FROM p2
+   * WHERE EXISTS (SELECT ...
+   *   FROM sq
+   *   WHERE min(p1.a + p2.b) = sq.c))
+   *
+   *   

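
As a rough usage illustration of the helpers defined in the quoted diff, a minimal sketch assuming the `SubExprUtils` object and `OuterReference` from this patch are on the classpath (`b` is a hypothetical stand-in for an outer-query attribute):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, OuterReference, SubExprUtils}
import org.apache.spark.sql.types.IntegerType

object OuterReferenceExample extends App {
  // Hypothetical attribute standing in for column `b` of the outer query.
  val b = AttributeReference("b", IntegerType)()

  // A correlated reference is wrapped in an OuterReference shell...
  val wrapped: Expression = OuterReference(b)

  // ...which the helpers from the diff detect and strip.
  assert(SubExprUtils.containsOuter(wrapped))            // an outer reference is present
  assert(SubExprUtils.stripOuterReference(wrapped) == b) // the shell is removed
}
```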