Repository: carbondata Updated Branches: refs/heads/carbonstore-rebase5 8b94788a5 -> 7540cc9ca (forced update)
[CARBONDATA-2211] in case of DDL HandOff should not be execute in thread 1. DDL handoff will be executed in the blocking thread. 2. Auto handoff will be executed in a new non-blocking thread. This closes #2008 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f7ea4d7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f7ea4d7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f7ea4d7 Branch: refs/heads/carbonstore-rebase5 Commit: 7f7ea4d757d99b681512cb5f1369187f69f905c8 Parents: a0fc0be Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Tue Feb 27 21:50:20 2018 +0530 Committer: QiangCai <qiang...@qq.com> Committed: Fri Mar 2 09:40:20 2018 +0800 ---------------------------------------------------------------------- .../CarbonAlterTableCompactionCommand.scala | 2 +- .../carbondata/streaming/StreamHandoffRDD.scala | 17 +++++++++++------ .../streaming/CarbonAppendableStreamSink.scala | 3 ++- 3 files changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index f6019e4..9b9ca0e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -181,7 +181,7 @@ case class CarbonAlterTableCompactionCommand( if (compactionType == CompactionType.STREAMING) { StreamHandoffRDD.startStreamingHandoffThread( carbonLoadModel, - sqlContext.sparkSession) + sqlContext.sparkSession, true) return } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index b03ee1e..a46ced5 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -279,15 +279,20 @@ object StreamHandoffRDD { */ def startStreamingHandoffThread( carbonLoadModel: CarbonLoadModel, - sparkSession: SparkSession + sparkSession: SparkSession, + isDDL: Boolean ): Unit = { - // start a new thread to execute streaming segment handoff - val handoffThread = new Thread() { - override def run(): Unit = { - iterateStreamingHandoff(carbonLoadModel, sparkSession) + if (isDDL) { + iterateStreamingHandoff(carbonLoadModel, sparkSession) + } else { + // start a new thread to execute streaming segment handoff + val handoffThread = new Thread() { + override def run(): Unit = { + iterateStreamingHandoff(carbonLoadModel, sparkSession) + } } + handoffThread.start() } - handoffThread.start() } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index f2f9853..312d24e 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -178,7 +178,8 @@ class CarbonAppendableStreamSink( if (enableAutoHandoff) { StreamHandoffRDD.startStreamingHandoffThread( carbonLoadModel, - sparkSession) + sparkSession, + false) } } }