This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c30b529 [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource. c30b529 is described below commit c30b5297bc607ae33cc2fcf624b127942154e559 Author: gengjiaan <gengji...@360.cn> AuthorDate: Tue May 28 09:26:06 2019 -0500 [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource. ## What changes were proposed in this pull request? I checked the code of `org.apache.spark.sql.execution.datasources.DataSource` and found that it contains duplicate Java reflection. The methods `sourceSchema`, `createSource`, `createSink`, `resolveRelation`, and `writeAndRead` all call `providingClass.getConstructor().newInstance()`. The instances of `providingClass` are stateless, for example: `KafkaSourceProvider` `RateSourceProvider` `TextSocketSourceProvider` `JdbcRelationProvider` `ConsoleSinkProvider` AFAIK, Java reflection can result in a significant performance issue. The Oracle website [https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html) contains a description of the performance impact of Java reflection: ``` Performance Overhead Because reflection involves types that are dynamically resolved, certain Java virtual machine optimizations can not be performed. Consequently, reflective operations have slower performance than their non-reflective counterparts, and should be avoided in sections of code which are called frequently in performance-sensitive applications. ``` I found some performance cost tests of Java reflection as follows: [https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/) contains a performance cost test. [https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance) has a discussion of Java reflection performance. So I think we should avoid duplicate Java reflection and reuse the instance of `providingClass`. 
## How was this patch tested? Existing UTs. Closes #24647 from beliefer/optimize-DataSource. Authored-by: gengjiaan <gengji...@360.cn> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../spark/sql/execution/datasources/DataSource.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ef430f4..04ae528 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -105,6 +105,9 @@ case class DataSource( case _ => cls } } + + private def providingInstance() = providingClass.getConstructor().newInstance() + lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver @@ -210,7 +213,7 @@ case class DataSource( /** Returns the name and schema of the source that can be used to continually read data. */ private def sourceSchema(): SourceInfo = { - providingClass.getConstructor().newInstance() match { + providingInstance() match { case s: StreamSourceProvider => val (name, schema) = s.sourceSchema( sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions) @@ -264,7 +267,7 @@ case class DataSource( /** Returns a source that can be used to continually read data. */ def createSource(metadataPath: String): Source = { - providingClass.getConstructor().newInstance() match { + providingInstance() match { case s: StreamSourceProvider => s.createSource( sparkSession.sqlContext, @@ -293,7 +296,7 @@ case class DataSource( /** Returns a sink that can be used to continually write data. 
*/ def createSink(outputMode: OutputMode): Sink = { - providingClass.getConstructor().newInstance() match { + providingInstance() match { case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode) @@ -324,7 +327,7 @@ case class DataSource( * that files already exist, we don't need to check them again. */ def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = { - val relation = (providingClass.getConstructor().newInstance(), userSpecifiedSchema) match { + val relation = (providingInstance(), userSpecifiedSchema) match { // TODO: Throw when too much is given. case (dataSource: SchemaRelationProvider, Some(schema)) => dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema) @@ -495,7 +498,7 @@ case class DataSource( throw new AnalysisException("Cannot save interval data type into external storage.") } - providingClass.getConstructor().newInstance() match { + providingInstance() match { case dataSource: CreatableRelationProvider => dataSource.createRelation( sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data)) @@ -532,7 +535,7 @@ case class DataSource( throw new AnalysisException("Cannot save interval data type into external storage.") } - providingClass.getConstructor().newInstance() match { + providingInstance() match { case dataSource: CreatableRelationProvider => SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode) case format: FileFormat => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org