This is an automated email from the ASF dual-hosted git repository. kunalkapoor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new f8ef807 [CARBONDATA-3453] Fix set segment issue in adaptive execution f8ef807 is described below commit f8ef807ebbac95cdc12b34d7554c9c533d917cb0 Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Thu Jun 27 10:31:30 2019 +0530 [CARBONDATA-3453] Fix set segment issue in adaptive execution Cause: For set segments, the driver checks the carbon property, and the carbon property looks for segments in the session params, which are not set in the current thread in case of adaptive execution. Solution: Use the session params from the RDD's session info, where they are set correctly. This closes #3307 --- .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 5 +++-- .../testsuite/segmentreading/TestSegmentReading.scala | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index f90d279..973baa6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -670,8 +670,9 @@ class CarbonScanRDD[T: ClassTag]( CarbonInputFormat .setQuerySegment(conf, carbonSessionInfo.getThreadParams - .getProperty(inputSegmentsKey, - CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*"))) + .getProperty(inputSegmentsKey, carbonSessionInfo.getSessionParams + .getProperty(inputSegmentsKey, + CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))) if (queryOnPreAggStreaming) { CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming) // union for streaming preaggregate can happen concurrently from spark. 
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala index 5ade510..6394f0b 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala @@ -40,6 +40,8 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll { override def afterAll(): Unit = { cleanAllTable() + // reset + sql("SET carbon.input.segments.default.carbon_table=*") } test("test SET -V for segment reading property") { @@ -366,4 +368,19 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll { "SET carbon.input.segments.default.carbon_table=*") } } + test("test with the adaptive execution") { + sql("set spark.sql.adaptive.enabled=true") + + sql("SET carbon.input.segments.default.carbon_table=1") + checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10))) + + // segment doesn't exist + sql("SET carbon.input.segments.default.carbon_table=5") + checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(0))) + + sql("SET carbon.input.segments.default.carbon_table=1") + checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10))) + + sql("set spark.sql.adaptive.enabled=false") + } }