This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c30b529  [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
c30b529 is described below

commit c30b5297bc607ae33cc2fcf624b127942154e559
Author: gengjiaan <gengji...@360.cn>
AuthorDate: Tue May 28 09:26:06 2019 -0500

    [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
    
    ## What changes were proposed in this pull request?
    
    I checked the code of
    `org.apache.spark.sql.execution.datasources.DataSource` and found
    duplicated Java reflection: `sourceSchema`, `createSource`, `createSink`,
    `resolveRelation`, and `writeAndRead` all call
    `providingClass.getConstructor().newInstance()` (a stripped-down sketch
    follows the provider list below).
    The instance of `providingClass` is stateless for providers such as:
    `KafkaSourceProvider`
    `RateSourceProvider`
    `TextSocketSourceProvider`
    `JdbcRelationProvider`
    `ConsoleSinkProvider`
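
    For illustration only (a hypothetical `DemoDataSource`, not the real
    `DataSource` signatures), the shared helper pattern introduced by this
    patch looks roughly like this:

    ```scala
    class DemoDataSource(providingClass: Class[_]) {
      // One shared reflective instantiation instead of repeating
      // providingClass.getConstructor().newInstance() at five call sites.
      private def providingInstance(): Any =
        providingClass.getConstructor().newInstance()

      def describeProvider(): String = providingInstance() match {
        case p => s"provider: ${p.getClass.getName}"
      }
    }

    // Usage: prints "provider: java.lang.Object"
    // println(new DemoDataSource(classOf[Object]).describeProvider())
    ```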
    
    AFAIK, Java reflection can carry a significant performance cost.
    The Oracle tutorial at
    [https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html)
    describes this overhead:
    
    ```
    Performance Overhead
    Because reflection involves types that are dynamically resolved, certain
    Java virtual machine optimizations can not be performed. Consequently,
    reflective operations have slower performance than their non-reflective
    counterparts, and should be avoided in sections of code which are called
    frequently in performance-sensitive applications.
    ```
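
    As a rough, illustrative micro-benchmark of that claim (not part of this
    patch; `Provider` is a hypothetical stand-in, and absolute numbers depend
    heavily on the JVM and JIT warm-up), one can compare direct and reflective
    instantiation:

    ```scala
    object ReflectionCostDemo {
      class Provider  // hypothetical stand-in for a stateless provider

      // Times `body` and prints the elapsed time; `body` returns a value
      // so the JIT cannot discard the work as dead code.
      def time(label: String)(body: => Int): Unit = {
        val start = System.nanoTime()
        val sink  = body
        println(s"$label: ${(System.nanoTime() - start) / 1e6} ms (sink=$sink)")
      }

      def main(args: Array[String]): Unit = {
        val n = 1000000
        val ctor = classOf[Provider].getConstructor()
        time("direct") {
          var i = 0; var acc = 0
          while (i < n) { acc += (new Provider).hashCode; i += 1 }
          acc
        }
        time("reflective") {
          var i = 0; var acc = 0
          while (i < n) { acc += ctor.newInstance().hashCode; i += 1 }
          acc
        }
      }
    }
    ```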
    
    Some existing measurements and discussion of this cost:

    [https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/)
    benchmarks the cost of reflective calls.

    [https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance)
    discusses Java reflection performance.
    
    So I think we should avoid the duplicated Java reflection by centralizing
    the instantiation of `providingClass` in a single helper (a hypothetical
    caching variant is sketched below).
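
    Note that the helper in this patch is a `def`, so it still instantiates
    per call; it removes the duplicated code, not the per-call reflection.
    Assuming the providers really are stateless and thread-safe, an
    illustrative variant (not what this patch does) could memoize one
    instance with a `lazy val`:

    ```scala
    // Illustrative only, not in this patch: cache a single provider
    // instance, assuming it is genuinely stateless and thread-safe.
    class CachingDemoDataSource(providingClass: Class[_]) {
      private lazy val providingInstance: Any =
        providingClass.getConstructor().newInstance()

      // Every call returns the same cached object.
      def describeProvider(): String =
        s"provider: ${providingInstance.getClass.getName}"
    }
    ```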
    
    ## How was this patch tested?
    
    Existing UTs.
    
    Closes #24647 from beliefer/optimize-DataSource.
    
    Authored-by: gengjiaan <gengji...@360.cn>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../spark/sql/execution/datasources/DataSource.scala      | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ef430f4..04ae528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -105,6 +105,9 @@ case class DataSource(
       case _ => cls
     }
   }
+
+  private def providingInstance() = providingClass.getConstructor().newInstance()
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -210,7 +213,7 @@ case class DataSource(
 
   /** Returns the name and schema of the source that can be used to continually read data. */
   private def sourceSchema(): SourceInfo = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
          sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
@@ -264,7 +267,7 @@ case class DataSource(
 
   /** Returns a source that can be used to continually read data. */
   def createSource(metadataPath: String): Source = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSourceProvider =>
         s.createSource(
           sparkSession.sqlContext,
@@ -293,7 +296,7 @@ case class DataSource(
 
   /** Returns a sink that can be used to continually write data. */
   def createSink(outputMode: OutputMode): Sink = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSinkProvider =>
        s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
 
@@ -324,7 +327,7 @@ case class DataSource(
   *                        that files already exist, we don't need to check them again.
    */
   def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
-    val relation = (providingClass.getConstructor().newInstance(), userSpecifiedSchema) match {
+    val relation = (providingInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
@@ -495,7 +498,7 @@ case class DataSource(
      throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(
          sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
@@ -532,7 +535,7 @@ case class DataSource(
      throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case dataSource: CreatableRelationProvider =>
        SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>

