Repository: spark Updated Branches: refs/heads/master 1ea49916a -> e48ebc4e4
[SPARK-15698][SQL][STREAMING][FOLLW-UP] Fix FileStream source and sink log get configuration issue ## What changes were proposed in this pull request? This issue was introduced in the previous commit of SPARK-15698, which mistakenly changed the way the configuration is read back to the original one; this follow-up PR reverts that change. ## How was this patch tested? N/A Ping zsxwing, please review again, sorry for the inconvenience. Thanks a lot. Author: jerryshao <ss...@hortonworks.com> Closes #15173 from jerryshao/SPARK-15698-follow. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e48ebc4e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e48ebc4e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e48ebc4e Branch: refs/heads/master Commit: e48ebc4e403ca3a0e580b47aadffe9fbfcf3c655 Parents: 1ea4991 Author: jerryshao <ss...@hortonworks.com> Authored: Tue Sep 20 22:36:24 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue Sep 20 22:36:24 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/streaming/FileStreamSinkLog.scala | 9 +++------ .../spark/sql/execution/streaming/FileStreamSourceLog.scala | 7 +++---- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 8 +++++++- 3 files changed, 13 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e48ebc4e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 64f2f00..f9e2416 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -84,14 +84,11 @@ class FileStreamSinkLog( private implicit val formats = Serialization.formats(NoTypeHints) - protected override val fileCleanupDelayMs = - sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY) + protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay - protected override val isDeletingExpiredLog = - sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION) + protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion - protected override val compactInterval = - sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL) + protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval require(compactInterval > 0, s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") http://git-wip-us.apache.org/repos/asf/spark/blob/e48ebc4e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 8103309..4681f2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -39,16 +39,15 @@ class FileStreamSourceLog( // Configurations about metadata compaction protected override val compactInterval = - sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + sparkSession.sessionState.conf.fileSourceLogCompactInterval require(compactInterval > 0, s"Please set 
${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + s"positive value.") protected override val fileCleanupDelayMs = - sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + sparkSession.sessionState.conf.fileSourceLogCleanupDelay - protected override val isDeletingExpiredLog = - sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion private implicit val formats = Serialization.formats(NoTypeHints) http://git-wip-us.apache.org/repos/asf/spark/blob/e48ebc4e/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f8b7a7f..e67140f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -620,10 +620,16 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) - def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL) + def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL) def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY) + def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION) + + def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL) + + def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY) + def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE) def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional 
commands, e-mail: commits-h...@spark.apache.org