[GitHub] spark issue #17640: [SPARK-17608][SPARKR]:Long type has incorrect serializat...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/17640 Overall, this looks like a sensible approach to a messy problem. You might want to think about adding some overflow handling to the SQL-->R translation. That is, if a Dataframe contains a `bigint` value that cannot be expressed as a `Double`, it would be safer to convert that value to NaN instead of stripping the lower-order bits off the `bigint`. The `bigint` column in the source Dataframe could hold a unique identifier or a hash value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
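The overflow handling suggested in the comment on #17640 could be sketched roughly as follows. This is only an illustration, not code from the PR: `BigintToDouble` and `toDoubleOrNaN` are hypothetical names, and the check is deliberately conservative. It treats any value with magnitude above 2^53 as unsafe, although some larger values (for example 2^60) are exactly representable as a `Double`.

```scala
object BigintToDouble {
  // Every integer with magnitude up to 2^53 is exactly representable
  // as an IEEE 754 double; above that, low-order bits may be lost.
  private val MaxExact: Long = 1L << 53

  /** Hypothetical helper: returns NaN rather than a silently rounded value. */
  def toDoubleOrNaN(v: Long): Double =
    if (v >= -MaxExact && v <= MaxExact) v.toDouble else Double.NaN
}
```

With a guard like this, a `bigint` identifier or hash that does not fit would surface as NaN on the R side instead of as a different, plausible-looking number.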
[GitHub] spark issue #15027: [SPARK-17475] [STREAMING] Delete CRC files if the filesy...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15027 @viirya to answer your question re deleting vs moving the files: Deleting is easier to implement, because once the .crc file is deleted, you can be sure it won't appear again. Moving the checksum files would be better but would require tracking down other Spark code that does filesystem operations on the files later on. These log files are very short by HDFS standards, so the checksums aren't providing much value anyway.
[GitHub] spark issue #15027: [SPARK-17475] [STREAMING] Delete CRC files if the filesy...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15027

When I comment out line 155 in HDFSMetadataLog.scala on this branch (`if (fileManager.exists(crcPath)) fileManager.delete(crcPath)`) and run the test case attached to this PR, the test case fails:

```
[freiss@fuzzy]:~/spark/from_git/spark-17475$ build/sbt -Phadoop-2.7 -Pscala-2.11 "test-only org.apache.spark.sql.execution.streaming.HDFSMetadataLogSuite"
...
[info] - HDFSMetadataLog: purge *** FAILED *** (135 milliseconds)
[info]   Array(/Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/..29ef67f7-1712-4350-8552-1f8bc6424d0b.tmp.crc, /Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/..ab9bafcb-bdf5-4411-9a9b-60d293d653a6.tmp.crc, /Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/..f79fbc34-58c5-4856-a40d-84eef49c8b9e.tmp.crc, /Users/freiss/spark/from_git/spark-17475/target/tmp/spark-682aa8da-3f04-494f-846f-13c97d3e5538/2) had size 4 instead of expected size 1 (HDFSMetadataLogSuite.scala:126)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
...
```

Note how the test case failure message contains a list of orphan .crc files left in HDFSMetadataLog's temp directory. So, while the filesystem code appears on the surface to be immune to this problem, the problem is clearly happening in the context of the unit tests. Determining exactly what is going on will require more in-depth investigation. Depending on the true root cause, it's possible that this problem also occurs in some distributed settings.
[GitHub] spark issue #15162: [SPARK-17386] [STREAMING] [WIP] Make polling rate adapti...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15162 Closing the PR.
[GitHub] spark pull request #15162: [SPARK-17386] [STREAMING] [WIP] Make polling rate...
Github user frreiss closed the pull request at: https://github.com/apache/spark/pull/15162
[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 Updated the branch and addressed new review comments. Looks like my last push missed a one-line change to memory.scala. Tests are running now.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r85227714

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -111,6 +126,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
         }
       }

    +  override def commit(end: Offset): Unit = synchronized {
    +    if (end.isInstanceOf[LongOffset]) {
    +      val newOffset = end.asInstanceOf[LongOffset]
    +      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +
    +      if (offsetDiff < 0) {
    +        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    +      }
    +
    +      batches.trimStart(offsetDiff)
    +      lastOffsetCommitted = newOffset
    +    } else {
    +      sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
    +        s"an instance of this class")
    --- End diff --

Corrected in my local copy.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r85227658

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -111,6 +126,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
         }
       }

    +  override def commit(end: Offset): Unit = synchronized {
    +    if (end.isInstanceOf[LongOffset]) {
    --- End diff --

Corrected in my local copy.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r84569335

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -337,17 +343,27 @@ class StreamExecution(
             }
             if (hasNewData) {
               reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -            assert(
    -              offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +            assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
                   s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
                 logInfo(s"Committed offsets for batch $currentBatchId.")

    +            // NOTE: The following code is correct because runBatches() processes exactly one
    +            // batch at a time. If we add pipeline parallelism (multiple batches in flight at
    +            // the same time), this cleanup logic will need to change.
    +
    +            // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +            // sources to discard data from the previous batch.
    +            val prevBatchOff = offsetLog.get(currentBatchId - 1)
    +            if (prevBatchOff.isDefined) {
    +              prevBatchOff.get.toStreamProgress(sources).foreach {
    +                case (src, off) => src.commit(off)
    +              }
    +            }
    +
                 // Now that we have logged the new batch, no further processing will happen for
    -            // the previous batch, and it is safe to discard the old metadata.
    -            // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
    -            // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
    -            // flight at the same time), this cleanup logic will need to change.
    -            offsetLog.purge(currentBatchId)
    +            // the batch before the previous batch, and it is safe to discard the old metadata.
    +            // Note that purge is exclusive, i.e. it purges everything before the target ID.
    +            offsetLog.purge(currentBatchId - 1)
    --- End diff --

I can move this change to another JIRA if you'd like, but we really should change `currentBatchId` to `currentBatchId - 1` at some point.

The call to `offsetLog.purge(currentBatchId)`, which I introduced in my PR for SPARK-17513, contains a subtle bug. The recovery logic in `populateStartOffsets()` reads the last and second-to-last entries in `offsetLog`. `populateStartOffsets()` uses those entries to populate `availableOffsets` and `committedOffsets`, respectively. Calling `offsetLog.purge(currentBatchId)` at line 350/366 truncates `offsetLog` to a single entry. As a result, `committedOffsets` is left empty on recovery, and the first call to `getBatch()` for any source then receives `None` as its first argument. Sources that do not prune buffered data in their `commit()` methods will return previously committed data in response to such a `getBatch()` call.
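The truncation described in this comment can be reproduced with a toy model. The sketch below is not Spark code: `ToyOffsetLog`, `recover`, and `run` are hypothetical stand-ins that only mimic an exclusive `purge` and a recovery step that reads the last two log entries.

```scala
import scala.collection.immutable.SortedMap

object OffsetLogPurgeSketch {
  // Toy stand-in for the offset log: batch ID -> description of the offsets.
  final class ToyOffsetLog {
    private var entries = SortedMap.empty[Long, String]

    def add(batchId: Long, offsets: String): Unit =
      entries = entries + (batchId -> offsets)

    // Exclusive purge: drops every entry whose ID is below the threshold.
    def purge(thresholdBatchId: Long): Unit =
      entries = entries.filter { case (id, _) => id >= thresholdBatchId }

    // The newest `n` entries, oldest first.
    def getLatest(n: Int): List[(Long, String)] = entries.toList.takeRight(n)
  }

  // Mimics recovery: last entry -> available, second-to-last -> committed.
  def recover(log: ToyOffsetLog): (Option[String], Option[String]) = {
    val latest = log.getLatest(2)
    val available = latest.lastOption.map(_._2)
    val committed = if (latest.size == 2) Some(latest.head._2) else None
    (available, committed)
  }

  // Logs batches 0..3, purging with the given lag after each one, then recovers.
  def run(purgeLag: Int): (Option[String], Option[String]) = {
    val log = new ToyOffsetLog
    for (batchId <- 0L to 3L) {
      log.add(batchId, s"offsets-$batchId")
      log.purge(batchId - purgeLag)
    }
    recover(log)
  }
}
```

In this model, `run(0)` corresponds to `purge(currentBatchId)` and leaves the committed side empty after recovery, while `run(1)` corresponds to `purge(currentBatchId - 1)` and keeps both entries.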
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r84539662

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -336,17 +342,27 @@ class StreamExecution(
             }
             if (hasNewData) {
               reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -            assert(
    -              offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +            assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
                   s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
                 logInfo(s"Committed offsets for batch $currentBatchId.")

    +            // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +            // sources to discard data from *before* the previous batch.
    +            // The scheduler might still request the previous batch from a source in some cases
    +            // if a crash and recovery occured.
    +            val prevBatchOff = offsetLog.get(currentBatchId - 2)
    --- End diff --

Yes, `currentBatchId - 1` does work here, and I've updated my local copy.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r84538526

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -336,17 +342,27 @@ class StreamExecution(
             }
             if (hasNewData) {
               reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -            assert(
    -              offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +            assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
                   s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
                 logInfo(s"Committed offsets for batch $currentBatchId.")

    +            // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +            // sources to discard data from *before* the previous batch.
    +            // The scheduler might still request the previous batch from a source in some cases
    +            // if a crash and recovery occured.
    --- End diff --

Fixed in my local copy.
[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 I've been running tests since this morning; should have updates in soon.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r84138507

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -30,16 +30,30 @@ trait Source {
       /** Returns the schema of the data from this source */
       def schema: StructType

    -  /** Returns the maximum available offset for this source. */
    +  /**
    +   * Returns the maximum available offset for this source.
    +   * Returns `None` if this source has never received any data.
    +   */
       def getOffset: Option[Offset]

       /**
    -   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
    -   * the batch should begin with the first available record. This method must always return the
    -   * same data for a particular `start` and `end` pair.
    +   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
    +   * then the batch should begin with the first record. This method must always return the
    +   * same data for a particular `start` and `end` pair; even after the Source has been restarted
    +   * on a different node.
    +   *
    +   * Higher layers will always call this method with a value of `start` greater than or equal
    +   * to the last value passed to `commit` and a value of `end` less than or equal to the
    +   * last value returned by `getMaxOffset`
    --- End diff --

Fixed in my local copy.
[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 All my changes are in now, and regression tests pass. As far as I can see, all the review comments have been addressed at this point.
[GitHub] spark issue #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source trait ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 Sorry, I missed the last few email notifications about this PR. I've merged with the head version and made updates to address the most recent round of review comments. Currently running regression tests locally.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r83524491

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -92,21 +105,64 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
       else TextSocketSource.SCHEMA_REGULAR

    -  /** Returns the maximum available offset for this source. */
    +  override def lastCommittedOffset: Option[Offset] = synchronized {
    +    if (lastOffsetCommitted.offset == -1) {
    +      None
    +    } else {
    +      Some(lastOffsetCommitted)
    +    }
    +  }
    +
       override def getOffset: Option[Offset] = synchronized {
    -    if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
    +    if (currentOffset.offset == -1) {
    +      None
    +    } else {
    +      Some(currentOffset)
    +    }
       }

       /** Returns the data that is between the offsets (`start`, `end`]. */
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
    -    val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
    -    val data = synchronized { lines.slice(startIdx, endIdx) }
    +    val startOrdinal =
    +      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
    +    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
    +
    +    // Internal buffer only holds the batches after lastOffsetCommitted
    +    val rawList = synchronized {
    +      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    +      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    +      batches.slice(sliceStart, sliceEnd)
    +    }
    +
         import sqlContext.implicits._
    +    val rawBatch = sqlContext.createDataset(rawList)
    +
    +
    +
    --- End diff --

Fixed in my local copy.
[GitHub] spark pull request #14553: [SPARK-16963] [STREAMING] [SQL] Changes to Source...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r83524216

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -30,16 +30,37 @@ trait Source {
       /** Returns the schema of the data from this source */
       def schema: StructType

    -  /** Returns the maximum available offset for this source. */
    +  /**
    +   * Returns the highest offset that this source has removed from its internal buffer
    +   * in response to a call to `commit`.
    +   * Returns `None` if this source has not removed any data.
    +   */
    +  def lastCommittedOffset: Option[Offset] = (None)
    +
    +  /**
    +   * Returns the maximum available offset for this source.
    +   * Returns `None` if this source has never received any data.
    +   */
       def getOffset: Option[Offset]

       /**
    -   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
    -   * the batch should begin with the first available record. This method must always return the
    -   * same data for a particular `start` and `end` pair.
    +   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
    +   * then the batch should begin with the first record. This method must always return the
    +   * same data for a particular `start` and `end` pair; even after the Source has been restarted
    +   * on a different node.
    +   *
    --- End diff --

Fixed in my local copy.
[GitHub] spark pull request #15352: [SPARK-17780][SQL]Report Throwable to user in Str...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15352#discussion_r82085912

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -207,13 +207,18 @@ class StreamExecution(
               })
             } catch {
               case _: InterruptedException if state == TERMINATED => // interrupted by stop()
    -          case NonFatal(e) =>
    +          case e: Throwable =>
                 streamDeathCause = new StreamingQueryException(
                   this,
                   s"Query $name terminated with exception: ${e.getMessage}",
                   e,
                   Some(committedOffsets.toCompositeOffset(sources)))
                 logError(s"Query $name terminated with error", e)
    +            // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
    +            // handle them
    +            if (!NonFatal(e)) {
    +              throw e
    --- End diff --

`StreamExecution.stop()` won't have been called if the microbatch thread arrives at this location in the code, and the user likely won't call it after seeing the exception. You should probably call the `stop` method on each of the sources before exiting the microbatch thread here.
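One way to read the suggestion in this comment is sketched below. It is an illustration under assumptions, not the PR's code: `StoppableSource` and `runBatches` are hypothetical stand-ins, and non-fatal errors are simply swallowed here, whereas the real code records them in `streamDeathCause`.

```scala
import scala.util.control.NonFatal

object StopSourcesSketch {
  // Hypothetical stand-in for a streaming source that owns resources.
  trait StoppableSource { def stop(): Unit }

  /** Runs `body`; on a fatal error, stops every source and then rethrows. */
  def runBatches(sources: Seq[StoppableSource])(body: => Unit): Unit = {
    try body
    catch {
      case e: Throwable =>
        if (!NonFatal(e)) {
          sources.foreach { s =>
            // Best effort: a failing stop() must not hide the original error.
            try s.stop() catch { case NonFatal(_) => () }
          }
          throw e
        }
        // Non-fatal errors are ignored in this sketch only.
    }
  }
}
```

Stopping the sources before the rethrow matters because nothing else will do it once the microbatch thread has exited.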
[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15307#discussion_r81684775

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -136,16 +139,30 @@ class StreamExecution(
       /** Whether the query is currently active or not */
       override def isActive: Boolean = state == ACTIVE

    +  override def queryStatus: StreamingQueryInfo = {
    +    this.toInfo
    +  }
    +
       /** Returns current status of all the sources. */
       override def sourceStatuses: Array[SourceStatus] = {
         val localAvailableOffsets = availableOffsets
         sources.map(s =>
    -      new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
    +      new SourceStatus(
    --- End diff --

Actually, you can probably drop most of the synchronization if you keep two `StreamMetrics` objects and preallocate the slots for counters. At least the way things are now, each counter in `StreamMetrics` is written once per batch. If you tweak `sourceStatuses()` to return the metrics from the most recent completed batch (i.e. the `StreamMetrics` object that's not currently being written to), there should be no overlap between readers and writers. Eventually you'll want to have more than one `StreamMetrics` object anyway, since the scheduler will need to pipeline multiple batches to reach latencies below the 50-100ms level.
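The reader/writer separation proposed in this comment can be sketched as follows. Note the swap: instead of two preallocated `StreamMetrics` objects, this variation publishes an immutable snapshot of the last completed batch through an `AtomicReference`. `BatchMetrics` and `MetricsHolder` are hypothetical names, and the sketch assumes a single scheduler thread does all the writing.

```scala
import java.util.concurrent.atomic.AtomicReference

object DoubleBufferedMetricsSketch {
  // Hypothetical immutable snapshot of the counters for one batch.
  final case class BatchMetrics(batchId: Long, counters: Map[String, Long])

  final class MetricsHolder {
    // Last completed batch; readers only ever see fully written snapshots.
    private val completed = new AtomicReference[Option[BatchMetrics]](None)

    // Counters for the batch in progress; touched only by the scheduler thread.
    private var inProgress = Map.empty[String, Long]

    def record(name: String, value: Long): Unit =
      inProgress = inProgress + (name -> value)

    // Publishes the finished batch and starts a fresh set of counters.
    def finishBatch(batchId: Long): Unit = {
      completed.set(Some(BatchMetrics(batchId, inProgress)))
      inProgress = Map.empty
    }

    // Safe to call from any thread; never observes a half-written batch.
    def latestCompleted: Option[BatchMetrics] = completed.get()
  }
}
```

A status method built on `latestCompleted` would report the previous batch rather than the one in flight, which is the trade-off the comment accepts in exchange for dropping the locks.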
[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15307#discussion_r81672432

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -317,15 +358,18 @@ class StreamExecution(
         // TODO: Move this to IncrementalExecution.

         // Request unprocessed data from all sources.
    -    val newData = availableOffsets.flatMap {
    -      case (source, available)
    +    val newData = timeIt(GET_BATCH_LATENCY) {
    --- End diff --

Note that the time interval being measured here will have different semantics for different sources, depending on how much computation occurs inside the source's `getBatch` method vs. lazily when the data is read from the resulting Dataframe.
[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15307#discussion_r81672040

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -136,16 +139,30 @@ class StreamExecution(
       /** Whether the query is currently active or not */
       override def isActive: Boolean = state == ACTIVE

    +  override def queryStatus: StreamingQueryInfo = {
    +    this.toInfo
    +  }
    +
       /** Returns current status of all the sources. */
       override def sourceStatuses: Array[SourceStatus] = {
         val localAvailableOffsets = availableOffsets
         sources.map(s =>
    -      new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
    +      new SourceStatus(
    --- End diff --

If this method is intended to be called from threads other than the scheduler thread, then the entire map really ought to be synchronized on `streamMetrics`'s lock. Otherwise this method could return a mixture of statistics from different points of time, even within a single source.
[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15307#discussion_r81668841

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---
    @@ -56,7 +57,12 @@ case class StateStoreRestoreExec(
         child: SparkPlan)
       extends execution.UnaryExecNode with StatefulOperator {

    +  override lazy val metrics = Map(
    +    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
    --- End diff --

The metric names should probably be in a separate, centralized list of constants. Users will want a single place in the API docs to find a list of all available metrics, and the list is likely to change quite frequently as Structured Streaming evolves.
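A centralized list of metric names, as suggested in this comment, might look something like the sketch below. `StreamingMetricNames` is a hypothetical object, not an existing Spark API; only the `numOutputRows` name and its description come from the quoted diff.

```scala
object StreamingMetricNames {
  // Hypothetical central registry; operators would refer to these constants
  // instead of repeating string literals.
  val NumOutputRows: String = "numOutputRows"

  // Human-readable descriptions, keyed by metric name.
  val descriptions: Map[String, String] = Map(
    NumOutputRows -> "number of output rows"
  )

  /** All known metric names, sorted, e.g. for generating documentation. */
  def all: Seq[String] = descriptions.keys.toSeq.sorted
}
```

Keeping names and descriptions in one object gives the API docs a single place to list every metric, and a rename then touches one line instead of every operator.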
[GitHub] spark issue #15262: [SPARK-17690][STREAMING][SQL] Add mini-dfs cluster based...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15262 LGTM overall. We may want to switch more of the test cases to use HDFS in a follow-on JIRA.
[GitHub] spark issue #15258: [SPARK-17689][SQL][STREAMING] added excludeFiles option ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15258 This change allows FileInputStream to consume partial outputs of a system such as Hadoop or another copy of Spark, provided that the system adheres rigidly to the write policy of recent versions of Hadoop. That is: First, write to a temporary file. Then close and flush the temporary file. Then rename the temporary file, using one of the newer, atomic HDFS APIs for renaming files. I worry that users might write data in a subtly different way that does not follow this procedure 100%, which could result in Spark reading incorrect data every once in a while. I recommend documenting under exactly what conditions the "ignore temporary files" option guarantees correct behavior. Also, it would be a good idea to include a mode in which FileInputStream will ignore a directory of files until the special file _SUCCESS appears, indicating that the directory is complete. Otherwise, Spark could end up consuming partial results from failed jobs.
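The write procedure and the `_SUCCESS` check described above can be sketched roughly as follows. This is an illustration only: it uses `java.nio.file` on a local filesystem as a stand-in for the HDFS rename APIs, and the object and method names (`AtomicPublish`, `publish`, `isComplete`) are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper; local-filesystem stand-in for the HDFS write policy.
object AtomicPublish {
  // Step 1: write to a temporary file (Files.write opens, writes, and closes it).
  // Step 2: rename it into place with an atomic move, so readers never see a partial file
  //         under the final name.
  def publish(dir: Path, name: String, contents: String): Path = {
    val tmp = Files.createTempFile(dir, "." + name, ".tmp")
    Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8))
    Files.move(tmp, dir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  // A reader honoring the _SUCCESS convention ignores the directory until the marker exists.
  def isComplete(dir: Path): Boolean = Files.exists(dir.resolve("_SUCCESS"))
}
```

A writer that skips the rename step, or writes directly under the final name, would break the guarantee, which is the case the comment asks the documentation to spell out.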
[GitHub] spark pull request #15258: [SPARK-17689][SQL][STREAMING] added excludeFiles ...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15258#discussion_r80838376 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -50,6 +50,19 @@ class ListingFileCatalog( refresh() + /** + * Often HDFS create temporary files while copying to a new directory or writing new content. + * These files are unintentionally picked up by streaming - causing job failures. This option lets + * HDFS skip these files matching the configured regex-patterns from being picked up by Streaming + * Job. + */ + private lazy val excludeFiles: Set[String] = parameters +.getOrElse("excludeFiles", ".*._COPYING_,_temporary").split(",").toSet + + private def isExcludedFile(path: Path): Boolean = { +excludeFiles.map(path.getName.matches).fold(false)(_ || _) --- End diff -- Probably better to precompile a regex here, as this method will be called a lot (100X per second X number of files under the root directory, by default) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
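One way to precompile the patterns, as suggested above, is sketched below. The class name `ExcludeFilter` is hypothetical; the default pattern string is the one from the diff, and the whole-name matching mirrors the `String.matches` call it replaces.

```scala
import java.util.regex.Pattern

// Hypothetical helper; the default patterns are copied from the diff under review.
class ExcludeFilter(patterns: String = ".*._COPYING_,_temporary") {
  // Compile once: join the comma-separated patterns into a single alternation.
  private val compiled: Pattern =
    Pattern.compile(patterns.split(",").map(p => s"(?:$p)").mkString("|"))

  // matches() requires the entire file name to match, like String.matches in the diff.
  def isExcluded(fileName: String): Boolean = compiled.matcher(fileName).matches()
}
```

Because the `Pattern` is built in the constructor, each call to `isExcluded` only runs the matcher rather than recompiling every pattern per file.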
[GitHub] spark pull request #15262: [SPARK-17690][STREAMING][SQL] Add mini-dfs cluste...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15262#discussion_r80826485 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala --- @@ -330,15 +353,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val filtered = textStream.filter($"value" contains "keep") testStream(filtered)( -AddTextFileData("drop1\nkeep2\nkeep3", src, tmp), +AddTextLocalFileData("drop1\nkeep2\nkeep3", src, tmp), +CheckAnswer("keep2", "keep3"), +StopStream, +AddTextLocalFileData("drop4\nkeep5\nkeep6", src, tmp), +StartStream(), +CheckAnswer("keep2", "keep3", "keep5", "keep6"), +AddTextLocalFileData("drop7\nkeep8\nkeep9", src, tmp), +CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") + ) +} + } + + test("read from text files using hdfs") { +withTempDirs { case (_src, tmp) => + // Create a mini dfs cluster. + System.clearProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA) + val conf = new HdfsConfiguration() + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmp.getAbsolutePath) + val cluster = new MiniDFSCluster.Builder(conf).build() + val hdfsHomeDirectory: Path = cluster.getFileSystem.getHomeDirectory + cluster.getFileSystem.mkdirs(hdfsHomeDirectory) + cluster.waitClusterUp() + val textStream = createFileStream("text", hdfsHomeDirectory.toString) + val filtered = textStream.filter($"value" contains "keep") + val src = hdfsHomeDirectory + testStream(filtered)( +AddTextHDFSFileData("drop1\nkeep2\nkeep3", src, tmp, conf), CheckAnswer("keep2", "keep3"), StopStream, -AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), +AddTextHDFSFileData("drop4\nkeep5\nkeep6", src, tmp, conf), StartStream(), CheckAnswer("keep2", "keep3", "keep5", "keep6"), -AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), +AddTextHDFSFileData("drop7\nkeep8\nkeep9", src, tmp, conf), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") ) +cluster.shutdown() --- End diff -- You'll probably want to put this cleanup 
code somewhere where it will be called even if another part of the test case crashes.
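A minimal sketch of the cleanup pattern suggested above, using `try`/`finally`. The helper `Cleanup.withResource` is hypothetical; in the test under review, the resource would be the `MiniDFSCluster` and the shutdown function would call `cluster.shutdown()`.

```scala
// Hypothetical loan-pattern helper; not part of Spark or its test utilities.
object Cleanup {
  // Run `body` with the resource and always call `shutdown`, even if `body` throws.
  def withResource[R, A](resource: R)(shutdown: R => Unit)(body: R => A): A =
    try body(resource) finally shutdown(resource)
}
```

The exception from a failing test body still propagates to the test framework; the `finally` clause only guarantees that the shutdown runs first.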
[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15005 Thanks @srowen for all the thoughtful comments! It's great to see committers spending time to help improve the build experience for new developers.
[GitHub] spark pull request #15005: [SPARK-17421] [DOCS] Documenting the current trea...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15005#discussion_r79902326 --- Diff: docs/building-spark.md --- @@ -16,24 +16,32 @@ Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+. ### Setting up Maven's Memory Usage -You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`. We recommend the following settings: +You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`: -export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" +export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m" -If you don't run this, you may see errors like the following: +When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS. + +If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] PermGen space -> [Help 1] [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] -You can fix this by setting the `MAVEN_OPTS` variable as discussed before. +[INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes... +OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled. +OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize= + +You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before. **Note:** -* For Java 8 and above this step is not required. -* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate this for you. 
+* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options for Java 7 to the `MAVEN_OPTS` environment variable. --- End diff -- What I meant to say was that build/mvn adds all three options, but I can see how the statement here can be interpreted differently. Checking in an edited version.
[GitHub] spark pull request #15005: [SPARK-17421] [DOCS] Documenting the current trea...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15005#discussion_r79766509 --- Diff: docs/building-spark.md --- @@ -16,24 +16,27 @@ Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+. ### Setting up Maven's Memory Usage -You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`. We recommend the following settings: +If you are compiling with Java 7, you'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`: --- End diff -- Leaving out `-XX:ReservedCodeCacheSize` caused a warning in my tests: ```OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled. OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=``` Left that option in and reworded the beginning of this section as you suggested.
[GitHub] spark pull request #15005: [SPARK-17421] [DOCS] Documenting the current trea...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15005#discussion_r79765892 --- Diff: docs/building-spark.md --- @@ -16,24 +16,31 @@ Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+. ### Setting up Maven's Memory Usage -You'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`. We recommend the following settings: +If you are compiling with Java 7, you'll need to configure Maven to use more memory than usual by setting `MAVEN_OPTS`: export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" -If you don't run this, you may see errors like the following: +When compiling with Java 8, a similar but smaller set of memory parameters is necessary: + +export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m" + +If you don't add these parameters to `MAVEN_OPTS`, you may see errors like the following: [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] PermGen space -> [Help 1] [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] -You can fix this by setting the `MAVEN_OPTS` variable as discussed before. +You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before. **Note:** -* For Java 8 and above this step is not required. -* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate this for you. +* This step is not needed for Java 8. +* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options for Java 7 to the `MAVEN_OPTS` environment variable. +* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`. 
+* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8. These warnings are harmless. --- End diff -- Developers will see the warning when compiling with `build/mvn`. Updating to clarify that.
[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15005 Summary of testing:
- On Java 8, the build fails intermittently with OOM when `-Xmx2g` is omitted
- The `-XX:ReservedCodeCacheSize=512m` argument prevents warnings on both Java 7 and OpenJDK 8
- `-XX:ReservedCodeCacheSize=512m` has no effect on IBM Java 8
[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15166#discussion_r79730904 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run 3 batches, and then assert that only 1 metadata file is left at the end +// since the first 2 should have been purged. +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + AssertOnQuery("metadata log should contain only one file") { q => +val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) +val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) +val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 --- End diff -- Either way is fine with me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15166#discussion_r79730664 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run 3 batches, and then assert that only 1 metadata file is left at the end +// since the first 2 should have been purged. +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + AssertOnQuery("metadata log should contain only one file") { q => +val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) +val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) +val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 +assert(toTest.size == 1 && toTest.head == "2") +true --- End diff -- Ah, yes. The previous line (146) should be just `toTest.size == 1 && toTest.head == "2"`, with no "assert".
[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15166#discussion_r79722643 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run 3 batches, and then assert that only 1 metadata file is left at the end +// since the first 2 should have been purged. +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + AssertOnQuery("metadata log should contain only one file") { q => +val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) +val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) +val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 +assert(toTest.size == 1 && toTest.head == "2") +true --- End diff -- This line ("true") shouldn't be here. It makes the Assert always pass, even when the condition on the previous line isn't satisfied. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15162: [SPARK-17386] [STREAMING] [WIP] Make polling rate...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/15162 [SPARK-17386] [STREAMING] [WIP] Make polling rate adaptive ## What changes were proposed in this pull request? This change makes the scheduler in `StreamExecution` adjust its rate of polling to the rate of data arrival. I used the [AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease) algorithm, aka TCP congestion avoidance, because it is simple. I renamed `spark.sql.streaming.pollingDelay` to `spark.sql.streaming.minPollingDelay` and added a second parameter `spark.sql.streaming.maxPollingDelay` to serve as an upper bound. This upper bound is necessary because the data arrival rate could be bursty, with infinite variance. This approach works, but I'm not completely satisfied with the current design. The blocking-system-call part of polling really ought to be delegated to the sources themselves, so that the main scheduler thread won't be blocked for unpredictable amounts of time. Also, the polling rate ought to be adaptive on a per-source basis, instead of there being a single global rate per session. Leaving this PR marked as WIP for now. ## How was this patch tested? I tested for three scenarios: * If data is not present, the scheduler ramps up the polling delay quickly to `spark.sql.streaming.maxPollingDelay` * If data arrives quickly, the scheduler keeps its polling delay pinned at `spark.sql.streaming.minPollingDelay` * If new batches arrive at a constant interval between the minimum and maximum delay, the scheduler keeps its polling delay within a factor of two of the actual arrival rate. The testing code is included in this PR. Note that the test cases for the two latter cases are disabled by default, because they are somewhat timing dependent. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/frreiss/spark-fred fred-17386a Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15162.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15162 commit 54a8272f1810514ec0a1e92e222359f505c5e58b Author: frreiss <frre...@us.ibm.com> Date: 2016-09-10T05:15:22Z Made polling rate adaptive. commit 24a1f74ff5c401e39858d887212fe4acc5b17bca Author: frreiss <frre...@us.ibm.com> Date: 2016-09-14T22:04:19Z Added test case and corrected some bugs. commit 9b44439d784e51ed75f4476b4432d13155dcdfb0 Author: frreiss <frre...@us.ibm.com> Date: 2016-09-14T22:26:59Z Merge branch 'master' of https://github.com/apache/spark into fred-17386a commit 4439d07e79ca48b5037ecb50017013c75172f6af Author: frreiss <frre...@us.ibm.com> Date: 2016-09-20T16:09:29Z Merge branch 'master' of https://github.com/apache/spark into fred-17386a
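For readers unfamiliar with AIMD, here is a rough sketch of how an adaptive polling delay bounded by a minimum and a maximum might behave. It is not the code from PR #15162: the class name `PollingDelay`, the additive step `stepMs`, and the backoff factor of 2 are all assumptions chosen for illustration. In terms of polling rate, shortening the delay by a fixed step plays the role of the increase, and doubling the delay is the multiplicative decrease.

```scala
// Hypothetical AIMD-style controller; the step size and factor of 2 are assumptions.
final case class PollingDelay(minMs: Long, maxMs: Long, stepMs: Long, currentMs: Long) {
  // No data found: back off by doubling the delay (halving the polling rate), capped at maxMs.
  def onEmptyPoll: PollingDelay = copy(currentMs = math.min(maxMs, currentMs * 2))

  // Data found: poll sooner by shortening the delay by a fixed step, floored at minMs.
  def onData: PollingDelay = copy(currentMs = math.max(minMs, currentMs - stepMs))
}
```

With these rules, a run of empty polls drives the delay to the upper bound in a logarithmic number of steps, and a steady stream of data keeps it pinned at the lower bound, which matches the first two test scenarios listed in the PR description.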
[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15067#discussion_r79662093 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbarge collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run a few batches through the application +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + // Three batches have run, but only one set of metadata should be present + AssertOnQuery( +q => { + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) + val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 + toTest.size == 1 && toTest.head == "2" --- End diff -- Oops, didn't notice I had left that "true" at the end of that block of code. @petermaxlee , let me know if you need help preparing the final version for merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...
Github user frreiss closed the pull request at: https://github.com/apache/spark/pull/15067
[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15005 I've about narrowed down the options that work for OpenJDK 7 and 8 on Mac and Linux. Working on IBM Java on Linux. I can have an update in by EOD today. BTW, one thing that's been slowing me down is that my local copies get "corrupted" after a while. Some of the tests in SparkSubmitSuite stop working until I delete and re-clone my local copy of Spark. I suspect there may be a race condition involving some directory that doesn't get cleaned by the "clean" target.
[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15005 Quick update: I'm running a series of test builds with various parameters to determine what parts of MAVEN_OPTS are currently necessary on different versions of Java. Will report back in a few days and update this PR.
[GitHub] spark issue #13513: [SPARK-15698][SQL][Streaming] Add the ability to remove ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/13513 Ah, now I fully understand @zsxwing's earlier comment about the semantics of `Source.getBatch()`. Those semantics have a design flaw; see the email thread I started at http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-tt18551.html. Basically, it's impossible to implement a Source to the written API spec without keeping unbounded state. I have an open PR to fix this problem at https://github.com/apache/spark/pull/14553. In the short run, I think that @jerryshao's changes here are ok with respect to `Source.getBatch`. The approach in this PR will work as long as the internal structure of the `StreamExecution` class doesn't change and as long as Spark does not have to recover from an outage longer than the compaction interval. The recent changes to `FileInputStream` under SPARK-17165 (https://github.com/apache/spark/pull/14728) have the same problem, and those changes are already committed.
[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/15067 [SPARK-17513] [STREAMING] [SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the `purge` method that was added as part of SPARK-17235. ## How was this patch tested? I added a test case to verify that old metadata log files are correctly purged. I also ran the entire regression suite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/frreiss/spark-fred fred-16963a Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15067.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15067 commit 8cc5e835b209d5796b044978ec4221ee22a8b9d2 Author: frreiss <frre...@us.ibm.com> Date: 2016-09-08T19:59:15Z Added purge() call to scheduler commit d71366d958334ebbc81e45c7f469bad2a68d0a2d Author: frreiss <frre...@us.ibm.com> Date: 2016-09-10T04:23:58Z Added test case and corrected off-by-one error. commit 82f5b681c2e8e52f8549b21c7d058c497f2fc809 Author: frreiss <frre...@us.ibm.com> Date: 2016-09-10T04:24:35Z Merge branch 'master' of https://github.com/apache/spark into fred-16963a
[GitHub] spark issue #13513: [SPARK-15698][SQL][Streaming] Add the ability to remove ...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/13513 You could just move the metadata deletion logic from FileStreamSinkLog into CompactibleFileStreamLog. Then FileStreamSource could issue DELETE log records for files that are older than `FileStreamSource.lastPurgeTimestamp`.
[GitHub] spark pull request #15027: [SPARK-17475] [STREAMING] Delete CRC files if the...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/15027#discussion_r78469982 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -146,6 +146,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) // It will fail if there is an existing file (someone has committed the batch) logDebug(s"Attempting to write log #${batchIdToPath(batchId)}") fileManager.rename(tempPath, batchIdToPath(batchId)) + + // SPARK-17475: HDFSMetadataLog should not leak CRC files + // If the underlying filesystem didn't rename the CRC file, delete it. --- End diff -- I believe HDFSMetadataLog is only called from Structured Streaming classes currently.
[GitHub] spark issue #15005: [SPARK-17421] [DOCS] Documenting the current treatment o...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/15005 Sure, I'll redo that part so that it includes two sets of recommended options. Note that the docs in the Spark 2.0.0 release say that these options aren't necessary for Java 8.
[GitHub] spark pull request #15027: [SPARK-17475] [STREAMING] Delete CRC files if the...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/15027

[SPARK-17475] [STREAMING] Delete CRC files if the filesystem doesn't use checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files.

## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/frreiss/spark-fred fred-17475

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15027.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #15027

commit 3a2a9c116659f526189de6b8b98fb6c92024a7a6
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-09T17:30:08Z

    Delete CRC files when the filesystem doesn't support checksums.

commit 9ff89c0228c09764fa6444528050a35e823db0e6
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-09T17:30:52Z

    Merge branch 'master' of https://github.com/apache/spark into fred-17475
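The cleanup described in this PR can be illustrated without any Hadoop dependency. The following is only a minimal sketch using `java.nio.file`, not the code from the patch: the object and method names (`CrcCleanupSketch`, `crcPathFor`, `renameAndCleanCrc`) are hypothetical, and it assumes the Hadoop convention that the checksum for a file `name` is stored as a hidden sibling `.name.crc`.

```scala
import java.nio.file.{Files, Path}

object CrcCleanupSketch {
  /** Hadoop-style checksum sibling for `file`: ".<name>.crc" in the same directory (assumed convention). */
  def crcPathFor(file: Path): Path =
    file.resolveSibling("." + file.getFileName.toString + ".crc")

  /**
   * Rename `tempFile` to `finalFile`, then delete the checksum file that was
   * written next to the temp file if the rename did not carry it along.
   * Returns true if a stale checksum file was removed.
   */
  def renameAndCleanCrc(tempFile: Path, finalFile: Path): Boolean = {
    val staleCrc = crcPathFor(tempFile)
    Files.move(tempFile, finalFile) // throws if finalFile already exists
    Files.deleteIfExists(staleCrc)
  }
}
```

On a filesystem that renames the checksum together with the data file, `deleteIfExists` simply finds nothing and returns false, so the same code path is safe in both cases.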
[GitHub] spark pull request #14945: [SPARK-17386] Set default trigger interval to 1/1...
Github user frreiss closed the pull request at: https://github.com/apache/spark/pull/14945
[GitHub] spark issue #14945: [SPARK-17386] Set default trigger interval to 1/10 secon...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14945 On a closer reading of the code, there is a more expedient fix: change the default value of the STREAMING_POLLING_DELAY parameter. Will redo.
[GitHub] spark pull request #15005: [SPARK-17421] Documenting the current treatment o...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/15005

[SPARK-17421] Documenting the current treatment of MAVEN_OPTS.

## What changes were proposed in this pull request?

Modified the documentation to clarify that `build/mvn` and `pom.xml` always add Java 7-specific parameters to `MAVEN_OPTS`, and that developers can safely ignore warnings about `-XX:MaxPermSize` that may result from compiling or running tests with Java 8.

## How was this patch tested?

Rebuilt HTML documentation, made sure that building-spark.html displays correctly in a browser.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/frreiss/spark-fred fred-17421a

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15005.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #15005

commit cfe0f69676cc57922ed2076435a10e6f2355a540
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-07T22:43:00Z

    Documenting the current treatment of MAVEN_OPTS.
[GitHub] spark pull request #14986: [WIP] [SPARK-17421] Don't use -XX:MaxPermSize opt...
Github user frreiss closed the pull request at: https://github.com/apache/spark/pull/14986
[GitHub] spark issue #14986: [WIP] [SPARK-17421] Don't use -XX:MaxPermSize option whe...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14986 Makes sense. I will close this PR and just add a clarification to the documentation.
[GitHub] spark pull request #14986: [WIP] [SPARK-17421] Don't use -XX:MaxPermSize opt...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/14986

[WIP] [SPARK-17421] Don't use -XX:MaxPermSize option when Java version >= 8

## What changes were proposed in this pull request?

Modifies the `build/mvn` and `build/sbt-launch-lib.bash` scripts so that they check the Java version and omit the `-XX:MaxPermSize=512M` option for Java versions >= 8.

## How was this patch tested?

Ran the build on Linux and Mac with Java 7 and 8, from `build/mvn` and `dev/run-tests`. Also tested Java 8 with `JAVA_7_HOME` pointing to a Java 7 installation.

Currently, `dev/run-tests` has a few remaining instances of the MaxPermSize warning, apparently due to another code path to spawning a JVM. Keeping this PR as a WIP until the output of `dev/run-tests` is completely clean.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/frreiss/spark-fred fred-17421

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14986.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14986

commit c9beadd02c22c7a84b521003fdebee0ba45bd286
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-06T22:19:44Z

    Added logic to build/mvn to check Java version.

commit 0955947dcf83c90473f4f77b19a0559558bba80c
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-06T22:37:47Z

    Made sbt-launch-lib.bash check for Java >7
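The version check this PR describes lives in bash scripts; the same decision can be sketched in Scala for illustration. This is a hedged sketch, not the patch itself: `JvmOptsSketch`, `majorVersion`, and `buildJvmOpts` are hypothetical names, the input is assumed to look like the `java.specification.version` system property ("1.7", "1.8", "9", ...), and the two "common" options are placeholders rather than the exact set the build scripts use.

```scala
object JvmOptsSketch {
  /** Major Java version from a spec-version string such as "1.7", "1.8", "9", or "11". */
  def majorVersion(spec: String): Int = {
    val parts = spec.split("\\.")
    // Versions before Java 9 are written "1.x"; later ones start with the major number.
    if (parts(0) == "1" && parts.length > 1) parts(1).toInt else parts(0).toInt
  }

  /** JVM options for the build; -XX:MaxPermSize is only added before Java 8. */
  def buildJvmOpts(spec: String): Seq[String] = {
    val common = Seq("-Xmx2g", "-XX:ReservedCodeCacheSize=512m") // illustrative values
    if (majorVersion(spec) >= 8) common else common :+ "-XX:MaxPermSize=512M"
  }
}
```

A caller could pass `System.getProperty("java.specification.version")` to `buildJvmOpts` to get the option list for the running JVM.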
[GitHub] spark pull request #14945: [SPARK-17386] Set default trigger interval to 1/1...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/14945

[SPARK-17386] Set default trigger interval to 1/10 second

## What changes were proposed in this pull request?

This pull request implements the most expedient change to fix SPARK-17386: Use a default trigger interval of 100ms instead of polling continuously for new data by default. I've changed the default value used in both `StreamingQueryManager` and `StreamTest` and created a new constant to facilitate changing both of those defaults simultaneously in the future.

## How was this patch tested?

All existing regression tests pass, and most of those tests use the default trigger intervals that this PR changes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/frreiss/spark-fred fred-17386

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14945.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14945

commit 13496416bf2409082e29231cad504d1b3bf1
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-02T23:22:59Z

    Change default trigger to 1/10 second

commit b1e7fd55adaa7faf5344ea5ddc135cf9cbaf3507
Author: frreiss <frre...@us.ibm.com>
Date: 2016-09-02T23:23:22Z

    Merge branch 'master' of https://github.com/apache/spark into fred-17386
[GitHub] spark issue #14553: [SPARK-16963] Changes to Source trait and related implem...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 @ScrapCodes, would you mind triggering a build of this PR?
[GitHub] spark issue #14803: [SPARK-17153][SQL] Should read partition data when readi...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14803 LGTM
[GitHub] spark pull request #14870: [SPARK-17303] Added spark-warehouse to dev/.rat-e...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/14870

[SPARK-17303] Added spark-warehouse to dev/.rat-excludes

## What changes were proposed in this pull request?

Excludes the `spark-warehouse` directory from the Apache RAT checks that src/run-tests performs. `spark-warehouse` is created by some of the Spark SQL tests, as well as by `bin/spark-sql`.

## How was this patch tested?

Ran src/run-tests twice. The second time, the script failed because the first iteration had created the `spark-warehouse` directory. Made the change in this PR. Ran src/run-tests a third time; RAT checks succeeded.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/frreiss/spark-fred fred-17303

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14870.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14870

commit 850261fb54a0d372b12a16a89e16774cc9a4cba8
Author: frreiss <frre...@us.ibm.com>
Date: 2016-08-29T23:02:15Z

    Added spark-warehouse to dev/.rat-excludes
[GitHub] spark pull request #14803: [SPARK-17153][SQL] Should read partition data whe...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14803#discussion_r76646983

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -129,13 +129,20 @@ class FileStreamSource(
         val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
         logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
    +    val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
    --- End diff --

I recommend putting this code into the initialization code at the top of this file that sets `qualifiedBasePath` (currently lines 47-50). That way all the code that interprets the meaning of the `path` parameter will be in one place.
[GitHub] spark issue #14553: [SPARK-16963] Changes to Source trait and related implem...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 @rxin and @marmbrus, would it be possible to get this PR reviewed soon? I can split it into smaller chunks if that would make things easier; I just need to know.
[GitHub] spark pull request #14691: [SPARK-16407][STREAMING] Allow users to supply cu...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14691#discussion_r76505064

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
    @@ -123,12 +124,30 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       /**
        * :: Experimental ::
        * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    +   * Additionally user specific StreamSinkProviders can be specified hear using the fully qualified
    +   * class name.
        *
        * @since 2.0.0
        */
       @Experimental
       def format(source: String): DataStreamWriter[T] = {
         this.source = source
    +    this.sinkProvider = null
    +    this
    +  }
    +
    +  /**
    +   * :: Experimental ::
    +   * Specifies the underlying output data source using a StreamSinkProvider. This is useful for
    +   * sinks which are constructed with user specified functions (such as a user specified version of
    +   * ForeachSink).
    +   *
    +   * @since 2.1.0
    +   */
    +  @Experimental
    +  def format(sinkProvider: StreamSinkProvider): DataStreamWriter[T] = {
    +    this.source = null
    --- End diff --

Probably better to set `source` to a dedicated constant string like "custom" here, so that the branches in start() all have consistent logic.
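The suggestion above can be sketched as follows. This is a simplified stand-in rather than Spark's `DataStreamWriter`: `WriterSketch`, `SinkProvider`, `CustomSource`, and the strings returned by `start()` are all hypothetical, chosen only to show how a dedicated constant lets `start()` branch on `source` alone instead of mixing null checks with string matches.

```scala
object WriterSketch {
  // Stand-in for StreamSinkProvider; hypothetical, not Spark's trait.
  trait SinkProvider { def name: String }

  // Dedicated marker value for "the sink comes from a user-supplied provider".
  val CustomSource = "custom"

  final class Writer {
    private var source: String = "parquet"
    private var sinkProvider: Option[SinkProvider] = None

    def format(source: String): Writer = {
      this.source = source
      this.sinkProvider = None
      this
    }

    def format(provider: SinkProvider): Writer = {
      this.source = CustomSource // never null, so start() needs no null branch
      this.sinkProvider = Some(provider)
      this
    }

    /** Every branch keys off `source`; returns a description of the chosen sink. */
    def start(): String = source match {
      case "memory"     => "memory sink"
      case CustomSource => "user sink: " + sinkProvider.map(_.name).getOrElse("<missing>")
      case other        => "built-in sink: " + other
    }
  }
}
```

One design consequence worth noting: a caller who passes the literal string "custom" to `format(source: String)` reaches the custom branch without a provider, so a real implementation would need to reserve or validate that name.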
[GitHub] spark pull request #14691: [SPARK-16407][STREAMING] Allow users to supply cu...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14691#discussion_r76504239

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
    @@ -123,12 +124,30 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       /**
        * :: Experimental ::
        * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    +   * Additionally user specific StreamSinkProviders can be specified hear using the fully qualified
    --- End diff --

*user-specific* StreamSinkProviders can be specified *here*
[GitHub] spark pull request #14773: [SPARK-17203][SQL] data source options should alw...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14773#discussion_r76503740

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -65,7 +65,7 @@ case class CreateDataSourceTableCommand(
         var isExternal = true
         val optionsWithPath =
    -      if (!new CaseInsensitiveMap(options).contains("path") && managedIfNoPath) {
    +      if (!options.contains("path") && managedIfNoPath) {
    --- End diff --

Maybe create a constant for the string "path" as a data source param? It occurs in quite a few places.
[GitHub] spark issue #14802: [SPARK-17235][SQL] Support purging of old logs in Metada...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14802 LGTM. I have written nearly the exact same thing as part of [https://github.com/apache/spark/pull/14553], but can use this version of the method instead.
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r76499068

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -129,3 +131,86 @@ class FileStreamSource(
       override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
     }
    +
    +class FileStreamSourceLog(sparkSession: SparkSession, path: String)
    +  extends HDFSMetadataLog[Seq[String]](sparkSession, path) {
    +
    +  // Configurations about metadata compaction
    +  private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
    +  require(compactInterval > 0,
    +    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
    +      s"positive value.")
    +
    +  private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
    +
    +  private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
    +
    +  private var compactBatchId: Long = -1L
    +
    +  private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = {
    +    batchId % compactInterval == 0
    +  }
    +
    +  override def add(batchId: Long, metadata: Seq[String]): Boolean = {
    +    if (isCompactionBatch(batchId, compactInterval)) {
    +      compactMetadataLog(batchId - 1)
    +    }
    +
    +    super.add(batchId, metadata)
    +  }
    +
    +  private def compactMetadataLog(batchId: Long): Unit = {
    +    // read out compact metadata and merge with new metadata.
    +    val batches = super.get(Some(compactBatchId), Some(batchId))
    +    val totalMetadata = batches.flatMap(_._2)
    +    if (totalMetadata.isEmpty) {
    +      return
    +    }
    +
    +    // Remove old compact metadata file and rewrite.
    +    val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
    +    fileManager.rename(batchIdToPath(batchId), renamedPath)
    +
    +    var isSuccess = false
    +    try {
    +      isSuccess = super.add(batchId, totalMetadata)
    +    } catch {
    +      case NonFatal(e) => isSuccess = false
    +    } finally {
    +      if (!isSuccess) {
    +        // Rollback to the previous status if compaction is failed.
    --- End diff --

This rollback code will not execute if the process exits during a compaction operation. You will need cleanup code in the class constructor to handle that case.
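The constructor-time cleanup suggested above might look roughly like the sketch below. It is an illustration under stated assumptions, not code from the PR: it uses `java.nio.file` instead of the Hadoop file manager, the names `CompactionRecoverySketch` and `recover` are hypothetical, it relies on the temp-file naming visible in the diff (`.<batchId>-<uuid>.tmp`), and it assumes a batch file only becomes visible once it has been written completely.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object CompactionRecoverySketch {
  // Matches the temp names used in the diff: ".<batchId>-<uuid>.tmp"
  private val TmpName = """\.(\d+)-[0-9a-fA-F-]+\.tmp""".r

  /**
   * Intended to run once at startup, before any new batch is written.
   * For each leftover temp file: if the batch file is missing, the process died
   * mid-compaction, so the old file is moved back; if the batch file exists,
   * the rewrite finished and the temp copy is deleted.
   * Returns the batch ids that were restored.
   */
  def recover(logDir: Path): Seq[Long] = {
    val stream = Files.newDirectoryStream(logDir)
    try {
      // Materialize the listing first so the directory is not modified while iterating.
      stream.asScala.toList.flatMap { p =>
        p.getFileName.toString match {
          case TmpName(id) =>
            val batchFile = logDir.resolve(id)
            if (Files.exists(batchFile)) { Files.delete(p); None }
            else { Files.move(p, batchFile); Some(id.toLong) }
          case _ => None
        }
      }
    } finally stream.close()
  }
}
```

Calling `recover` from the log class's constructor would cover the crash case that the `finally` block in the diff cannot reach.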
[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r76498637

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -727,6 +732,48 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           }
         }
       }
    +
    +
    --- End diff --

This line no longer exists after merging the changes from [https://github.com/apache/spark/pull/14728] and replacing my former implementation of GC for FileStreamSource with the one in that merged PR.
[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r76498301

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -24,21 +24,24 @@ import java.text.SimpleDateFormat
     import java.util.Calendar
     import javax.annotation.concurrent.GuardedBy

    -import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable.ListBuffer
     import scala.util.{Failure, Success, Try}

     import org.apache.spark.internal.Logging
    -import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
    +import org.apache.spark.sql._
     import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
     import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    +
     object TextSocketSource {
       val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
       val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
         StructField("timestamp", TimestampType) :: Nil)
       val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     }

    +
    --- End diff --

Fixed in my local copy.
[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r76498251

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -244,6 +250,21 @@ class StreamExecution(
           logDebug(s"Resuming with committed offsets: $committedOffsets")
         }

    +    // Compare the offsets we just read from the checkpoint against the
    +    // sources' own checkpoint data.
    +    val offsetChanges = mutable.Map[Source, Offset]()
    +    committedOffsets.foreach {
    +      case (src, checkptOffset) =>
    +        val srcOffset = src.getMinOffset
    +        if (srcOffset.isDefined && srcOffset.get > checkptOffset) {
    +          logWarning(s"Source $src lost offsets between $checkptOffset " +
    +            s"and ${srcOffset.get} when resuming. Skipping ahead to ${srcOffset.get}.")
    +          offsetChanges += (src -> srcOffset.get)
    +        }
    +    }
    +    committedOffsets ++= offsetChanges
    +
    +
    --- End diff --

Fixed in my local copy.
[GitHub] spark pull request #14553: [SPARK-16963] Changes to Source trait and related...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14553#discussion_r76498223

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala ---
    @@ -48,4 +49,13 @@ trait MetadataLog[T] {
        * Return the latest batch Id and its metadata if exist.
        */
       def getLatest(): Option[(Long, T)]
    +
    +
    --- End diff --

Fixed in my local copy.
[GitHub] spark issue #14553: [WIP] [SPARK-16963] Initial version of changes to Source...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/14553 These changes are now ready for review. The contents of this PR pass regression tests on my machines. Can one of the committers please start a Jenkins build?
[GitHub] spark pull request #14151: [SPARK-16496][SQL] Add wholetext as option for re...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14151#discussion_r74805700

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---
    @@ -39,6 +39,11 @@ class TextSuite extends QueryTest with SharedSQLContext {
         verifyFrame(spark.read.text(testFile))
       }

    +  test("reading text file with wholetext option on") {
    --- End diff --

As far as I'm aware, the most common use case for reading entire files is using a glob to read a directory or directory tree containing multiple files. For example, one might download the Enron corpus (see [https://www.cs.cmu.edu/~./enron/]), which comes packaged with one file per email message. With a large number of input files, it's important that the work of processing the files be split among many cores. So the test for the `wholetext` option really should have multiple input files and verify that different files end up in different partitions of the resulting RDD or DataFrame.
[GitHub] spark pull request #14151: [SPARK-16496][SQL] Add wholetext as option for re...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/14151#discussion_r74804217

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -533,6 +533,12 @@ object SQLConf {
           .timeConf(TimeUnit.MILLISECONDS)
           .createWithDefault(10L)

    +  val WHOLETEXT =
    +    SQLConfigBuilder("spark.sql.wholetext")
    --- End diff --

Should this really be a session-global configuration? It seems like something that is specific to a particular input file and should only be set when opening a given file.
[GitHub] spark pull request #14553: [WIP] [SPARK-16963] Initial version of changes to...
GitHub user frreiss opened a pull request: https://github.com/apache/spark/pull/14553

[WIP] [SPARK-16963] Initial version of changes to Source trait

## What changes were proposed in this pull request?

Initial proposed changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Major changes so far:
* Added a method `commit(end: Offset)` that tells the Source that it is OK to discard all offsets up to `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream", as opposed to "all data present in the Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`.
* Added a `getMinOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code.
* Changed the name of `getOffset` to `getMaxOffset`.

## How was this patch tested?

Testing is still TBD.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/frreiss/spark-fred fred-16963

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14553.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14553

commit 6c9acdefe1c791bad3f00d845a72d07e2113b214 Author: frreiss <frre...@us.ibm.com> Date: 2016-08-09T02:28:32Z Initial version of changes to Source trait
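The contract described in the pull request above can be sketched in plain Scala. This is a simplified illustration under stated assumptions, not the code in the patch: `SketchSource` and `InMemorySource` are hypothetical names, offsets are plain `Long` values and batches are `Seq[String]` rather than Spark's `Offset` and `DataFrame` types, and the start offset of `getBatch` is assumed to be exclusive.

```scala
import scala.collection.immutable.SortedMap

// Hypothetical, simplified stand-in for the proposed Source trait.
trait SketchSource {
  /** Smallest offset still buffered, or None if nothing is buffered. */
  def getMinOffset: Option[Long]
  /** Largest offset seen so far, or None if no data has arrived. */
  def getMaxOffset: Option[Long]
  /** Records after `start` up to and including `end`; a `start` of
    * None means "from the very beginning of the stream". */
  def getBatch(start: Option[Long], end: Long): Seq[String]
  /** All offsets up to `end`, inclusive, may now be discarded. */
  def commit(end: Long): Unit
}

final class InMemorySource extends SketchSource {
  // Offset -> record; offsets start at 0 and increase by 1.
  private var buffer: SortedMap[Long, String] = SortedMap.empty[Long, String]
  private var nextOffset = 0L
  private var lastCommitted: Option[Long] = None

  def add(record: String): Unit = {
    buffer = buffer.updated(nextOffset, record)
    nextOffset += 1
  }

  def getMinOffset: Option[Long] = buffer.headOption.map(_._1)

  def getMaxOffset: Option[Long] =
    if (nextOffset == 0L) None else Some(nextOffset - 1)

  def getBatch(start: Option[Long], end: Long): Seq[String] = {
    // Assumption of this sketch: once something has been committed,
    // callers must pass a start at or after the last committed offset.
    require(
      lastCommitted.forall(c => start.exists(_ >= c)),
      "start precedes the last committed offset")
    val from = start.map(_ + 1).getOrElse(0L)
    buffer.range(from, end + 1).values.toList
  }

  def commit(end: Long): Unit = {
    // Drop everything at or below `end`; later offsets stay buffered.
    buffer = buffer.filter(kv => kv._1 > end)
    lastCommitted = Some(end)
  }
}
```

After `commit(1L)` on a source holding offsets 0 through 3, the sketch retains only offsets 2 and 3, and `getMinOffset` reports the first retained offset, which is the kind of status a scheduler could query on restart.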
[GitHub] spark issue #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/13155 Tests ran successfully on my machine.
[GitHub] spark issue #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/13155 Updated changes are in. Running a full regression suite overnight.
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66564793 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66561815 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66561017 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { --- End diff -- Yes, that is true. Added a test case and an additional clause in that case statement in my local copy.
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66560947 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66560868 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66558119 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings --- End diff -- Fixed in my local copy.
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66558125 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() --- End diff -- Fixed in my local copy.
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66539170 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66509510 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() --- End diff -- Fixed in my local copy.
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66509444 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() --- End diff -- Fixed in my local copy
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66509405 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings --- End diff -- Fixed in my local copy
[GitHub] spark issue #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...
Github user frreiss commented on the issue: https://github.com/apache/spark/pull/13155 @rxin I'll have an updated set of changes in tonight
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r66478261 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { --- End diff -- No particular reason for using `ExprId.id`. I'll change to using the entire class as a key.
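For illustration only (this is not code from the pull request): a minimal sketch of the difference between keying a bindings map by a bare `Long` and keying it by the whole identifier class. `DemoExprId` and `ExprIdKeyDemo` are hypothetical stand-ins invented for this example, not Spark's actual classes.

```scala
import java.util.UUID

// Hypothetical stand-in for an expression identifier; not Spark's actual class.
final case class DemoExprId(id: Long, jvmId: UUID)

object ExprIdKeyDemo {
  val jvm: UUID = UUID.randomUUID()
  val a: DemoExprId = DemoExprId(1L, jvm)
  val b: DemoExprId = DemoExprId(2L, jvm)

  // Keyed by the bare Long: the key type says nothing about what it identifies.
  val byLong: Map[Long, Option[Any]] = Map(a.id -> Some(42), b.id -> None)

  // Keyed by the whole case class: equality and hashing come from the case class,
  // so every field of the identifier takes part in the lookup.
  val byExprId: Map[DemoExprId, Option[Any]] = Map(a -> Some(42), b -> None)

  def main(args: Array[String]): Unit = {
    println(byLong(1L))
    println(byExprId(DemoExprId(1L, jvm)))
  }
}
```

Because a case class gets structural `equals` and `hashCode`, a freshly constructed key with the same field values finds the same entry, so switching the key type does not require any extra lookup code.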
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r65584546 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) --- End diff -- The join attributes that the Analyzer adds to the Aggregate node are AttributeReference nodes, and the `evalAggOnZeroTups` method as currently written can't process them. A more comprehensive facility for statically evaluating expressions would be nice to have; but I'm hesitant to add such a mechanism as part of a bug fix. Perhaps a follow-on JIRA is in order?
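For illustration only (this is not code from the pull request): a toy sketch of evaluating an aggregate expression on zero input rows, with attribute references filled in as NULL. The expression tree below (`Lit`, `Attr`, `CountStar`, `Sum`, `Add`) is hypothetical and unrelated to the Catalyst classes; `None` models SQL NULL.

```scala
object ZeroTupleEvalDemo {
  // Toy expression tree; all names here are hypothetical.
  sealed trait Expr
  final case class Lit(value: Option[Long]) extends Expr
  final case class Attr(name: String) extends Expr // e.g. a join column
  case object CountStar extends Expr
  final case class Sum(child: Expr) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Value of the expression when the aggregate sees zero input rows.
  def evalOnZeroTups(e: Expr): Option[Long] = e match {
    case Lit(v)    => v
    case Attr(_)   => None     // attribute references are filled in with NULL
    case CountStar => Some(0L) // COUNT over no rows is 0
    case Sum(_)    => None     // SUM over no rows is NULL
    case Add(l, r) =>
      // NULL propagates through arithmetic: either side being None yields None.
      for (a <- evalOnZeroTups(l); b <- evalOnZeroTups(r)) yield a + b
  }
}
```

In this toy model `COUNT(*) + 1` evaluates to a non-NULL value on empty input while `SUM(x) + 1` evaluates to NULL, which is the distinction a static evaluation of the subquery has to preserve.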
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r65583548 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] --- End diff -- The test on the next line doesn't compile without the cast. The condition in a Filter node is of type Expression, and `Expression.eval()` returns `Any`.
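For illustration only (this is not code from the pull request): a minimal sketch of why a value statically typed as `Any` cannot be used directly as an `if` condition, and two ways to recover a `Boolean`. `evalUntyped` is a hypothetical stand-in for an evaluator whose result type is `Any`.

```scala
object AnyConditionDemo {
  // Hypothetical stand-in for an evaluator whose static result type is Any.
  def evalUntyped(flag: Boolean): Any = flag

  // Writing `if (evalUntyped(true)) ...` is rejected by the compiler,
  // because an `if` condition must have type Boolean, not Any.

  // Same shape as the quoted diff: default to false, then cast.
  def viaCast(flag: Boolean): String = {
    val result = Option(evalUntyped(flag)).getOrElse(false).asInstanceOf[Boolean]
    if (result) "kept" else "filtered"
  }

  // Alternative without a cast: a typed pattern with a guard.
  def viaMatch(flag: Boolean): String = evalUntyped(flag) match {
    case b: Boolean if b => "kept"
    case _               => "filtered"
  }
}
```

The cast is shorter but throws a `ClassCastException` if the value is not a `Boolean`; the pattern match treats any non-`Boolean` result as "filtered" instead.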
[GitHub] spark pull request #13155: [SPARK-15370] [SQL] Update RewriteCorrelatedScala...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r65454461 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { --- End diff -- Fixed in my local copy.
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedScalarSubque...
Github user frreiss commented on the pull request: https://github.com/apache/spark/pull/13155 Thanks @hvanhovell for the additional pass of review! I'll be preparing my slides for Spark Summit all day today but will come back to this PR as soon as that's done.
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64996012 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64995985 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } /** + * Statically evaluate an expression containing zero or more placeholders, given a set + * of bindings for placeholder values. + */ + private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = { +val rewrittenExpr = expr transform { + case r @ AttributeReference(_, dataType, _, _) => +bindings(r.exprId.id) match { + case Some(v) => Literal.create(v, dataType) + case None => Literal.default(NullType) +} +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate an expression containing one or more aggregates on an empty input. + */ + private def evalAggOnZeroTups(expr : Expression) : Option[Any] = { +// AggregateExpressions are Unevaluable, so we need to replace all aggregates +// in the expression with the value they would return for zero input tuples. +val rewrittenExpr = expr transform { + case a @ AggregateExpression(aggFunc, _, _, resultId) => +aggFunc.defaultResult.getOrElse(Literal.default(NullType)) +} +Option(rewrittenExpr.eval()) + } + + /** + * Statically evaluate a scalar subquery on an empty input. + * + * WARNING: This method only covers subqueries that pass the checks under + * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in + * CheckAnalysis become less restrictive, this method will need to change. + */ + private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = { +// Inputs to this method will start with a chain of zero or more SubqueryAlias +// and Project operators, followed by an optional Filter, followed by an +// Aggregate. Traverse the operators recursively. 
+def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = { + lp match { +case SubqueryAlias(_, child) => evalPlan(child) +case Filter(condition, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) bindings + else { +val exprResult = evalExpr(condition, bindings).getOrElse(false) + .asInstanceOf[Boolean] +if (exprResult) bindings else Map() + } + +case Project(projectList, child) => + val bindings = evalPlan(child) + if (bindings.size == 0) { +bindings + } else { +projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap + } + +case Aggregate(_, aggExprs, _) => + // Some of the expressions under the Aggregate node are the join columns + // for joining with the outer query block. Fill those expressions in with + // nulls and statically evaluate the remainder. + aggExprs.map(ne => ne match { +case AttributeReference(_, _, _, _) => (ne.exprId.id, None) +case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None) +case _ => (ne.exprId.id, evalAggOnZeroTups(ne)) + }).toMap + +case _ => sys.error(s"Unexpected operator in scalar subquery: $lp") + } +} + +val resultMap = evalPlan(plan) + +// By convention, the scalar subquery result is the leftmost field. +resultMap(plan.output.head.exprId.id) + } + + /** + * Split the plan for a scalar subquery into the parts above the Aggregate node + * (first part of returned value) and the parts below the Aggregate node, including + * the Aggregate (second part of returned value) + */ + private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = { +var topPart = List[LogicalPlan]() +var bottomPart : LogicalPlan = plan +while (! 
bottomPart.isInstanceOf[Aggregate]) { + topPart = bottomPart :: topPart + bottomPart = bottomPart.children.head +} +(topPart, bottomPart.asInstanceOf[Aggregate]) + } + + /** + * Rewrite the nodes above the Aggregate in a subquery so that they generate an + * auxiliary column "isFiltered" + * @param subqueryPlan plan before rewrite + * @param filteredId expression ID for the "isFiltered" column + */ + private def addIsFiltered(subquery
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64944724

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
       }

       /**
    +   * Statically evaluate an expression containing zero or more placeholders, given a set
    +   * of bindings for placeholder values.
    +   */
    +  private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = {
    +    val rewrittenExpr = expr transform {
    +      case r @ AttributeReference(_, dataType, _, _) =>
    +        bindings(r.exprId.id) match {
    +          case Some(v) => Literal.create(v, dataType)
    +          case None => Literal.default(NullType)
    +        }
    +    }
    +    Option(rewrittenExpr.eval())
    +  }
    +
    +  /**
    +   * Statically evaluate an expression containing one or more aggregates on an empty input.
    +   */
    +  private def evalAggOnZeroTups(expr : Expression) : Option[Any] = {
    +    // AggregateExpressions are Unevaluable, so we need to replace all aggregates
    +    // in the expression with the value they would return for zero input tuples.
    +    val rewrittenExpr = expr transform {
    +      case a @ AggregateExpression(aggFunc, _, _, resultId) =>
    +        aggFunc.defaultResult.getOrElse(Literal.default(NullType))
    +    }
    +    Option(rewrittenExpr.eval())
    +  }
    +
    +  /**
    +   * Statically evaluate a scalar subquery on an empty input.
    +   *
    +   * WARNING: This method only covers subqueries that pass the checks under
    +   * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in
    +   * CheckAnalysis become less restrictive, this method will need to change.
    +   */
    +  private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
    +    // Inputs to this method will start with a chain of zero or more SubqueryAlias
    +    // and Project operators, followed by an optional Filter, followed by an
    +    // Aggregate. Traverse the operators recursively.
    +    def evalPlan(lp : LogicalPlan) : Map[Long, Option[Any]] = {
    +      lp match {
    +        case SubqueryAlias(_, child) => evalPlan(child)
    +        case Filter(condition, child) =>
    +          val bindings = evalPlan(child)
    +          if (bindings.size == 0) bindings
    +          else {
    +            val exprResult = evalExpr(condition, bindings).getOrElse(false)
    +              .asInstanceOf[Boolean]
    +            if (exprResult) bindings else Map()
    +          }
    +
    +        case Project(projectList, child) =>
    +          val bindings = evalPlan(child)
    +          if (bindings.size == 0) {
    +            bindings
    +          } else {
    +            projectList.map(ne => (ne.exprId.id, evalExpr(ne, bindings))).toMap
    +          }
    +
    +        case Aggregate(_, aggExprs, _) =>
    +          // Some of the expressions under the Aggregate node are the join columns
    +          // for joining with the outer query block. Fill those expressions in with
    +          // nulls and statically evaluate the remainder.
    +          aggExprs.map(ne => ne match {
    +            case AttributeReference(_, _, _, _) => (ne.exprId.id, None)
    +            case Alias(AttributeReference(_, _, _, _), _) => (ne.exprId.id, None)
    +            case _ => (ne.exprId.id, evalAggOnZeroTups(ne))
    +          }).toMap
    +
    +        case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
    +      }
    +    }
    +
    +    val resultMap = evalPlan(plan)
    +
    +    // By convention, the scalar subquery result is the leftmost field.
    +    resultMap(plan.output.head.exprId.id)
    +  }
    +
    +  /**
    +   * Split the plan for a scalar subquery into the parts above the Aggregate node
    +   * (first part of returned value) and the parts below the Aggregate node, including
    +   * the Aggregate (second part of returned value)
    +   */
    +  private def splitSubquery(plan : LogicalPlan) : Tuple2[Seq[LogicalPlan], Aggregate] = {
    +    var topPart = List[LogicalPlan]()
    +    var bottomPart : LogicalPlan = plan
    +    while (! bottomPart.isInstanceOf[Aggregate]) {
    +      topPart = bottomPart :: topPart
    +      bottomPart = bottomPart.children.head
    +    }
    +    (topPart, bottomPart.asInstanceOf[Aggregate])
    +  }
    +
    +  /**
    +   * Rewrite the nodes above the Aggregate in a subquery so that they generate an
    +   * auxiliary column "isFiltered"
    +   * @param subqueryPlan plan before rewrite
    +   * @param filteredId expression ID for the "isFiltered" column
    +   */
    +  private def addIsFiltered(subquery
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64943404

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64942870

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64942480

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r64941953

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1695,16 +1695,176 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
       }

       /**
    +   * Statically evaluate an expression containing zero or more placeholders, given a set
    +   * of bindings for placeholder values.
    +   */
    +  private def evalExpr(expr : Expression, bindings : Map[Long, Option[Any]]) : Option[Any] = {
    --- End diff --

Fixed that in my local copy. Will include the change in the next update.
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on the pull request: https://github.com/apache/spark/pull/13155#issuecomment-221336991

Could one of the committers please trigger another build on this PR? The change set passes all the tests on my machine, but it's good to be safe.
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on the pull request: https://github.com/apache/spark/pull/13155#issuecomment-220684859

I've added changes to cover two additional cases that @hvanhovell pointed out on review, plus one more case that came up while fixing the first two:

- The correlated subquery may have a HAVING clause
- The correlated subquery may be nested inside additional query blocks that apply projections
- The correlated subquery may return NULL when the correlation bindings join with the subquery and a non-NULL value when bindings do *not* join, e.g.

```sql
select l.a from l where (select case when count(*) = 1 then null else count(*) end from r where l.a = r.c) = 0
```

The logic for statically evaluating the subquery's aggregate expression now handles an Aggregate node with a chain of Filter and Project operators above it. The rewrite logic has a third case to handle subqueries with HAVING clauses. The second case from the original change set now adds an additional column to cover subqueries that return NULL with bindings that join the inner query block. I also addressed various other, more minor review comments.

After these changes, the algorithm for preventing the COUNT bug in scalar subqueries is now the following:

```
V <- <result of statically evaluating the subquery on zero input tuples>
if (V is null) then
    Use the original rewrite from SPARK-14785.
else
    if (subquery has a Filter above the Aggregate node) then
        Rewrite the Filter node to a Project that adds a Boolean column isFiltered.
        Rewrite nodes above the Filter node so they pass through the isFiltered column.
    else
        Add an isFiltered column with a hard-coded value of false to the top Project in the subquery.
    endif
    Create a left outer join between the outer query block and the rewritten subquery.
    Put the following case statement into the Project operator above the outer join.
        case when isFiltered is null then coalesce(aggVal, V)
             when isFiltered then null
             else aggVal
endif
```

### Correctness proof

Let *b* denote a tuple of correlation bindings from the outer query block. Without loss of generality, assume that the correlated subquery has the plan `Project(Filter(Aggregate(Join({b}, T))))`, where *T* is a table. Note that upstream checks in Catalyst ensure that the Aggregate node will produce exactly one tuple.

Consider the evaluation of the original subquery on *b*. We can ask three questions about this evaluation:

- Was the join of *b* with *T* empty, i.e. did *b* join with no tuples from *T*?
- Did the Filter node reject the tuple that the Aggregate node returned?
- Did the subquery return `null`?

The answer to each of these questions must be "yes" or "no", leading to eight cases:

Case \# | Empty join? | Agg filtered? | SQ returns null?
--- | --- | --- | ---
1 | Yes | Yes | Yes
2 | Yes | Yes | No
3 | Yes | No | Yes
4 | Yes | No | No
5 | No | Yes | Yes
6 | No | Yes | No
7 | No | No | Yes
8 | No | No | No

Cases 2 and 6 are impossible, because a subquery whose aggregate tuple is filtered out always returns null. For the remaining cases, the rewritten subquery returns the correct result:

- Case 1 and 3 => Subquery returns null on empty join result, so first branch of rewrite algorithm above applies. Outer join returns null for subquery result.
- Case 4 => Subquery returns non-null answer on empty join result, so second or third branch of rewrite applies. Outer join in the rewritten query returns a tuple with `(aggVal, isFiltered)` set to `(null, null)`, so case statement at the top of the rewritten query returns the answer the subquery returns on an empty join, which is the correct answer.
- Case 5 => If first branch of rewrite applies, the outer join in the rewritten query returns null. If second or third branch of rewrite applies, then outer join in rewritten query returns a tuple with `(aggVal, isFiltered)` set to `(null, true)`, so case statement at top of rewritten query returns null.
- Case 7 => If first branch of rewrite applies, the outer join in the rewritten query returns null. For second or third branch of rewrite, outer join in rewritten query returns a tuple with `(aggVal, isFiltered)` set to `(null, false)`, so case statement at top of rewritten query returns null.
- Case 8 => If first branch of rewrite applies, the outer join in the rewritten query returns null. For second or third branch of rewrite, outer join in rewritten query returns a tuple with `(aggVal, isFiltered)` set to `(<non-null value>, false)`, so case statement at top of rewritten query returns the non-null value of a
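
To make the difference between the rewrites concrete, here is a minimal sketch in plain Scala collections, not Catalyst code. It assumes hypothetical in-memory tables `l(a)` and `r(c)` and models SQL NULL as `None`; the names `CountBugSketch`, `agg`, `coalesceRewrite` and `caseRewrite` are made up for illustration. It evaluates the example query three ways: directly, through a single-column `coalesce` rewrite, and through the two-column `(aggVal, isFiltered)` rewrite with the CASE expression. Since the example has no HAVING clause, `isFiltered` is hard-coded to `false`, as in the third branch of the algorithm.

```scala
object CountBugSketch {
  // Subquery body: case when count(*) = 1 then null else count(*) end
  def agg(cnt: Long): Option[Long] = if (cnt == 1) None else Some(cnt)

  // Reference semantics: evaluate the correlated subquery once per outer row.
  def original(l: Seq[Int], r: Seq[Int]): Seq[Int] =
    l.filter(a => agg(r.count(_ == a).toLong).contains(0L))

  // Rewritten subquery: group r by c, emitting (aggVal, isFiltered) per group.
  // isFiltered is hard-coded to false because there is no HAVING clause.
  private def grouped(r: Seq[Int]): Map[Int, (Option[Long], Boolean)] =
    r.groupBy(identity).map { case (c, rows) => c -> ((agg(rows.size.toLong), false)) }

  // Single-column rewrite: coalesce(sq.cnt, 0) = 0 above the left outer join.
  def coalesceRewrite(l: Seq[Int], r: Seq[Int]): Seq[Int] = {
    val sq = grouped(r)
    l.filter(a => sq.get(a).flatMap(_._1).orElse(Some(0L)).contains(0L))
  }

  // Two-column rewrite: the CASE expression over (aggVal, isFiltered).
  def caseRewrite(l: Seq[Int], r: Seq[Int]): Seq[Int] = {
    val sq = grouped(r)
    val v: Option[Long] = agg(0L) // V: subquery result on zero input tuples
    l.filter { a =>
      val result = sq.get(a) match {
        case None                  => v      // no join partner: isFiltered is null, aggVal is null
        case Some((_, true))       => None   // rejected by HAVING
        case Some((aggVal, false)) => aggVal // joined and kept
      }
      result.contains(0L)
    }
  }
}
```

With `l = Seq(1, 2, 3)` and `r = Seq(1, 2, 2)`, `original` and `caseRewrite` both return `Seq(3)`, while `coalesceRewrite` returns `Seq(1, 3)`: the row with exactly one join partner is wrongly kept because its null aggregate is indistinguishable from the null padding of the outer join.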
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r63954767

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1648,16 +1648,56 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
       }

       /**
    +   * Statically evaluate an expression containing one or more aggregates on an empty input.
    +   */
    +  private def evalOnZeroTups(expr : Expression) : Option[Any] = {
    +    // AggregateExpressions are Unevaluable, so we need to replace all aggregates
    +    // in the expression with the value they would return for zero input tuples.
    +    val rewrittenExpr = expr transform {
    +      case a @ AggregateExpression(aggFunc, _, _, resultId) =>
    +        val resultLit = aggFunc.defaultResult match {
    +          case Some(lit) => lit
    +          case None => Literal.default(NullType)
    +        }
    +        Alias(resultLit, "aggVal") (exprId = resultId)
    +    }
    +    Option(rewrittenExpr.eval())
    +  }
    +
    +  /**
        * Construct a new child plan by left joining the given subqueries to a base plan.
        */
       private def constructLeftJoins(
           child: LogicalPlan,
           subqueries: ArrayBuffer[ScalarSubquery]): LogicalPlan = {
         subqueries.foldLeft(child) {
           case (currentChild, ScalarSubquery(query, conditions, _)) =>
    +        val aggOutputExpr = query.asInstanceOf[Aggregate].aggregateExpressions.head
    --- End diff --

I came across an additional counterexample just now:

```sql
select l.a from l where (select case when count(*) = 1 then null else count(*) end from r where l.a = r.c) = 0
```

This subquery returns null when count(*) on the inner query block is 1. The rewrite in this PR turns the overall query into:

```sql
select * from l left outer join (select c, case when count(*) = 1 then null else count(*) end as cnt from r group by c) sq on l.a = sq.c where coalesce(sq.cnt, 0) = 0
```

This result is incorrect; if exactly one tuple from R joins with a tuple of L, the query will return the L tuple, even though that tuple should not be returned.

So, to summarize, the operators above the outer join need to be able to discern between *four* different cases:

1. A tuple from the outer query block joins with one or more tuples in the subquery and produces an aggregate result that is *not* null
2. A tuple from the outer query block joins with one or more tuples in the subquery and produces an aggregate result that *is* null
3. A tuple from the outer query block does not join with any tuples in the subquery
4. A tuple from the outer query block joins with tuples in the subquery and produces an aggregate result that is filtered out by a HAVING clause

A single result column is simply not able to encode all four cases (or even the first three). The output of the outer join needs to have at least two columns to pass through enough information.

Here's a scheme that will work as far as I can see: The result of the outer join contains the correlation columns, plus a column `aggVal` for the aggregate value, plus an additional column `isFiltered`. The four cases above are encoded as follows:

1. `aggVal is not null and isFiltered = false`
2. `aggVal is null and isFiltered = false`
3. `isFiltered is null`
4. `isFiltered = true`

Then the coalesce expression above the outer join turns into a CASE statement:

```sql
case when isFiltered is null then coalesce(aggVal, <value of subquery on zero tuples>) when isFiltered then null else aggVal end
```

Here's pseudocode for a rewrite that should produce the correct values of `isFiltered` and `aggVal`:

```
if (subquery returns null when zero tuples join) then
    Use the original rewrite.
else
    if (subquery has a Filter above the Aggregate node) then
        Replace the Filter node with a Project that computes the value of isFiltered.
        Rewrite nodes above the Filter node so they pass through the isFiltered column.
    else
        Add an isFiltered column with a hard-coded value of false to the top Project in the subquery.
    endif
    Create a left outer join between the outer query block and the rewritten subquery.
    Put the case statement in the previous listing into the Project operator above the outer join.
endif
```
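
The four-case encoding can be sketched outside of Spark as follows. This is an illustrative model only, assuming SQL NULL is represented as `None`; the names `FourCaseEncoding`, `Outcome`, `encode` and `resolve` are hypothetical and do not appear in the PR. In the filtered case the value of `aggVal` does not matter, so the sketch uses `None`.

```scala
object FourCaseEncoding {
  // The four situations the operators above the outer join must tell apart.
  sealed trait Outcome
  case class JoinedNonNull(value: Long) extends Outcome // case 1
  case object JoinedNull extends Outcome                // case 2
  case object NoJoin extends Outcome                    // case 3
  case object FilteredByHaving extends Outcome          // case 4

  // (aggVal, isFiltered) as seen above the left outer join.
  def encode(o: Outcome): (Option[Long], Option[Boolean]) = o match {
    case JoinedNonNull(v) => (Some(v), Some(false))
    case JoinedNull       => (None, Some(false))
    case NoJoin           => (None, None) // null padding from the outer join
    case FilteredByHaving => (None, Some(true))
  }

  // The CASE expression; valueOnZeroTuples is what the subquery
  // returns when no tuples join (assumed to be computed statically).
  def resolve(
      aggVal: Option[Long],
      isFiltered: Option[Boolean],
      valueOnZeroTuples: Option[Long]): Option[Long] =
    isFiltered match {
      case None        => aggVal.orElse(valueOnZeroTuples) // coalesce
      case Some(true)  => None
      case Some(false) => aggVal
    }
}
```

In this model, cases 2, 3 and 4 all have a null `aggVal`, which is why a single column cannot separate them; the `(aggVal, isFiltered)` pairs, by contrast, are pairwise distinct.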
[GitHub] spark pull request: [SPARK-15370] [SQL] Update RewriteCorrelatedSc...
Github user frreiss commented on a diff in the pull request: https://github.com/apache/spark/pull/13155#discussion_r63933593

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1648,16 +1648,56 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
       }

       /**
    +   * Statically evaluate an expression containing one or more aggregates on an empty input.
    +   */
    +  private def evalOnZeroTups(expr : Expression) : Option[Any] = {
    +    // AggregateExpressions are Unevaluable, so we need to replace all aggregates
    +    // in the expression with the value they would return for zero input tuples.
    +    val rewrittenExpr = expr transform {
    +      case a @ AggregateExpression(aggFunc, _, _, resultId) =>
    +        val resultLit = aggFunc.defaultResult match {
    +          case Some(lit) => lit
    +          case None => Literal.default(NullType)
    +        }
    +        Alias(resultLit, "aggVal") (exprId = resultId)
    +    }
    +    Option(rewrittenExpr.eval())
    +  }
    +
    +  /**
        * Construct a new child plan by left joining the given subqueries to a base plan.
        */
       private def constructLeftJoins(
           child: LogicalPlan,
           subqueries: ArrayBuffer[ScalarSubquery]): LogicalPlan = {
         subqueries.foldLeft(child) {
           case (currentChild, ScalarSubquery(query, conditions, _)) =>
    +        val aggOutputExpr = query.asInstanceOf[Aggregate].aggregateExpressions.head
    --- End diff --

Upon further reflection, a Filter node above the Aggregate creates additional issues. If you rewrite the subquery into an outer join above the Filter, it may be impossible to distinguish between two cases:

- The original subquery returns null because an aggregate value did not pass the HAVING clause
- The aggregate in the original subquery is evaluated over zero tuples

In both cases, the outer join will return a tuple containing nulls. For example, the query:

```sql
select * from l where (select count(*) cnt from r where l.a = r.c having cnt = 0) = 0
```

would turn into

```sql
select * from l left outer join (select c, count(*) cnt from r group by c having cnt = 0) sq on l.a = sq.c where sq.cnt = 0
```

Note how the rewritten subquery `select c, count(*) cnt from r group by c having cnt = 0` always returns zero tuples.

I'm working through a few possible solutions, trying to see if one of them is guaranteed to be correct.
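
The HAVING example can be replayed on toy data. The following is a rough sketch in plain Scala collections rather than Spark, assuming hypothetical in-memory tables `l(a)` and `r(c)` and modeling SQL NULL as `None`; `HavingRewriteSketch`, `original` and `naiveRewrite` are illustrative names only.

```scala
object HavingRewriteSketch {
  // Original query: the scalar subquery is evaluated once per row of l.
  // count(*) over zero rows is 0; if HAVING cnt = 0 fails, the subquery yields null.
  def original(l: Seq[Int], r: Seq[Int]): Seq[Int] =
    l.filter { a =>
      val cnt = r.count(_ == a)
      val subqueryResult: Option[Int] = if (cnt == 0) Some(cnt) else None
      subqueryResult.contains(0)
    }

  // Join-based rewrite: group r by c, apply HAVING cnt = 0, then left outer join.
  def naiveRewrite(l: Seq[Int], r: Seq[Int]): Seq[Int] = {
    // Every group produced by GROUP BY has at least one row, so HAVING cnt = 0
    // removes all of them and sq is always empty.
    val sq: Map[Int, Int] = r
      .groupBy(identity)
      .map { case (c, rows) => c -> rows.size }
      .filter { case (_, cnt) => cnt == 0 }
    l.filter { a =>
      val cnt: Option[Int] = sq.get(a) // None models the null padding of the outer join
      cnt.contains(0)                  // where sq.cnt = 0
    }
  }
}
```

With `l = Seq(1, 2, 3)` and `r = Seq(1, 1, 2)`, `original` returns `Seq(3)` (the only row of `l` with no match in `r`), while `naiveRewrite` returns an empty sequence, because the null produced by the outer join never satisfies `sq.cnt = 0`.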