Repository: carbondata
Updated Branches:
  refs/heads/master 8affab843 -> b5609b073


[HOTFIX] Fixed Query performance issue

Problem: Query performance has regressed compared with CarbonData 1.4.
Root cause: CarbonScanRDD uses reflection to read the TaskContext
onCompleteCallbacks field. It does this to check whether an
InsertTaskCompletionListener has already been added, so that two task
completion listeners are not registered during INSERT INTO. This Scala
runtime-reflection call takes ~2 seconds per query task, which makes
queries slower.
Solution: Use Java reflection instead of Scala reflection to read the
onCompleteCallbacks field from the TaskContextImpl class.
Query Time:
Old: ~59 seconds
New: ~33 seconds

This closes #2622


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b5609b07
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b5609b07
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b5609b07

Branch: refs/heads/master
Commit: b5609b07331437145218b638ba00c16ec96afad6
Parents: 8affab8
Author: kumarvishal09 <kumarvishal1...@gmail.com>
Authored: Wed Aug 8 23:25:41 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Aug 9 19:06:47 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala     | 3 ++-
 .../scala/org/apache/spark/util/CarbonReflectionUtils.scala | 9 +++++----
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b5609b07/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6b43999..38062a4 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -463,9 +463,10 @@ class CarbonScanRDD[T: ClassTag](
       model.setStatisticsRecorder(recorder)
 
       // TODO: rewrite this logic to call free memory in FailureListener on 
failures. On success,
-      // no memory leak should be there, resources should be freed on success 
completion.
+      // TODO: no memory leak should be there, resources should be freed on 
success completion.
       val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", 
context)
         .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+
       val isAdded = listeners.exists(p => 
p.isInstanceOf[InsertTaskCompletionListener])
       model.setFreeUnsafeMemory(!isAdded)
       // add task completion before calling initialize as initialize method 
will internally call

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b5609b07/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 4264aa1..1061e98 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import java.lang.reflect.Field
+
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
@@ -50,10 +52,9 @@ object CarbonReflectionUtils {
    * @return
    */
   def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
-    val im = rm.reflect(obj)
-
-    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
-      .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
+    val field = obj.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
   }
 
   def getUnresolvedRelation(

Reply via email to