[GitHub] spark issue #17640: [SPARK-17608][SPARKR]:Long type has incorrect serializat...

2017-04-14 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/17640
  
Overall, this looks like a sensible approach to a messy problem.
You might want to think about adding some overflow handling to the SQL-->R 
translation. That is, if a DataFrame contains a `bigint` value that cannot be 
represented exactly as a `Double`, it would be safer to convert that value to NaN 
instead of silently stripping the lower-order bits off the `bigint`. The `bigint` 
column in the source DataFrame could hold a unique identifier or a hash value.
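
For illustration, here is a minimal sketch of that kind of overflow check (a hypothetical helper, not code from this PR). It treats any value outside the range a `Double` can represent exactly as NaN rather than losing precision silently:
```scala
object LongToRDouble {
  // Doubles have a 53-bit significand, so every integer whose magnitude is at
  // most 2^53 is exactly representable; beyond that, low-order bits can be lost.
  private val MaxExactLong: Long = 1L << 53

  /** Convert a JVM Long for the R side, mapping unrepresentable values to NaN. */
  def toRDouble(value: Long): Double =
    if (value >= -MaxExactLong && value <= MaxExactLong) value.toDouble
    else Double.NaN
}
```
The conservative 2^53 bound rejects a few large values that happen to be exactly representable, but it never truncates silently.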





[GitHub] spark issue #15027: [SPARK-17475] [STREAMING] Delete CRC files if the filesy...

2016-11-02 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15027
  
@viirya to answer your question regarding deleting vs. moving the files: deleting 
is easier to implement, because once the .crc file is deleted, you can be sure 
it won't appear again. Moving the checksum files would be better but would 
require tracking down other Spark code that does filesystem operations on the 
files later on. These log files are very short by HDFS standards, so the 
checksums aren't providing much value anyway.
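
For illustration, a minimal sketch of the deletion approach under discussion (a hypothetical helper, not the PR's code), using only standard Hadoop `FileSystem` calls:
```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object CrcCleanup {
  /** Remove the ChecksumFileSystem sidecar (".<name>.crc") for a log file, if present. */
  def deleteCrcIfPresent(fs: FileSystem, logFile: Path): Unit = {
    val crcPath = new Path(logFile.getParent, s".${logFile.getName}.crc")
    if (fs.exists(crcPath)) {
      fs.delete(crcPath, false) // non-recursive: it is a single sidecar file
    }
  }
}
```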





[GitHub] spark issue #15027: [SPARK-17475] [STREAMING] Delete CRC files if the filesy...

2016-10-27 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15027
  
When I comment out line 155 in HDFSMetadataLog.scala on this branch (`if 
(fileManager.exists(crcPath)) fileManager.delete(crcPath)`) and run the test 
case attached to this PR, the test case fails:
```
[freiss@fuzzy]:~/spark/from_git/spark-17475$ build/sbt -Phadoop-2.7 -Pscala-2.11 "test-only org.apache.spark.sql.execution.streaming.HDFSMetadataLogSuite"
...
[info] - HDFSMetadataLog: purge *** FAILED *** (135 milliseconds)
[info]   Array(/Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/..29ef67f7-1712-4350-8552-1f8bc6424d0b.tmp.crc, /Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/..ab9bafcb-bdf5-4411-9a9b-60d293d653a6.tmp.crc, /Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/..f79fbc34-58c5-4856-a40d-84eef49c8b9e.tmp.crc, /Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/2) had size 4 instead of expected size 1 (HDFSMetadataLogSuite.scala:126)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
...
```
Note how the test case failure message contains a list of orphan .crc files 
left in HDFSMetadataLog's temp directory. So, while the filesystem code appears 
on the surface to be immune to this problem, the problem is clearly happening 
in the context of the unit tests. Determining exactly what is going on will 
require more in-depth investigation. Depending on the true root cause, it's 
possible that this problem also occurs in some distributed settings.





[GitHub] spark issue #15162: [SPARK-17386] [STREAMING] [WIP] Make polling rate adapti...

2016-10-26 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15162
  
Closing the PR.





[GitHub] spark pull request #15162: [SPARK-17386] [STREAMING] [WIP] Make polling rate...

2016-10-26 Thread frreiss
Github user frreiss closed the pull request at:

https://github.com/apache/spark/pull/15162





[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...

2016-10-26 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
Updated the branch and addressed new review comments. Looks like my last 
push missed a one-line change to memory.scala. Tests are running now.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r85227714
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -111,6 +126,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       }
     }
 
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
+    } else {
+      sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+        s"an instance of this class")
--- End diff --

Corrected in my local copy.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r85227658
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -111,6 +126,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       }
     }
 
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
--- End diff --

Corrected in my local copy.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-21 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r84569335
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -337,17 +343,27 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(
-          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // NOTE: The following code is correct because runBatches() processes exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-        // flight at the same time), this cleanup logic will need to change.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the target ID.
+        offsetLog.purge(currentBatchId - 1)
--- End diff --

I can move this change to another JIRA if you'd like, but we really should 
change `currentBatchId` to `currentBatchId - 1` at some point. The call to 
`offsetLog.purge(currentBatchId)`, which I introduced in my PR for SPARK-17513, 
contains a subtle bug. The recovery logic in `populateStartOffsets()` reads the 
last and second-to-last entries in `offsetLog`. `populateStartOffsets()` uses 
those entries to populate `availableOffsets` and `committedOffsets`, 
respectively. Calling `offsetLog.purge(currentBatchId)` at line 350/366 results 
in the `offsetLog` being truncated to one entry, which in turn results in 
`committedOffsets` being left empty on recovery, which in turn causes the first 
call to `getBatch()` for any source to have `None` as its first argument. 
Sources that do not prune buffered data in their `commit()` methods will return 
previously committed data in response to such a `getBatch()` call.
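
To make that dependency concrete, here is a simplified, self-contained sketch of the recovery read path (hypothetical names and types; not the actual `populateStartOffsets()` code):
```scala
// The offset log is modeled as a map from batch id to a serialized offset.
def startOffsetsAfterRestart(offsetLog: Map[Long, String]): (Option[String], Option[String]) = {
  val batchIds = offsetLog.keys.toSeq.sorted
  val availableOffsets = batchIds.lastOption.map(offsetLog)               // latest entry
  val committedOffsets = batchIds.dropRight(1).lastOption.map(offsetLog)  // second-to-latest entry
  // If purge() left only one entry, committedOffsets is None, so the first
  // getBatch() call after restart receives start = None.
  (availableOffsets, committedOffsets)
}
```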





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-21 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r84539662
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -336,17 +342,27 @@ class StreamExecution(
 }
 if (hasNewData) {
   reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-assert(
-  offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
+assert(offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
   s"Concurrent update to the log. Multiple streaming jobs detected 
for $currentBatchId")
 logInfo(s"Committed offsets for batch $currentBatchId.")
 
+// Now that we've updated the scheduler's persistent checkpoint, 
it is safe for the
+// sources to discard data from *before* the previous batch.
+// The scheduler might still request the previous batch from a 
source in some cases
+// if a crash and recovery occured.
+val prevBatchOff = offsetLog.get(currentBatchId - 2)
--- End diff --

Yes, `currentBatchId - 1` does work here, and I've updated my local copy.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-21 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r84538526
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -336,17 +342,27 @@ class StreamExecution(
 }
 if (hasNewData) {
   reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-assert(
-  offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
+assert(offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
   s"Concurrent update to the log. Multiple streaming jobs detected 
for $currentBatchId")
 logInfo(s"Committed offsets for batch $currentBatchId.")
 
+// Now that we've updated the scheduler's persistent checkpoint, 
it is safe for the
+// sources to discard data from *before* the previous batch.
+// The scheduler might still request the previous batch from a 
source in some cases
+// if a crash and recovery occured.
--- End diff --

Fixed in my local copy.





[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...

2016-10-21 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
I've been running tests since this morning; should have updates in soon.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-19 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r84138507
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
---
@@ -30,16 +30,30 @@ trait Source  {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When 
`start` is `None` then
-   * the batch should begin with the first available record. This method 
must always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When 
`start` is `None`,
+   * then the batch should begin with the first record. This method must 
always return the
+   * same data for a particular `start` and `end` pair; even after the 
Source has been restarted
+   * on a different node.
+   *
+   * Higher layers will always call this method with a value of `start` 
greater than or equal
+   * to the last value passed to `commit` and a value of `end` less than 
or equal to the
+   * last value returned by `getMaxOffset`
--- End diff --

Fixed in my local copy.





[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...

2016-10-17 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
All my changes are in now, and regression tests pass. As far as I can see, 
all the review comments have been addressed at this point.





[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...

2016-10-14 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
Sorry, I missed the last few email notifications about this PR. I've merged 
with the head version and made updates to address the most recent round of 
review comments. Currently running regression tests locally.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-14 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r83524491
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -92,21 +105,64 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
   else TextSocketSource.SCHEMA_REGULAR
 
-  /** Returns the maximum available offset for this source. */
+  override def lastCommittedOffset: Option[Offset] = synchronized {
+if (lastOffsetCommitted.offset == -1) {
+  None
+} else {
+  Some(lastOffsetCommitted)
+}
+  }
+
   override def getOffset: Option[Offset] = synchronized {
-if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+if (currentOffset.offset == -1) {
+  None
+} else {
+  Some(currentOffset)
+}
   }
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 
1).getOrElse(0)
-val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
-val data = synchronized { lines.slice(startIdx, endIdx) }
+val startOrdinal =
+  
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+// Internal buffer only holds the batches after lastOffsetCommitted
+val rawList = synchronized {
+  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+  batches.slice(sliceStart, sliceEnd)
+}
+
 import sqlContext.implicits._
+val rawBatch = sqlContext.createDataset(rawList)
+
+
+
--- End diff --

Fixed in my local copy.





[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...

2016-10-14 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r83524216
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
---
@@ -30,16 +30,37 @@ trait Source  {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the highest offset that this source has removed from 
its internal buffer
+   * in response to a call to `commit`.
+   * Returns `None` if this source has not removed any data.
+   */
+  def lastCommittedOffset: Option[Offset] = (None)
+
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When 
`start` is `None` then
-   * the batch should begin with the first available record. This method 
must always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When 
`start` is `None`,
+   * then the batch should begin with the first record. This method must 
always return the
+   * same data for a particular `start` and `end` pair; even after the 
Source has been restarted
+   * on a different node.
+   * 
--- End diff --

Fixed in my local copy.





[GitHub] spark pull request #15352: [SPARK-17780][SQL]Report Throwable to user in Str...

2016-10-05 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15352#discussion_r82085912
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -207,13 +207,18 @@ class StreamExecution(
       })
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
-      case NonFatal(e) =>
+      case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
           Some(committedOffsets.toCompositeOffset(sources)))
         logError(s"Query $name terminated with error", e)
+        // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
+        // handle them
+        if (!NonFatal(e)) {
+          throw e
--- End diff --

`StreamExecution.stop()` won't have been called if the microbatch thread 
arrives at this location in the code, and the user likely won't call it after 
seeing the exception. You should probably call the `stop` method on each of the 
sources before exiting the microbatch thread here.
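
A hedged sketch of that suggestion (a stand-in `Stoppable` trait is used instead of Spark's `Source`, since this is illustrative rather than the PR's code): stop every source even if some of them fail during shutdown.
```scala
import scala.util.control.NonFatal

trait Stoppable { def stop(): Unit }

/** Best-effort cleanup before the microbatch thread exits with a fatal error. */
def stopAllSources(sources: Seq[Stoppable]): Unit = {
  sources.foreach { s =>
    try s.stop()
    catch {
      case NonFatal(e) => Console.err.println(s"Failed to stop source $s: ${e.getMessage}")
    }
  }
}
```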





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81684775
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +139,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def queryStatus: StreamingQueryInfo = {
+this.toInfo
+  }
+
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
 val localAvailableOffsets = availableOffsets
 sources.map(s =>
-  new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
+  new SourceStatus(
--- End diff --

Actually, you can probably drop most of the synchronization if you keep two 
`StreamMetrics` objects and preallocate the slots for counters. At least the 
way things are now, each counter in `StreamMetrics` is written once per batch. 
If you tweak `sourceStatuses()` to return the metrics from the most recent 
completed batch (i.e. the `StreamMetrics` object that's not currently being 
written to), there should be no overlap between readers and writers. Eventually 
you'll want to have more than one `StreamMetrics` object anyway, since the 
scheduler will need to pipeline multiple batches to reach latencies below the 
50-100ms level.
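
A rough sketch of the double-buffering idea (hypothetical classes; the real `StreamMetrics` API may differ): the scheduler thread writes one object per batch, and readers only ever see the most recently completed one.
```scala
import java.util.concurrent.atomic.AtomicReference

final case class BatchMetrics(batchId: Long, counters: Map[String, Long])

final class DoubleBufferedMetrics {
  private val lastCompleted = new AtomicReference(BatchMetrics(-1L, Map.empty))
  private var inProgress = BatchMetrics(0L, Map.empty)

  /** Scheduler thread only: record a counter for the batch currently running. */
  def record(name: String, value: Long): Unit =
    inProgress = inProgress.copy(counters = inProgress.counters + (name -> value))

  /** Scheduler thread only: publish the finished batch and start a fresh one. */
  def finishBatch(): Unit = {
    lastCompleted.set(inProgress)
    inProgress = BatchMetrics(inProgress.batchId + 1, Map.empty)
  }

  /** Any thread: a consistent snapshot of the last completed batch, no locking. */
  def latest: BatchMetrics = lastCompleted.get()
}
```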





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81672432
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -317,15 +358,18 @@ class StreamExecution(
 // TODO: Move this to IncrementalExecution.
 
 // Request unprocessed data from all sources.
-val newData = availableOffsets.flatMap {
-  case (source, available)
+val newData = timeIt(GET_BATCH_LATENCY) {
--- End diff --

Note that the time interval being measured here will have different 
semantics for different sources, depending on how much computation occurs 
inside the source's `getBatch` method vs. lazily when the data is read from the 
resulting Dataframe.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81672040
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +139,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def queryStatus: StreamingQueryInfo = {
+this.toInfo
+  }
+
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
 val localAvailableOffsets = availableOffsets
 sources.map(s =>
-  new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
+  new SourceStatus(
--- End diff --

If this method is intended to be called from threads other than the 
scheduler thread, then the entire map really ought to be synchronized on 
`streamMetrics`'s lock. Otherwise this method could return a mixture of 
statistics from different points in time, even within a single source.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81668841
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---
@@ -56,7 +57,12 @@ case class StateStoreRestoreExec(
     child: SparkPlan)
   extends execution.UnaryExecNode with StatefulOperator {
 
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
--- End diff --

The metric names should probably be in a separate, centralized list of 
constants. Users will want a single place in the API docs to find a list of all 
available metrics, and the list is likely to change quite frequently as 
Structured Streaming evolves.
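
A hedged sketch of the centralized-constants suggestion (the object and the extra names below are illustrative assumptions, not part of this PR):
```scala
/** One place to look up (and document) the metric keys exposed by stateful operators. */
object StatefulOperatorMetricNames {
  val NumOutputRows = "numOutputRows"
  // Additional keys shown only as placeholders for whatever the operators expose.
  val NumTotalStateRows = "numTotalStateRows"
  val NumUpdatedStateRows = "numUpdatedStateRows"

  val All: Seq[String] = Seq(NumOutputRows, NumTotalStateRows, NumUpdatedStateRows)
}
```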





[GitHub] spark issue #15262: [SPARK-17690][STREAMING][SQL] Add mini-dfs cluster based...

2016-09-28 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15262
  
LGTM overall. We may want to switch more of the test cases to use HDFS in a 
follow-on JIRA.





[GitHub] spark issue #15258: [SPARK-17689][SQL][STREAMING] added excludeFiles option ...

2016-09-27 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15258
  
This change allows FileInputStream to consume partial outputs of a system 
such as Hadoop or another copy of Spark, provided that the system adheres 
rigidly to the write policy of recent versions of Hadoop. That is: First, write 
to a temporary file. Then close and flush the temporary file. Then rename the 
temporary file, using one of the newer, atomic HDFS APIs for renaming files. I 
worry that users might write data in a subtly different way that does not 
follow this procedure 100%, which could result in Spark reading incorrect data 
every once in a while. I recommend documenting under exactly what conditions 
the "ignore temporary files" option guarantees correct behavior.

Also, it would be a good idea to include a mode in which FileInputStream 
will ignore a directory of files until the special file _SUCCESS appears, 
indicating that the directory is complete. Otherwise, Spark could end up 
consuming partial results from failed jobs.
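
As a sketch of the `_SUCCESS` idea (a hypothetical helper; no such option exists in this PR), the check itself is only a marker-file probe:
```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object SuccessMarker {
  /** True once the batch job that produced `dir` has written its _SUCCESS marker. */
  def isDirectoryComplete(fs: FileSystem, dir: Path): Boolean =
    fs.exists(new Path(dir, "_SUCCESS"))
}
```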





[GitHub] spark pull request #15258: [SPARK-17689][SQL][STREAMING] added excludeFiles ...

2016-09-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15258#discussion_r80838376
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -50,6 +50,19 @@ class ListingFileCatalog(
 
   refresh()
 
+  /**
+   * Often HDFS create temporary files while copying to a new directory or writing new content.
+   * These files are unintentionally picked up by streaming - causing job failures. This option lets
+   * HDFS skip these files matching the configured regex-patterns from being picked up by Streaming
+   * Job.
+   */
+  private lazy val excludeFiles: Set[String] = parameters
+    .getOrElse("excludeFiles", ".*._COPYING_,_temporary").split(",").toSet
+
+  private def isExcludedFile(path: Path): Boolean = {
+    excludeFiles.map(path.getName.matches).fold(false)(_ || _)
--- End diff --

Probably better to precompile a regex here, as this method will be called very 
frequently (by default, 100 times per second times the number of files under the root directory).
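
For example, a small sketch of the precompiled variant (hypothetical class; the PR keeps this logic inside `ListingFileCatalog`): compile each pattern once and reuse the `Pattern` objects on every listing.
```scala
import java.util.regex.Pattern
import org.apache.hadoop.fs.Path

/** Compiles the exclusion patterns once instead of calling String.matches per file. */
final class ExcludedFileFilter(patterns: Set[String]) {
  private val compiled: Seq[Pattern] = patterns.toSeq.map(p => Pattern.compile(p))

  def isExcluded(path: Path): Boolean =
    compiled.exists(_.matcher(path.getName).matches())
}
```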





[GitHub] spark pull request #15262: [SPARK-17690][STREAMING][SQL] Add mini-dfs cluste...

2016-09-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15262#discussion_r80826485
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -330,15 +353,42 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
   val filtered = textStream.filter($"value" contains "keep")
 
   testStream(filtered)(
-AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+AddTextLocalFileData("drop1\nkeep2\nkeep3", src, tmp),
+CheckAnswer("keep2", "keep3"),
+StopStream,
+AddTextLocalFileData("drop4\nkeep5\nkeep6", src, tmp),
+StartStream(),
+CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+AddTextLocalFileData("drop7\nkeep8\nkeep9", src, tmp),
+CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+  )
+}
+  }
+
+  test("read from text files using hdfs") {
+withTempDirs { case (_src, tmp) =>
+  // Create a mini dfs cluster.
+  System.clearProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA)
+  val conf = new HdfsConfiguration()
+  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmp.getAbsolutePath)
+  val cluster = new MiniDFSCluster.Builder(conf).build()
+  val hdfsHomeDirectory: Path = cluster.getFileSystem.getHomeDirectory
+  cluster.getFileSystem.mkdirs(hdfsHomeDirectory)
+  cluster.waitClusterUp()
+  val textStream = createFileStream("text", hdfsHomeDirectory.toString)
+  val filtered = textStream.filter($"value" contains "keep")
+  val src = hdfsHomeDirectory
+  testStream(filtered)(
+AddTextHDFSFileData("drop1\nkeep2\nkeep3", src, tmp, conf),
 CheckAnswer("keep2", "keep3"),
 StopStream,
-AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+AddTextHDFSFileData("drop4\nkeep5\nkeep6", src, tmp, conf),
 StartStream(),
 CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+AddTextHDFSFileData("drop7\nkeep8\nkeep9", src, tmp, conf),
 CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
   )
+cluster.shutdown()
--- End diff --

You'll probably want to put this cleanup code somewhere where it will be 
called even if another part of the test case crashes.
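
A sketch of the structure I have in mind (simplified loan-pattern helper; the elided middle is whatever the test already does), so that a failed assertion cannot leak the MiniDFSCluster:
```scala
import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}

def withMiniDfsCluster(body: MiniDFSCluster => Unit): Unit = {
  val conf = new HdfsConfiguration()
  val cluster = new MiniDFSCluster.Builder(conf).build()
  try {
    cluster.waitClusterUp()
    body(cluster) // run the streaming test against cluster.getFileSystem here
  } finally {
    cluster.shutdown() // runs even if an assertion in `body` fails
  }
}
```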





[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...

2016-09-23 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15005
  
Thanks @srowen for all the thoughtful comments! It's great to see 
committers spending time to help improve the build experience for new 
developers.





[GitHub] spark pull request #15005: [SPARK-17421] [DOCS] Documenting the current trea...

2016-09-21 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15005#discussion_r79902326
  
--- Diff: docs/building-spark.md ---
@@ -16,24 +16,32 @@ Building Spark using Maven requires Maven 3.3.9 or 
newer and Java 7+.
 
 ### Setting up Maven's Memory Usage
 
-You'll need to configure Maven to use more memory than usual by setting 
`MAVEN_OPTS`. We recommend the following settings:
+You'll need to configure Maven to use more memory than usual by setting 
`MAVEN_OPTS`:
 
-export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M 
-XX:ReservedCodeCacheSize=512m"
+export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
 
-If you don't run this, you may see errors like the following:
+When compiling with Java 7, you will need to add the additional option 
"-XX:MaxPermSize=512M" to MAVEN_OPTS.
+
+If you don't add these parameters to `MAVEN_OPTS`, you may see errors and 
warnings like the following:
 
 [INFO] Compiling 203 Scala sources and 9 Java sources to 
/Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
 [ERROR] PermGen space -> [Help 1]
 
 [INFO] Compiling 203 Scala sources and 9 Java sources to 
/Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
 [ERROR] Java heap space -> [Help 1]
 
-You can fix this by setting the `MAVEN_OPTS` variable as discussed before.
+[INFO] Compiling 233 Scala sources and 41 Java sources to 
/Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes...
+OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been 
disabled.
+OpenJDK 64-Bit Server VM warning: Try increasing the code cache size 
using -XX:ReservedCodeCacheSize=
+
+You can fix these problems by setting the `MAVEN_OPTS` variable as 
discussed before.
 
 **Note:**
 
-* For Java 8 and above this step is not required.
-* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate 
this for you.
+* If using `build/mvn` with no `MAVEN_OPTS` set, the script will 
automatically add the above options for Java 7 to the `MAVEN_OPTS` environment 
variable.
--- End diff --

What I meant to say was that build/mvn adds all three options, but I can see how 
the statement here could be interpreted differently. Checking in an edited version.





[GitHub] spark pull request #15005: [SPARK-17421] [DOCS] Documenting the current trea...

2016-09-21 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15005#discussion_r79766509
  
--- Diff: docs/building-spark.md ---
@@ -16,24 +16,27 @@ Building Spark using Maven requires Maven 3.3.9 or 
newer and Java 7+.
 
 ### Setting up Maven's Memory Usage
 
-You'll need to configure Maven to use more memory than usual by setting 
`MAVEN_OPTS`. We recommend the following settings:
+If you are compiling with Java 7, you'll need to configure Maven to use 
more memory than usual by setting `MAVEN_OPTS`:
--- End diff --

Leaving out `-XX:ReservedCodeCacheSize` caused a warning in my tests:
```
OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
```
Left that option in and reworded the beginning of this section as you suggested.






[GitHub] spark pull request #15005: [SPARK-17421] [DOCS] Documenting the current trea...

2016-09-21 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15005#discussion_r79765892
  
--- Diff: docs/building-spark.md ---
@@ -16,24 +16,31 @@ Building Spark using Maven requires Maven 3.3.9 or 
newer and Java 7+.
 
 ### Setting up Maven's Memory Usage
 
-You'll need to configure Maven to use more memory than usual by setting 
`MAVEN_OPTS`. We recommend the following settings:
+If you are compiling with Java 7, you'll need to configure Maven to use 
more memory than usual by setting `MAVEN_OPTS`:
 
 export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M 
-XX:ReservedCodeCacheSize=512m"
 
-If you don't run this, you may see errors like the following:
+When compiling with Java 8, a similar but smaller set of memory parameters 
is necessary:
+
+export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
+
+If you don't add these parameters to `MAVEN_OPTS`, you may see errors like 
the following:
 
 [INFO] Compiling 203 Scala sources and 9 Java sources to 
/Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
 [ERROR] PermGen space -> [Help 1]
 
 [INFO] Compiling 203 Scala sources and 9 Java sources to 
/Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
 [ERROR] Java heap space -> [Help 1]
 
-You can fix this by setting the `MAVEN_OPTS` variable as discussed before.
+You can fix these problems by setting the `MAVEN_OPTS` variable as 
discussed before.
 
 **Note:**
 
-* For Java 8 and above this step is not required.
-* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate 
this for you.
+* This step is not needed for Java 8.
+* If using `build/mvn` with no `MAVEN_OPTS` set, the script will 
automatically add the above options for Java 7 to the `MAVEN_OPTS` environment 
variable.
+* The `test` phase of the Spark build will automatically add these options 
to `MAVEN_OPTS`, even when not using `build/mvn`.
+* You may see warnings like "ignoring option MaxPermSize=1g; support was 
removed in 8.0" when building or running tests with Java 8. These warnings are 
harmless.
--- End diff --

Developers will see the warning when compiling with `build/mvn`. Updating 
to clarify that.





[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...

2016-09-21 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15005
  
Summary of testing:
- On Java 8, the build fails intermittently with OOM when `-Xmx2g` is 
omitted
- The `-XX:ReservedCodeCacheSize=512m` argument prevents warnings on both 
Java 7 and OpenJDK 8
- `-XX:ReservedCodeCacheSize=512m` has no effect on IBM Java 8





[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...

2016-09-20 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15166#discussion_r79730904
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     )
   }
 
+  testQuietly("StreamExecution metadata garbage collection") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(6 / _)
+
+    // Run 3 batches, and then assert that only 1 metadata file is left at the end
+    // since the first 2 should have been purged.
+    testStream(mapped)(
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3, 6, 3),
+      AddData(inputData, 4, 6),
+      CheckAnswer(6, 3, 6, 3, 1, 1),
+
+      AssertOnQuery("metadata log should contain only one file") { q =>
+        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+        val toTest = logFileNames.filter(! _.endsWith(".crc"))  // Workaround for SPARK-17475
--- End diff --

Either way is fine with me.





[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...

2016-09-20 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15166#discussion_r79730664
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
 )
   }
 
+  testQuietly("StreamExecution metadata garbage collection") {
+val inputData = MemoryStream[Int]
+val mapped = inputData.toDS().map(6 / _)
+
+// Run 3 batches, and then assert that only 1 metadata file is left at 
the end
+// since the first 2 should have been purged.
+testStream(mapped)(
+  AddData(inputData, 1, 2),
+  CheckAnswer(6, 3),
+  AddData(inputData, 1, 2),
+  CheckAnswer(6, 3, 6, 3),
+  AddData(inputData, 4, 6),
+  CheckAnswer(6, 3, 6, 3, 1, 1),
+
+  AssertOnQuery("metadata log should contain only one file") { q =>
+val metadataLogDir = new 
java.io.File(q.offsetLog.metadataPath.toString)
+val logFileNames = 
metadataLogDir.listFiles().toSeq.map(_.getName())
+val toTest = logFileNames.filter(! _.endsWith(".crc"))  // 
Workaround for SPARK-17475
+assert(toTest.size == 1 && toTest.head == "2")
+true
--- End diff --

Ah, yes. The previous line (146) should be just `toTest.size == 1 && 
toTest.head == "2"`, with no "assert".





[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...

2016-09-20 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15166#discussion_r79722643
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
 )
   }
 
+  testQuietly("StreamExecution metadata garbage collection") {
+val inputData = MemoryStream[Int]
+val mapped = inputData.toDS().map(6 / _)
+
+// Run 3 batches, and then assert that only 1 metadata file is left at 
the end
+// since the first 2 should have been purged.
+testStream(mapped)(
+  AddData(inputData, 1, 2),
+  CheckAnswer(6, 3),
+  AddData(inputData, 1, 2),
+  CheckAnswer(6, 3, 6, 3),
+  AddData(inputData, 4, 6),
+  CheckAnswer(6, 3, 6, 3, 1, 1),
+
+  AssertOnQuery("metadata log should contain only one file") { q =>
+val metadataLogDir = new 
java.io.File(q.offsetLog.metadataPath.toString)
+val logFileNames = 
metadataLogDir.listFiles().toSeq.map(_.getName())
+val toTest = logFileNames.filter(! _.endsWith(".crc"))  // 
Workaround for SPARK-17475
+assert(toTest.size == 1 && toTest.head == "2")
+true
--- End diff --

This line ("true") shouldn't be here. It makes the Assert always pass, even 
when the condition on the previous line isn't satisfied.





[GitHub] spark pull request #15162: [SPARK-17386] [STREAMING] [WIP] Make polling rate...

2016-09-20 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/15162

[SPARK-17386] [STREAMING] [WIP] Make polling rate adaptive

## What changes were proposed in this pull request?

This change makes the scheduler in `StreamExecution` adjust its rate of 
polling to the rate of data arrival. I used the 
[AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease) 
algorithm, aka TCP congestion avoidance, because it is simple. I renamed
`spark.sql.streaming.pollingDelay` to `spark.sql.streaming.minPollingDelay` 
and added a second parameter `spark.sql.streaming.maxPollingDelay` to serve as 
an upper bound. This upper bound is necessary because the data arrival rate 
could be bursty, with infinite variance.

This approach works, but I'm not completely satisfied with the current 
design. The blocking-system-call part of polling really ought to be delegated 
to the sources themselves, so that the main scheduler thread won't be blocked 
for unpredictable amounts of time. Also, the polling rate ought to be adaptive 
on a per-source basis, instead of there being a single global rate per session. 
Leaving this PR marked as WIP for now.
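
For illustration, here is a condensed sketch of one way to map AIMD onto the polling delay (hypothetical class and parameter names; not the exact code in this change): grow the delay additively while no data arrives and cut it multiplicatively as soon as a batch is found, clamped to the configured bounds.
```scala
final class AdaptivePollingDelay(minDelayMs: Long, maxDelayMs: Long) {
  private var delayMs: Long = minDelayMs

  def currentDelayMs: Long = delayMs

  /** Call once per polling round; returns the delay to sleep before the next poll. */
  def update(foundNewData: Boolean): Long = {
    delayMs =
      if (foundNewData) math.max(minDelayMs, delayMs / 2)   // multiplicative decrease
      else math.min(maxDelayMs, delayMs + minDelayMs)       // additive increase
    delayMs
  }
}
```
Whether the additive step applies to the delay or to the polling rate is a design choice; the sketch applies it to the delay for simplicity.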

## How was this patch tested?

I tested for three scenarios:
* If data is not present, the scheduler ramps up the polling delay quickly 
to `spark.sql.streaming.maxPollingDelay`
* If data arrives quickly, the scheduler keeps its polling delay pinned at 
`spark.sql.streaming.minPollingDelay`
* If new batches arrive at a constant interval between the minimum and 
maximum delay, the scheduler keeps its polling delay within a factor of two of 
the actual arrival rate.

The testing code is included in this PR. Note that the test cases for the latter 
two scenarios are disabled by default, because they are somewhat timing dependent.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-17386a

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15162.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15162


commit 54a8272f1810514ec0a1e92e222359f505c5e58b
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-10T05:15:22Z

Made polling rate adaptive.

commit 24a1f74ff5c401e39858d887212fe4acc5b17bca
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-14T22:04:19Z

Added test case and corrected some bugs.

commit 9b44439d784e51ed75f4476b4432d13155dcdfb0
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-14T22:26:59Z

Merge branch 'master' of https://github.com/apache/spark into fred-17386a

commit 4439d07e79ca48b5037ecb50017013c75172f6af
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-20T16:09:29Z

Merge branch 'master' of https://github.com/apache/spark into fred-17386a







[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...

2016-09-20 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15067#discussion_r79662093
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -125,6 +125,32 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
 )
   }
 
+  testQuietly("StreamExecution metadata garbarge collection") {
+val inputData = MemoryStream[Int]
+val mapped = inputData.toDS().map(6 / _)
+
+// Run a few batches through the application
+testStream(mapped)(
+  AddData(inputData, 1, 2),
+  CheckAnswer(6, 3),
+  AddData(inputData, 1, 2),
+  CheckAnswer(6, 3, 6, 3),
+  AddData(inputData, 4, 6),
+  CheckAnswer(6, 3, 6, 3, 1, 1),
+
+  // Three batches have run, but only one set of metadata should be 
present
+  AssertOnQuery(
+q => {
+  val metadataLogDir = new 
java.io.File(q.offsetLog.metadataPath.toString)
+  val logFileNames = 
metadataLogDir.listFiles().toSeq.map(_.getName())
+  val toTest = logFileNames.filter(! _.endsWith(".crc")) // 
Workaround for SPARK-17475
+  toTest.size == 1 && toTest.head == "2"
--- End diff --

Oops, didn't notice I had left that "true" at the end of that block of 
code. @petermaxlee, let me know if you need help preparing the final version 
for merge.





[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...

2016-09-20 Thread frreiss
Github user frreiss closed the pull request at:

https://github.com/apache/spark/pull/15067





[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...

2016-09-20 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15005
  
I've just about narrowed down the options that work for OpenJDK 7 and 8 on Mac 
and Linux. I'm still working on IBM Java on Linux. I can have an update in by 
EOD today.
BTW, one thing that's been slowing me down is that my local copies get 
"corrupted" after a while. Some of the tests in SparkSubmitSuite stop working 
until I delete and re-clone my local copy of Spark. I suspect there may be a 
race condition involving some directory that doesn't get cleaned by the "clean" 
target.





[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...

2016-09-14 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15005
  
Quick update: I'm running a series of test builds with various parameters 
to determine what parts of MAVEN_OPTS are currently necessary on different 
versions of Java. Will report back in a few days and update this PR.





[GitHub] spark issue #13513: [SPARK-15698][SQL][Streaming] Add the ability to remove ...

2016-09-13 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/13513
  
Ah, now I fully understand @zsxwing's earlier comment about the semantics of 
`Source.getBatch()`. Those semantics have a design flaw; 
see the email thread I started at 
http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-tt18551.html.
 Basically, it's impossible to implement a Source to the written API spec 
without keeping unbounded state. I have an open PR to fix this problem at 
https://github.com/apache/spark/pull/14553.

In the short run, I think that @jerryshao's changes here are ok with 
respect to `Source.getBatch`. The approach in this PR will work as long as the 
internal structure of the `StreamExecution` class doesn't change and as long as 
Spark does not have to recover from an outage longer than the compaction 
interval. The recent changes to `FileStreamSource` under SPARK-17165 
(https://github.com/apache/spark/pull/14728) have the same problem, and those 
changes are already committed.






[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...

2016-09-12 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/15067

[SPARK-17513] [STREAMING] [SQL] Make StreamExecution garbage-collect its 
metadata

## What changes were proposed in this pull request?

This PR modifies StreamExecution such that it discards metadata for batches 
that have already been fully processed. I used the `purge` method that was 
added as part of SPARK-17235.
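
As a rough sketch of the wiring (placeholder names, not the exact patch): once 
a batch has been fully processed, the scheduler can call the new `purge` 
method on its metadata log, keeping only the most recent entry.

```scala
import org.apache.spark.sql.execution.streaming.MetadataLog

// Sketch only. `offsetLog` stands in for the metadata log that StreamExecution
// maintains, and `currentBatchId` for the batch that just completed. purge()
// is the SPARK-17235 method that drops every entry below the given threshold.
object MetadataGC {
  def garbageCollect(offsetLog: MetadataLog[_], currentBatchId: Long): Unit = {
    if (currentBatchId > 0) {
      // Keep the latest batch so a restart can still recover; drop the rest.
      offsetLog.purge(currentBatchId)
    }
  }
}
```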

## How was this patch tested?

I added a test case to verify that old metadata log files are correctly 
purged. 
I also ran the entire regression suite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-16963a

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15067.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15067


commit 8cc5e835b209d5796b044978ec4221ee22a8b9d2
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-08T19:59:15Z

Added purge() call to scheduler

commit d71366d958334ebbc81e45c7f469bad2a68d0a2d
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-10T04:23:58Z

Added test case and corrected off-by-one error.

commit 82f5b681c2e8e52f8549b21c7d058c497f2fc809
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-10T04:24:35Z

Merge branch 'master' of https://github.com/apache/spark into fred-16963a







[GitHub] spark issue #13513: [SPARK-15698][SQL][Streaming] Add the ability to remove ...

2016-09-12 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/13513
  
You could just move the metadata deletion logic from FileStreamSinkLog into 
CompactibleFileStreamLog. Then FileStreamSource could issue DELETE log records 
for files that are older than `FileStreamSource.lastPurgeTimestamp`.





[GitHub] spark pull request #15027: [SPARK-17475] [STREAMING] Delete CRC files if the...

2016-09-12 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15027#discussion_r78469982
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -146,6 +146,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
   // It will fail if there is an existing file (someone has 
committed the batch)
   logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
   fileManager.rename(tempPath, batchIdToPath(batchId))
+
+  // SPARK-17475: HDFSMetadataLog should not leak CRC files
+  // If the underlying filesystem didn't rename the CRC file, 
delete it.
--- End diff --

I believe HDFSMetadataLog is only called from Structured Streaming classes 
currently.





[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...

2016-09-09 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/15005
  
Sure, I'll redo that part so that it includes two sets of recommended options. 
Note that the docs in the Spark 2.0.0 release say that these options aren't 
necessary for Java 8.






[GitHub] spark pull request #15027: [SPARK-17475] [STREAMING] Delete CRC files if the...

2016-09-09 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/15027

[SPARK-17475] [STREAMING] Delete CRC files if the filesystem doesn't use 
checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored 
on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves 
hidden HDFS-style checksum (CRC) files in the log directory, one file per 
batch. This PR modifies HDFSMetadataLog so that it detects the use of a 
filesystem that doesn't use CRC files and removes the CRC files.
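
A minimal sketch of the cleanup idea; the real change lives inside 
HDFSMetadataLog and goes through its internal file-manager abstraction, which 
the plain Hadoop `FileSystem` below merely stands in for:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper: after a batch file has been renamed into place, remove
// the HDFS-style ".<name>.crc" sibling if the underlying filesystem did not
// clean it up itself (as happens on local, NFS, or ext4 paths).
object CrcCleanup {
  def deleteStrayCrc(fs: FileSystem, batchFile: Path): Unit = {
    val crcPath = new Path(batchFile.getParent, s".${batchFile.getName}.crc")
    if (fs.exists(crcPath)) {
      fs.delete(crcPath, false)
    }
  }
}
```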

## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether 
HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem.  Ran 
the entire regression suite.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-17475

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15027.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15027


commit 3a2a9c116659f526189de6b8b98fb6c92024a7a6
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-09T17:30:08Z

Delete CRC files when the filesystem doesn't support checksums.

commit 9ff89c0228c09764fa6444528050a35e823db0e6
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-09T17:30:52Z

Merge branch 'master' of https://github.com/apache/spark into fred-17475







[GitHub] spark pull request #14945: [SPARK-17386] Set default trigger interval to 1/1...

2016-09-07 Thread frreiss
Github user frreiss closed the pull request at:

https://github.com/apache/spark/pull/14945





[GitHub] spark issue #14945: [SPARK-17386] Set default trigger interval to 1/10 secon...

2016-09-07 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14945
  
On a closer reading of the code, there is a more expedient fix: change the 
default STREAMING_POLLING_DELAY parameter. Will redo.





[GitHub] spark pull request #15005: [SPARK-17421] Documenting the current treatment o...

2016-09-07 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/15005

[SPARK-17421] Documenting the current treatment of MAVEN_OPTS.

## What changes were proposed in this pull request?

Modified the documentation to clarify that `build/mvn` and `pom.xml` always 
add Java 7-specific parameters to `MAVEN_OPTS`, and that developers can safely 
ignore warnings about `-XX:MaxPermSize` that may result from compiling or 
running tests with Java 8.

## How was this patch tested?

Rebuilt HTML documentation, made sure that building-spark.html displays 
correctly in a browser.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-17421a

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15005.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15005


commit cfe0f69676cc57922ed2076435a10e6f2355a540
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-07T22:43:00Z

Documenting the current treatment of MAVEN_OPTS.







[GitHub] spark pull request #14986: [WIP] [SPARK-17421] Don't use -XX:MaxPermSize opt...

2016-09-07 Thread frreiss
Github user frreiss closed the pull request at:

https://github.com/apache/spark/pull/14986





[GitHub] spark issue #14986: [WIP] [SPARK-17421] Don't use -XX:MaxPermSize option whe...

2016-09-07 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14986
  
Makes sense. I will close this PR and just add a clarification to the 
documentation.





[GitHub] spark pull request #14986: [WIP] [SPARK-17421] Don't use -XX:MaxPermSize opt...

2016-09-06 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/14986

[WIP] [SPARK-17421] Don't use -XX:MaxPermSize option when Java version >= 8

## What changes were proposed in this pull request?

Modifies the `build/mvn` and `build/sbt-launch-lib.bash` scripts so that 
they check the Java version and omit the `-XX:MaxPermSize=512M` option for Java 
versions >= 8.

## How was this patch tested?

Ran the build on Linux and Mac with Java 7 and 8, from `build/mvn` and 
`dev/run-tests`. Also tested Java 8 with `JAVA_7_HOME` pointing to a Java 7 
installation.

Currently, `dev/run-tests` has a few remaining instances of the MaxPermSize 
warning, apparently due to another code path that spawns a JVM. Keeping this PR 
as a WIP until the output of `dev/run-tests` is completely clean.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-17421

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14986.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14986


commit c9beadd02c22c7a84b521003fdebee0ba45bd286
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-06T22:19:44Z

Added logic to build/mvn to check Java version.

commit 0955947dcf83c90473f4f77b19a0559558bba80c
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-06T22:37:47Z

Made sbt-launch-lib.bash check for Java >7







[GitHub] spark pull request #14945: [SPARK-17386] Set default trigger interval to 1/1...

2016-09-02 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/14945

[SPARK-17386] Set default trigger interval to 1/10 second

## What changes were proposed in this pull request?

This pull request implements the most expedient change to fix SPARK-17386: 
Use a default trigger interval of 100ms instead of polling continuously for new 
data by default. I've changed the default value used in both 
`StreamingQueryManager` and `StreamTest` and created a new constant to 
facilitate changing both of those defaults simultaneously in the future.
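
For reference, the same cadence can always be pinned explicitly on a query. A 
hypothetical example (the socket source and console sink are just convenient 
stand-ins):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

// Hypothetical usage: an explicit 100 ms processing-time trigger, the same
// cadence this PR proposes as the default when no trigger is specified.
object TriggerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("trigger-demo").getOrCreate()
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val query = lines.writeStream
      .format("console")
      .trigger(ProcessingTime("100 milliseconds"))
      .start()
    query.awaitTermination()
  }
}
```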

## How was this patch tested?

All existing regression tests pass, and most of those tests use the default 
trigger intervals that this PR changes.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-17386

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14945


commit 13496416bf2409082e29231cad504d1b3bf1
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-02T23:22:59Z

Change default trigger to 1/10 second

commit b1e7fd55adaa7faf5344ea5ddc135cf9cbaf3507
Author: frreiss <frre...@us.ibm.com>
Date:   2016-09-02T23:23:22Z

Merge branch 'master' of https://github.com/apache/spark into fred-17386







[GitHub] spark issue #14553: [SPARK-16963] Changes to Source trait and related implem...

2016-08-31 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
@ScrapCodes, would you mind triggering a build of this PR?





[GitHub] spark issue #14803: [SPARK-17153][SQL] Should read partition data when readi...

2016-08-30 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14803
  
LGTM





[GitHub] spark pull request #14870: [SPARK-17303] Added spark-warehouse to dev/.rat-e...

2016-08-29 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/14870

[SPARK-17303] Added spark-warehouse to dev/.rat-excludes

## What changes were proposed in this pull request?

Excludes the `spark-warehouse` directory from the Apache RAT checks that 
`dev/run-tests` performs. `spark-warehouse` is created by some of the Spark SQL 
tests, as well as by `bin/spark-sql`.

## How was this patch tested?

Ran `dev/run-tests` twice. The second time, the script failed the RAT check 
because the first iteration had left a `spark-warehouse` directory behind.
Made the change in this PR.
Ran `dev/run-tests` a third time; the RAT checks succeeded.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-17303

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14870.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14870


commit 850261fb54a0d372b12a16a89e16774cc9a4cba8
Author: frreiss <frre...@us.ibm.com>
Date:   2016-08-29T23:02:15Z

Added spark-warehouse to dev/.rat-excludes







[GitHub] spark pull request #14803: [SPARK-17153][SQL] Should read partition data whe...

2016-08-29 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14803#discussion_r76646983
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,13 +129,20 @@ class FileStreamSource(
 val files = metadataLog.get(Some(startId + 1), 
Some(endId)).flatMap(_._2)
 logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
 logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
--- End diff --

I recommend moving this logic into the initialization code at the top of 
this file that sets `qualifiedBasePath` (currently lines 47-50). That way all 
the code that interprets the meaning of the `path` parameter will be in one 
place.





[GitHub] spark issue #14553: [SPARK-16963] Changes to Source trait and related implem...

2016-08-29 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
@rxin and @marmbrus, would it be possible to get this PR reviewed soon? I 
can split it into smaller chunks if that would make things easier; I just need 
to know.





[GitHub] spark pull request #14691: [SPARK-16407][STREAMING] Allow users to supply cu...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14691#discussion_r76505064
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -123,12 +124,30 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
   /**
* :: Experimental ::
* Specifies the underlying output data source. Built-in options include 
"parquet", "json", etc.
+   * Additionally user specific StreamSinkProviders can be specified hear 
using the fully qualified
+   * class name.
*
* @since 2.0.0
*/
   @Experimental
   def format(source: String): DataStreamWriter[T] = {
 this.source = source
+this.sinkProvider = null
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output data source using a 
StreamSinkProvider. This is useful for
+   * sinks which are constructed with user specified functions (such as a 
user specified version of
+   * ForeachSink).
+   *
+   * @since 2.1.0
+   */
+  @Experimental
+  def format(sinkProvider: StreamSinkProvider): DataStreamWriter[T] = {
+this.source = null
--- End diff --

Probably better to set `source` to a dedicated constant string like 
"custom" here, so that the branches in start() all have consistent logic.





[GitHub] spark pull request #14691: [SPARK-16407][STREAMING] Allow users to supply cu...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14691#discussion_r76504239
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -123,12 +124,30 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
   /**
* :: Experimental ::
* Specifies the underlying output data source. Built-in options include 
"parquet", "json", etc.
+   * Additionally user specific StreamSinkProviders can be specified hear 
using the fully qualified
--- End diff --

*user-specific* StreamSinkProviders can be specified *here*





[GitHub] spark pull request #14773: [SPARK-17203][SQL] data source options should alw...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14773#discussion_r76503740
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 ---
@@ -65,7 +65,7 @@ case class CreateDataSourceTableCommand(
 
 var isExternal = true
 val optionsWithPath =
-  if (!new CaseInsensitiveMap(options).contains("path") && 
managedIfNoPath) {
+  if (!options.contains("path") && managedIfNoPath) {
--- End diff --

Maybe create a constant for the string "path" as a data source param? It 
occurs in quite a few places.
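
Something along these lines (the object and field names are placeholders):

```scala
// Hypothetical shared constant for the frequently repeated "path" option key.
object DataSourceOptionKeys {
  val Path: String = "path"
}

// Call sites would then read, for example:
//   if (!options.contains(DataSourceOptionKeys.Path) && managedIfNoPath) { ... }
```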





[GitHub] spark issue #14802: [SPARK-17235][SQL] Support purging of old logs in Metada...

2016-08-26 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14802
  
LGTM. I have written nearly the exact same thing as part of 
[https://github.com/apache/spark/pull/14553], but can use this version of the 
method instead.





[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13513#discussion_r76499068
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -129,3 +131,86 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
+
+class FileStreamSourceLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
+
+  // Configurations about metadata compaction
+  private val compactInterval = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+  s"positive value.")
+
+  private val fileCleanupDelayMs = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  private val isDeletingExpiredLog = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private var compactBatchId: Long = -1L
+
+  private def isCompactionBatch(batchId: Long, compactInterval: Long): 
Boolean = {
+batchId % compactInterval == 0
+  }
+
+  override def add(batchId: Long, metadata: Seq[String]): Boolean = {
+if (isCompactionBatch(batchId, compactInterval)) {
+  compactMetadataLog(batchId - 1)
+}
+
+super.add(batchId, metadata)
+  }
+
+  private def compactMetadataLog(batchId: Long): Unit = {
+// read out compact metadata and merge with new metadata.
+val batches = super.get(Some(compactBatchId), Some(batchId))
+val totalMetadata = batches.flatMap(_._2)
+if (totalMetadata.isEmpty) {
+  return
+}
+
+// Remove old compact metadata file and rewrite.
+val renamedPath = new Path(path, 
s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
+fileManager.rename(batchIdToPath(batchId), renamedPath)
+
+var isSuccess = false
+try {
+  isSuccess = super.add(batchId, totalMetadata)
+} catch {
+  case NonFatal(e) => isSuccess = false
+} finally {
+  if (!isSuccess) {
+// Rollback to the previous status if compaction is failed.
--- End diff --

This rollback code will not execute if the process exits during a 
compaction operation. You will need cleanup code in the class constructor to 
handle that case.





[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r76498637
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -727,6 +732,48 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
   }
 }
   }
+
+
--- End diff --

This line no longer exists after merging the changes from 
[https://github.com/apache/spark/pull/14728] and replacing my former 
implementation of GC for FileStreamSource with the one in that merged PR.





[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r76498301
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -24,21 +24,24 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, 
StreamSourceProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
 
+
 object TextSocketSource {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
 StructField("timestamp", TimestampType) :: Nil)
   val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss")
 }
 
+
--- End diff --

Fixed in my local copy.





[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r76498251
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -244,6 +250,21 @@ class StreamExecution(
 logDebug(s"Resuming with committed offsets: $committedOffsets")
 }
 
+// Compare the offsets we just read from the checkpoint against the
+// sources' own checkpoint data.
+val offsetChanges = mutable.Map[Source, Offset]()
+committedOffsets.foreach {
+  case (src, checkptOffset) =>
+val srcOffset = src.getMinOffset
+if (srcOffset.isDefined && srcOffset.get > checkptOffset) {
+  logWarning(s"Source $src lost offsets between $checkptOffset 
" +
+s"and ${srcOffset.get} when resuming. Skipping ahead to 
${srcOffset.get}.")
+  offsetChanges += (src -> srcOffset.get)
+}
+}
+committedOffsets ++= offsetChanges
+
+
--- End diff --

Fixed in my local copy.





[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...

2016-08-26 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14553#discussion_r76498223
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
 ---
@@ -48,4 +49,13 @@ trait MetadataLog[T] {
* Return the latest batch Id and its metadata if exist.
*/
   def getLatest(): Option[(Long, T)]
+
+
--- End diff --

Fixed in my local copy.





[GitHub] spark issue #14553: [WIP] [SPARK-16963] Initial version of changes to Source...

2016-08-22 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/14553
  
These changes are now ready for review. The contents of this PR pass 
regression tests on my machines. Can one of the committers please start a 
Jenkins build?





[GitHub] spark pull request #14151: [SPARK-16496][SQL] Add wholetext as option for re...

2016-08-15 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14151#discussion_r74805700
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
 ---
@@ -39,6 +39,11 @@ class TextSuite extends QueryTest with SharedSQLContext {
 verifyFrame(spark.read.text(testFile))
   }
 
+  test("reading text file with wholetext option on") {
--- End diff --

As far as I'm aware, the most common use case for reading entire files is 
using a glob to read a directory or directory tree containing multiple files. 
For example, one might download the Enron corpus (see 
[https://www.cs.cmu.edu/~./enron/]), which comes packaged with one file per 
email message. With a large number of files on the input, it's important that 
the work of processing the files be split among many cores. So the test for the 
`wholetext` option really should have multiple input files and verify that 
different files end up in different partitions of the resulting RDD or 
DataFrame.
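
A rough sketch of the kind of test being suggested, assuming the `wholetext` 
option name from this PR; the file layout and thresholds are illustrative, not 
the PR's test code:

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

// Illustrative check: write several small files, read them back with the
// proposed wholetext option, and verify that each file becomes one row and
// that the rows are spread over more than one partition.
object WholeTextMultiFileCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]").appName("wholetext-check").getOrCreate()
    val dir = Files.createTempDirectory("wholetext-test")
    (1 to 8).foreach { i =>
      Files.write(dir.resolve(s"msg-$i.txt"), s"message $i\nbody $i\n".getBytes("UTF-8"))
    }
    val df = spark.read.option("wholetext", "true").text(dir.toString)
    assert(df.count() == 8)              // one row per input file
    assert(df.rdd.getNumPartitions > 1)  // work is split across partitions
    spark.stop()
  }
}
```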





[GitHub] spark pull request #14151: [SPARK-16496][SQL] Add wholetext as option for re...

2016-08-15 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/14151#discussion_r74804217
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -533,6 +533,12 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(10L)
 
+  val WHOLETEXT =
+SQLConfigBuilder("spark.sql.wholetext")
--- End diff --

Should this really be a session-global configuration? It seems like 
something that is specific to a particular input file and should only be set 
when opening a given file.





[GitHub] spark pull request #14553: [WIP] [SPARK-16963] Initial version of changes to...

2016-08-08 Thread frreiss
GitHub user frreiss opened a pull request:

https://github.com/apache/spark/pull/14553

[WIP] [SPARK-16963] Initial version of changes to Source trait

## What changes were proposed in this pull request?

Initial proposed changes to the Source trait such that the scheduler can 
notify data sources when it is safe to discard buffered data. Major changes so 
far:
* Added a method `commit(end: Offset)` that tells the Source that it is OK to 
discard all offsets up to `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean 
"from the very beginning of the stream"; as opposed to "all data present in the 
Source's buffer".
* Added notes that the upper layers of the system will never call 
`getBatch` with a start value less than the last value passed to `commit`.
* Added a `getMinOffset` method to allow the scheduler to query the status 
of each Source on restart. This addition is not strictly necessary, but it 
seemed like a good idea -- Sources will be maintaining their own persistent 
state, and there may be bugs in the checkpointing code.
* Changed the name of `getOffset` to `getMaxOffset`
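
Putting the bullets together, the proposed trait shape looks roughly like the 
following. This is my paraphrase of the list above, not the exact code in the 
PR; `schema` and `stop()` carry over unchanged from the existing trait.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.types.StructType

// Sketch of the proposed Source trait, paraphrasing the bullet points above.
trait Source {
  def schema: StructType

  /** Highest offset currently available, if any (the renamed getOffset). */
  def getMaxOffset: Option[Offset]

  /** Lowest offset still buffered; lets the scheduler cross-check its
   *  checkpoint on restart. */
  def getMinOffset: Option[Offset]

  /** A start of None now means "from the very beginning of the stream". */
  def getBatch(start: Option[Offset], end: Offset): DataFrame

  /** Tells the source it may discard all offsets up to and including `end`. */
  def commit(end: Offset): Unit

  def stop(): Unit
}
```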

## How was this patch tested?

Testing is still TBD.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/frreiss/spark-fred fred-16963

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14553


commit 6c9acdefe1c791bad3f00d845a72d07e2113b214
Author: frreiss <frre...@us.ibm.com>
Date:   2016-08-09T02:28:32Z

Initial version of changes to Source trait







[GitHub] spark issue #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...

2016-06-10 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/13155
  
Tests ran successfully on my machine.





[GitHub] spark issue #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...

2016-06-10 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/13155
  
Updated changes are in. Running a full regression suite overnight.





[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66564793
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66561815
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66561017
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
--- End diff --

Yes, that is true. I've added a test case and an additional clause to that 
case statement in my local copy.
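
For readers of the archive, a rough sketch of what that case statement might 
look like with the extra clause (an assumption on my part, not necessarily the 
exact code that will land):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.types.NullType

// Hypothetical sketch of the updated method -- not necessarily the exact patch.
def evalAggOnZeroTups(expr: Expression): Option[Any] = {
  val rewrittenExpr = expr transform {
    // Replace each aggregate with the value it returns on zero input rows.
    case AggregateExpression(aggFunc, _, _, _) =>
      aggFunc.defaultResult.getOrElse(Literal.default(NullType))
    // Additional clause: any remaining attribute reference (e.g. a grouping
    // column) evaluates to null on an empty input.
    case _: AttributeReference => Literal.default(NullType)
  }
  Option(rewrittenExpr.eval())
}
```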


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66560947
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66560868
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66558119
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
--- End diff --

Fixed in my local copy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66558125
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
--- End diff --

Fixed in my local copy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66539170
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66509510
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
--- End diff --

Fixed in my local copy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66509444
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
--- End diff --

Fixed in my local copy


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66509405
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
--- End diff --

Fixed in my local copy


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...

2016-06-09 Thread frreiss
Github user frreiss commented on the issue:

https://github.com/apache/spark/pull/13155
  
@rxin I'll have an updated set of changes in by tonight.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-09 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r66478261
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
--- End diff --

No particular reason for using `ExprId.id`. I'll change the code to use the 
full `ExprId` class as the map key.
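
A minimal sketch of that change, assuming the bindings map is simply re-keyed 
on the whole `ExprId` object (illustrative only):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.NullType

// Sketch: key the bindings on ExprId itself rather than on its Long id.
def evalExpr(expr: Expression, bindings: Map[ExprId, Option[Any]]): Option[Any] = {
  val rewrittenExpr = expr transform {
    case r: AttributeReference =>
      bindings(r.exprId) match {
        case Some(v) => Literal.create(v, r.dataType)
        case None    => Literal.default(NullType)
      }
  }
  Option(rewrittenExpr.eval())
}
```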


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-02 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r65584546
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
--- End diff --

The join attributes that the Analyzer adds to the Aggregate node are 
AttributeReference nodes, and the `evalAggOnZeroTups` method as currently 
written can't process them. A more comprehensive facility for statically 
evaluating expressions would be nice to have, but I'm hesitant to add such a 
mechanism as part of a bug fix. Perhaps a follow-on JIRA is in order?
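
To make that concrete, a hypothetical sketch of the pattern (attribute names 
are made up; `evalAgg` stands in for `evalAggOnZeroTups`):

```scala
import org.apache.spark.sql.catalyst.expressions._

// For a correlated subquery like  SELECT max(cnt) FROM t WHERE t.k = outer.k,
// the de-correlated Aggregate's output list is roughly
//   Seq(max(cnt#3) AS max_cnt#7, k#5)
// where k#5 is the join column the Analyzer appended. That bare
// AttributeReference has no value on an empty input, so it is bound to None;
// only the genuine aggregate expressions are folded to constants.
def bindAggOutput(aggExprs: Seq[NamedExpression],
                  evalAgg: Expression => Option[Any]): Map[Long, Option[Any]] =
  aggExprs.map {
    case a: AttributeReference                => a.exprId.id -> None
    case al @ Alias(_: AttributeReference, _) => al.exprId.id -> None
    case other                                => other.exprId.id -> evalAgg(other)
  }.toMap
```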


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-02 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r65583548
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
--- End diff --

The test on the next line doesn't compile without the cast. The condition 
in a Filter node is of type Expression, and `Expression.eval()` returns `Any`.
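
A tiny self-contained illustration of the typing issue (sketch only; 
`Literal(true)` just stands in for a condition that has been folded to a 
constant):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

object EvalReturnsAny {
  def main(args: Array[String]): Unit = {
    val condition: Expression = Literal(true)  // stands in for a folded Filter condition
    val result: Any = condition.eval()         // Expression.eval() is declared to return Any
    // if (result) { ... }                     // would not compile: Any is not a Boolean
    if (result.asInstanceOf[Boolean]) {        // hence the explicit cast in the rule
      println("predicate held for the statically evaluated bindings")
    }
  }
}
```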


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...

2016-06-01 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r65454461
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
--- End diff --

Fixed in my local copy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...

2016-05-31 Thread frreiss
Github user frreiss commented on the pull request:

https://github.com/apache/spark/pull/13155
  
Thanks @hvanhovell for the additional review pass! I'll be preparing my 
slides for Spark Summit all day today, but I'll come back to this PR as soon 
as that's done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-28 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64996012
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-28 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64995985
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64944724
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64943404
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
+val rewrittenExpr = expr transform {
+  case r @ AttributeReference(_, dataType, _, _) =>
+bindings(r.exprId.id) match {
+  case Some(v) => Literal.create(v, dataType)
+  case None => Literal.default(NullType)
+}
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+aggFunc.defaultResult.getOrElse(Literal.default(NullType))
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
+   * Statically evaluate a scalar subquery on an empty input.
+   *
+   * WARNING: This method only covers subqueries that pass the 
checks under
+   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the 
checks in
+   * CheckAnalysis become less restrictive, this method will need to 
change.
+   */
+  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
+// Inputs to this method will start with a chain of zero or more 
SubqueryAlias
+// and Project operators, followed by an optional Filter, followed by 
an
+// Aggregate. Traverse the operators recursively.
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
+  lp match {
+case SubqueryAlias(_, child) => evalPlan(child)
+case Filter(condition, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) bindings
+  else {
+val exprResult = evalExpr(condition, bindings).getOrElse(false)
+  .asInstanceOf[Boolean]
+if (exprResult) bindings else Map()
+  }
+
+case Project(projectList, child) =>
+  val bindings = evalPlan(child)
+  if (bindings.size == 0) {
+bindings
+  } else {
+projectList.map(ne => (ne.exprId.id, evalExpr(ne, 
bindings))).toMap
+  }
+
+case Aggregate(_, aggExprs, _) =>
+  // Some of the expressions under the Aggregate node are the join 
columns
+  // for joining with the outer query block. Fill those 
expressions in with
+  // nulls and statically evaluate the remainder.
+  aggExprs.map(ne => ne match {
+case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
+case Alias(AttributeReference(_, _, _, _), _) => 
(ne.exprId.id, None)
+case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
+  }).toMap
+
+case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+  }
+}
+
+val resultMap = evalPlan(plan)
+
+// By convention, the scalar subquery result is the leftmost field.
+resultMap(plan.output.head.exprId.id)
+  }
+
+  /**
+   * Split the plan for a scalar subquery into the parts above the 
Aggregate node
+   * (first part of returned value) and the parts below the Aggregate 
node, including
+   * the Aggregate (second part of returned value)
+   */
+  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], 
Aggregate] = {
+var topPart = List[LogicalPlan]()
+var bottomPart : LogicalPlan = plan
+while (! bottomPart.isInstanceOf[Aggregate]) {
+  topPart = bottomPart :: topPart
+  bottomPart = bottomPart.children.head
+}
+(topPart, bottomPart.asInstanceOf[Aggregate])
+  }
+
+  /**
+   * Rewrite the nodes above the Aggregate in a subquery so that they 
generate an
+   * auxiliary column "isFiltered"
+   * @param subqueryPlan plan before rewrite
+   * @param filteredId expression ID for the "isFiltered" column
+   */
+  private def addIsFiltered(subquery

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64942870
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64942480
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-27 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r64941953
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing zero or more 
placeholders, given a set
+   * of bindings for placeholder values.
+   */
+  private def evalExpr(expr : Expression, bindings : Map[Long, 
Option[Any]]) : Option[Any] = {
--- End diff --

Fixed that in my local copy. Will include the change in the next update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-24 Thread frreiss
Github user frreiss commented on the pull request:

https://github.com/apache/spark/pull/13155#issuecomment-221336991
  
Could one of the committers please trigger another build on this PR? The 
change set passes all the tests on my machine, but it's good to be safe.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-20 Thread frreiss
Github user frreiss commented on the pull request:

https://github.com/apache/spark/pull/13155#issuecomment-220684859
  
I've added changes to cover two additional cases that @hvanhovell pointed out 
in review, plus a third case that came up while fixing the first two:
- The correlated subquery may have a HAVING clause
- The correlated subquery may be nested inside additional query blocks that 
apply projections
- The correlated subquery may return NULL when the correlation bindings join
  with the subquery and a non-NULL value when bindings do *not* join, e.g. 
```sql
select l.a from l where
  (select case when count(*) = 1 then null else count(*) end from r where 
l.a = r.c) = 0
```

The logic for statically evaluating the subquery's aggregate expression now 
handles an Aggregate node with a chain of Filter and Project operators above 
it. The rewrite logic has a third case to handle subqueries with HAVING 
clauses. The second case from the original change set now adds an additional 
column to cover subqueries that return NULL even when the bindings do join 
with the inner query block. I also addressed various other minor review comments.

After these changes, the algorithm for preventing the COUNT bug in scalar 
subqueries is now the following:
```
V <- the value the subquery's aggregate expression produces over zero input tuples
if (V is null)
then
  Use the original rewrite from SPARK-14785.
else
  if (subquery has a Filter above the Aggregate node)
  then
    Rewrite the Filter node to a Project that adds a Boolean column isFiltered.
    Rewrite nodes above the Filter node so they pass through the isFiltered column.
  else
    Add an isFiltered column with a hard-coded value of false to the top Project in the subquery.
  endif
  Create a left outer join between the outer query block and the rewritten subquery.
  Put the following case statement into the Project operator above the outer join.
  case
    when isFiltered is null then coalesce(aggVal, V)
    when isFiltered then null
    else aggVal
  end
endif
```
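
For illustration, here is a small standalone Scala sketch (pasteable into the 
Scala REPL) of the decision logic that the case statement implements. The names 
are only illustrative: `aggVal` and `isFiltered` model the two columns coming 
out of the left outer join, and `defaultOnEmpty` models V.
```scala
// Standalone model of the CASE expression above; None models SQL NULL.
def subqueryResult(
    aggVal: Option[Any],         // aggregate column from the left outer join
    isFiltered: Option[Boolean], // None => no join match; Some(true) => HAVING rejected the group
    defaultOnEmpty: Option[Any]  // V: the subquery's value over zero input tuples
): Option[Any] =
  isFiltered match {
    case None        => aggVal.orElse(defaultOnEmpty) // coalesce(aggVal, V)
    case Some(true)  => None                          // filtered out => NULL
    case Some(false) => aggVal                        // pass the aggregate value through
  }

// Example: the bindings joined some r rows, but the HAVING clause rejected the aggregate.
subqueryResult(Some(2L), Some(true), Some(0L))   // => None (i.e. NULL)
// Example: the bindings joined no r rows at all.
subqueryResult(None, None, Some(0L))             // => Some(0L)
```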

### Correctness proof 

Let *b* denote a tuple of correlation bindings from the outer query block.  
Without loss of generality, assume that the correlated subquery has the plan 
`Project(Filter(Aggregate(Join({b}, T))))`, where *T* is a table. Note that 
upstream checks in Catalyst ensure that the Aggregate node will produce exactly 
one tuple.

Consider the evaluation of the original subquery on *b*. We can ask three 
questions about this evaluation:
- Did *b* join with one or more tuples from *T*?
- Did the Filter node reject the tuple that the Aggregate node returned?
- Did the subquery return `null`?

The answer to each of these questions must be "yes" or "no", leading to 
eight cases:

Case \# | Empty join? | Agg filtered? | SQ returns null?
------- | ----------- | ------------- | ----------------
1       | Yes         | Yes           | Yes
2       | Yes         | Yes           | No
3       | Yes         | No            | Yes
4       | Yes         | No            | No
5       | No          | Yes           | Yes
6       | No          | Yes           | No
7       | No          | No            | Yes
8       | No          | No            | No

Cases 2 and 6 are impossible. For the remaining cases, the rewritten 
subquery returns the correct result:
- Cases 1 and 3 => Subquery returns null on empty join result, so first 
branch of rewrite algorithm above applies. Outer join returns null for subquery 
result.
- Case 4 => Subquery returns non-null answer on empty join result, so 
second or third branch of rewrite applies. Outer join in the rewritten query 
returns a tuple with `(aggVal, isFiltered)` set to `(null,null)`, so case 
statement at the top of the rewritten query returns the answer the subquery 
returns on an empty join, which is the correct answer.
- Case 5 => If first branch of rewrite applies, the outer join in the 
rewritten query returns null. If second or third branch of rewrite applies, 
then outer join in rewritten query returns a tuple with `(aggVal, isFiltered)` 
set to `(null, true)`, so case statement at top of rewritten query returns null.
- Case 7 => If first branch of rewrite applies, the outer join in the 
rewritten query returns null. For second or third branch of rewrite, outer join 
in rewritten query returns a tuple with `(aggVal, isFiltered)` set to `(null, 
false)`, so case statement at top of rewritten query returns null.
- Case 8 => If first branch of rewrite applies, the outer join in the 
rewritten query returns the subquery's non-null aggregate value, which is 
correct. For second or third branch of rewrite, outer join 
in rewritten query returns a tuple with `(aggVal, isFiltered)` set to 
`(<non-null aggregate value>, false)`, so case statement at top of rewritten 
query passes the non-null value of aggVal through, which is the correct answer.

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-19 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r63954767
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1648,16 +1648,56 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+val resultLit = aggFunc.defaultResult match {
+  case Some(lit) => lit
+  case None => Literal.default(NullType)
+}
+Alias(resultLit, "aggVal") (exprId = resultId)
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
* Construct a new child plan by left joining the given subqueries to a 
base plan.
*/
   private def constructLeftJoins(
   child: LogicalPlan,
   subqueries: ArrayBuffer[ScalarSubquery]): LogicalPlan = {
 subqueries.foldLeft(child) {
   case (currentChild, ScalarSubquery(query, conditions, _)) =>
+val aggOutputExpr = 
query.asInstanceOf[Aggregate].aggregateExpressions.head
--- End diff --

I came across an additional counterexample just now:
```sql
select l.a from l where
  (select case when count(*) = 1 then null else count(*) end
   from r where l.a = r.c) = 0
```
This subquery returns null when count(*) on the inner query block is 1. The 
rewrite in this PR turns the overall query into:
```sql
select * 
from l left outer join 
(select c, case when count(*) = 1 then null else count(*) end as cnt from r 
group by c) sq
on l.a = sq.c 
where coalesce(sq.cnt, 0) = 0
```
This result is incorrect; if exactly one tuple from R joins with a tuple of 
L, the query will return the L tuple, even though that tuple should not be 
returned.
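
Here is a minimal spark-shell sketch of the counterexample on tiny data 
(assuming a build that supports correlated scalar subqueries); the first 
statement runs the original query, the second a hand-written version of the 
rewrite above:
```scala
// Paste into spark-shell. l = {1, 2}, r = {1}: exactly one r tuple joins with l.a = 1.
import spark.implicits._
Seq(1, 2).toDF("a").createOrReplaceTempView("l")
Seq(1).toDF("c").createOrReplaceTempView("r")

// Original query: the subquery is NULL for l.a = 1 (count(*) = 1) and 0 for
// l.a = 2 (empty join), so under SQL semantics only l.a = 2 should qualify.
spark.sql("""
  select l.a from l where
    (select case when count(*) = 1 then null else count(*) end
     from r where l.a = r.c) = 0
""").show()

// Hand-written version of the rewrite above: coalesce() maps both "no matching
// r rows" and "subquery returned NULL" to 0, so l.a = 1 is returned as well.
spark.sql("""
  select l.a
  from l left outer join
    (select c, case when count(*) = 1 then null else count(*) end as cnt
     from r group by c) sq
    on l.a = sq.c
  where coalesce(sq.cnt, 0) = 0
""").show()
```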

So, to summarize, the operators above the outer join need to be able to 
discern between *four* different cases:
1. A tuple from the outer query block joins with one or more tuples in the 
subquery and produces an aggregate result that is *not* null
2. A tuple from the outer query block joins with one or more tuples in the 
subquery and produces an aggregate result that *is* null
3. A tuple from the outer query block does not join with any tuples in the 
subquery
4. A tuple from the outer query block joins with tuples in the subquery and 
produces an aggregate result that is filtered out by a HAVING clause

A single result column is simply not able to encode all four cases (or even 
the first three). The output of the outer join needs to have at least two 
columns to pass through enough information. 
Here's a scheme that will work as far as I can see: The result of the outer 
join contains the correlation columns, plus a column `aggVal` for the aggregate 
value, plus an additional column `isFiltered`. The four cases above are encoded 
as follows:
1. `aggVal is not null and isFiltered = false`
2. `aggVal is null and isFiltered = false`
3. `isFiltered is null`
4. `isFiltered = true`

Then the coalesce expression above the outer join turns into a CASE 
statement:
```sql
case
   when isFiltered is null then coalesce(aggVal, <subquery's value over zero input tuples>)
   when isFiltered then null
   else aggVal
end
```

Here's pseudocode for a rewrite that should produce the correct values of 
`isFiltered` and `aggVal`:
```
if (subquery returns null when zero tuples join)
then
  Use the original rewrite.
else
  if (subquery has a Filter above the Aggregate node)
  then
    Replace the Filter node with a Project that computes the value of isFiltered.
    Rewrite nodes above the Filter node so they pass through the isFiltered column.
  else
    Add an isFiltered column with a hard-coded value of false to the top Project in the subquery.
  endif
  Create a left outer join between the outer query block and the rewritten subquery.
  Put the case statement in the previous listing into the Project operator above the outer join.
endif
```
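
Hand-applying this rewrite to the counterexample query above (no HAVING 
clause, so `isFiltered` is hard-coded to false; 0 is the subquery's value over 
zero input tuples) gives roughly the following, sketched as a spark-shell 
snippet:
```scala
// Paste into spark-shell: the isFiltered rewrite, hand-applied to the
// counterexample query. Only l.a = 2 comes back, matching the original query's
// semantics (l.a = 1 joins exactly one r tuple, so the subquery is NULL).
import spark.implicits._
Seq(1, 2).toDF("a").createOrReplaceTempView("l")
Seq(1).toDF("c").createOrReplaceTempView("r")

spark.sql("""
  select l.a
  from l left outer join
    (select c,
            case when count(*) = 1 then null else count(*) end as aggVal,
            false as isFiltered
     from r group by c) sq
    on l.a = sq.c
  where (case
           when sq.isFiltered is null then coalesce(sq.aggVal, 0)  -- empty join: use the on-zero-tuples value
           when sq.isFiltered then null                            -- HAVING rejected the aggregate
           else sq.aggVal
         end) = 0
""").show()
```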


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

--

[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...

2016-05-19 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/13155#discussion_r63933593
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1648,16 +1648,56 @@ object RewriteCorrelatedScalarSubquery extends 
Rule[LogicalPlan] {
   }
 
   /**
+   * Statically evaluate an expression containing one or more aggregates 
on an empty input.
+   */
+  private def evalOnZeroTups(expr : Expression) : Option[Any] = {
+// AggregateExpressions are Unevaluable, so we need to replace all 
aggregates
+// in the expression with the value they would return for zero input 
tuples.
+val rewrittenExpr = expr transform {
+  case a @ AggregateExpression(aggFunc, _, _, resultId) =>
+val resultLit = aggFunc.defaultResult match {
+  case Some(lit) => lit
+  case None => Literal.default(NullType)
+}
+Alias(resultLit, "aggVal") (exprId = resultId)
+}
+Option(rewrittenExpr.eval())
+  }
+
+  /**
* Construct a new child plan by left joining the given subqueries to a 
base plan.
*/
   private def constructLeftJoins(
   child: LogicalPlan,
   subqueries: ArrayBuffer[ScalarSubquery]): LogicalPlan = {
 subqueries.foldLeft(child) {
   case (currentChild, ScalarSubquery(query, conditions, _)) =>
+val aggOutputExpr = 
query.asInstanceOf[Aggregate].aggregateExpressions.head
--- End diff --

Upon further reflection, a Filter node above the Aggregate creates 
additional issues. If you rewrite the subquery into an outer join above the 
Filter, it may be impossible to distinguish between two cases:
- The original subquery returns null because an aggregate value did not 
pass the HAVING clause
- The aggregate in the original subquery is evaluated over zero tuples

In both cases, the outer join will return a tuple containing nulls.
For example, the query:
```sql
select * from l where (select count(*) cnt from r where l.a = r.c having 
cnt = 0) = 0
```
would turn into
```sql
select * 
from l left outer join (select c, count(*) cnt from r group by c having cnt 
= 0) sq
on l.a = sq.c 
where sq.cnt = 0
```
Note how the rewritten subquery `select c, count(*) cnt from r group by c 
having cnt = 0` always returns zero tuples.
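
A quick spark-shell sketch of the problem on tiny data (the rewrite is written 
out by hand here, not produced by the rule):
```scala
// Paste into spark-shell. With l = {1, 2} and r = {1}, the original query
// should return l.a = 2 under SQL semantics: its count over zero r tuples is 0,
// which passes "having cnt = 0". The hand-written rewrite below returns no rows
// at all, because the rewritten subquery is empty and "sq.cnt = 0" is never true.
import spark.implicits._
Seq(1, 2).toDF("a").createOrReplaceTempView("l")
Seq(1).toDF("c").createOrReplaceTempView("r")

spark.sql("""
  select l.a
  from l left outer join
    (select c, count(*) as cnt from r group by c having count(*) = 0) sq
    on l.a = sq.c
  where sq.cnt = 0
""").show()
```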

I'm working through a few possible solutions, trying to see if one of them 
is guaranteed to be correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


