This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ef807  [CARBONDATA-3453] Fix set segment issue in adaptive execution
f8ef807 is described below

commit f8ef807ebbac95cdc12b34d7554c9c533d917cb0
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Thu Jun 27 10:31:30 2019 +0530

    [CARBONDATA-3453] Fix set segment issue in adaptive execution
    
    Cause: For set segments, the driver checks the carbon property, which in
    turn looks up the segments in the session params. These are not set in
    the current thread in case of adaptive execution.
    
    Solution: Use the session params from the RDD's session info, where they
    are set correctly.
    
    This closes #3307
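
The fix below makes the segment lookup fall back from the thread params to the
session params before consulting the global property. A minimal Scala sketch of
that lookup order; the helper name resolveInputSegments is hypothetical and the
import paths are assumed, while the getProperty calls and the "*" default (all
segments) follow the diff:

// NOTE: import paths are assumed for illustration.
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo}

// Hypothetical helper illustrating the fallback chain used by the fix:
// thread params -> session params -> global CarbonProperties -> "*" (all segments).
def resolveInputSegments(carbonSessionInfo: CarbonSessionInfo,
    inputSegmentsKey: String): String = {
  carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey,
    carbonSessionInfo.getSessionParams.getProperty(inputSegmentsKey,
      CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))
}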
---
 .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala |  5 +++--
 .../testsuite/segmentreading/TestSegmentReading.scala   | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index f90d279..973baa6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -670,8 +670,9 @@ class CarbonScanRDD[T: ClassTag](
       CarbonInputFormat
         .setQuerySegment(conf,
           carbonSessionInfo.getThreadParams
-            .getProperty(inputSegmentsKey,
-              CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))
+            .getProperty(inputSegmentsKey, carbonSessionInfo.getSessionParams
+              .getProperty(inputSegmentsKey,
+              CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*"))))
       if (queryOnPreAggStreaming) {
         CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming)
         // union for streaming preaggregate can happen concurrently from spark.
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index 5ade510..6394f0b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -40,6 +40,8 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll(): Unit = {
     cleanAllTable()
+    // reset
+    sql("SET carbon.input.segments.default.carbon_table=*")
   }
 
   test("test SET -V for segment reading property") {
@@ -366,4 +368,19 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
         "SET carbon.input.segments.default.carbon_table=*")
     }
   }
+  test("test with the adaptive execution") {
+    sql("set spark.sql.adaptive.enabled=true")
+
+    sql("SET carbon.input.segments.default.carbon_table=1")
+    checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+
+    // segment doesn't exist
+    sql("SET carbon.input.segments.default.carbon_table=5")
+    checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(0)))
+
+    sql("SET carbon.input.segments.default.carbon_table=1")
+    checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+
+    sql("set spark.sql.adaptive.enabled=false")
+  }
 }
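
As background to the cause noted in the commit message (the segments property is
not set in the current thread in case of adaptive execution): a value placed in a
thread-local store by one thread is not visible from another thread. A
self-contained, purely illustrative Scala sketch of that general ThreadLocal
behavior, with no CarbonData classes involved:

object ThreadLocalDemo {
  // A thread-local value, analogous to per-thread session params; "*" is the default.
  private val threadParams = new ThreadLocal[String] {
    override def initialValue(): String = "*"
  }

  def main(args: Array[String]): Unit = {
    threadParams.set("1")                               // set on the main thread
    println(s"main thread sees: ${threadParams.get}")   // prints 1

    val t = new Thread(new Runnable {
      // A different thread sees only the default, not the value set above.
      override def run(): Unit = println(s"other thread sees: ${threadParams.get}")
    })
    t.start()
    t.join()
  }
}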
