[spark] branch master updated (04f142d -> 67582fd)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 04f142d  [SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in ExecutorClassLoader
     add 67582fd  [SPARK-27737][SQL][FOLLOW-UP] Move sql/hive-thriftserver/v2.3.4 to sql/hive-thriftserver/v2.3.5

No new revisions were added by this update.

Summary of changes:
 sql/hive-thriftserver/{v2.3.4 => v2.3.5}/if/TCLIService.thrift | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TArrayTypeEntry.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TBinaryColumn.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TBoolColumn.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TBoolValue.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TByteColumn.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TByteValue.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TCLIService.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TCLIServiceConstants.java | 0
 .../org/apache/hive/service/cli/thrift/TCancelDelegationTokenReq.java | 0
 .../org/apache/hive/service/cli/thrift/TCancelDelegationTokenResp.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TCancelOperationReq.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TCancelOperationResp.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TCloseOperationReq.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TCloseOperationResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TCloseSessionReq.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TCloseSessionResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TColumn.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TColumnDesc.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TColumnValue.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TDoubleColumn.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TDoubleValue.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java | 0
 .../java/org/apache/hive/service/cli/thrift/TExecuteStatementResp.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TFetchOrientation.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TFetchResultsReq.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TFetchResultsResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetCatalogsReq.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetCatalogsResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetColumnsReq.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetColumnsResp.java | 0
 .../java/org/apache/hive/service/cli/thrift/TGetDelegationTokenReq.java | 0
 .../java/org/apache/hive/service/cli/thrift/TGetDelegationTokenResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetFunctionsReq.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TGetFunctionsResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetInfoReq.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetInfoResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetInfoType.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetInfoValue.java | 0
 .../java/org/apache/hive/service/cli/thrift/TGetOperationStatusReq.java | 0
 .../java/org/apache/hive/service/cli/thrift/TGetOperationStatusResp.java | 0
 .../java/org/apache/hive/service/cli/thrift/TGetResultSetMetadataReq.java | 0
 .../org/apache/hive/service/cli/thrift/TGetResultSetMetadataResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetSchemasReq.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetSchemasResp.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TGetTableTypesReq.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/TGetTableTypesResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetTablesReq.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetTablesResp.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetTypeInfoReq.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TGetTypeInfoResp.java | 0
 .../gen/java/org/apache/hive/service/cli/thrift/THandleIdentifier.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TI16Column.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TI16Value.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TI32Column.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TI32Value.java | 0
 .../src/gen/java/org/apache/hive/service/cli/thrift/TI64Column.java | 0
[spark] branch master updated: [SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in ExecutorClassLoader
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 04f142d  [SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in ExecutorClassLoader
04f142d is described below

commit 04f142db9c4f87699053eb3aa777c08aca57b524
Author: Shixiong Zhu
AuthorDate: Tue May 28 12:56:14 2019 -0700

    [SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in ExecutorClassLoader

    ## What changes were proposed in this pull request?

    `ExecutorClassLoader`'s `findClass` may fail to fetch a class due to transient exceptions. For example, when a task is interrupted while `ExecutorClassLoader` is fetching a class, you may see an `InterruptedException` or `IOException` wrapped in a `ClassNotFoundException`, even though the class can be loaded. The result of `findClass` is then cached by the JVM, and later, when the same class is loaded in the same executor, it will just throw `NoClassDefFoundError` even though the class could be loaded. I found that the JVM only caches `LinkageError` and `ClassNotFoundException`. Hence, in this PR, I changed `ExecutorClassLoader` to throw `RemoteClassLoadedError` if we cannot get a response from the driver.

    ## How was this patch tested?

    New unit tests.

    Closes #24683 from zsxwing/SPARK-20547-fix.

    Authored-by: Shixiong Zhu
    Signed-off-by: Shixiong Zhu
---
 .../network/server/TransportRequestHandler.java    |   2 +
 .../apache/spark/repl/ExecutorClassLoader.scala    |  45 ++-
 .../spark/repl/ExecutorClassLoaderSuite.scala      | 145 -
 .../scala/org/apache/spark/repl/ReplSuite.scala    |  17 ++-
 .../org/apache/spark/repl/SingletonReplSuite.scala |  16 +++
 5 files changed, 214 insertions(+), 11 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 3e089b4..0792b58 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -140,6 +140,8 @@ public class TransportRequestHandler extends MessageHandler {
         streamManager.streamSent(req.streamId);
       });
     } else {
+      // org.apache.spark.repl.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated
+      // when the following error message is changed.
       respond(new StreamFailure(req.streamId, String.format(
         "Stream '%s' was not found.", req.streamId)));
     }
   }

diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 177bce2..0cfd961 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -21,6 +21,8 @@ import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream,
 import java.net.{URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.xbean.asm7._
 import org.apache.xbean.asm7.Opcodes._
@@ -106,7 +108,17 @@ class ExecutorClassLoader(
       parentLoader.loadClass(name)
     } catch {
       case e: ClassNotFoundException =>
-        val classOption = findClassLocally(name)
+        val classOption = try {
+          findClassLocally(name)
+        } catch {
+          case e: RemoteClassLoaderError =>
+            throw e
+          case NonFatal(e) =>
+            // Wrap the error to include the class name
+            // scalastyle:off throwerror
+            throw new RemoteClassLoaderError(name, e)
+            // scalastyle:on throwerror
+        }
         classOption match {
           case None => throw new ClassNotFoundException(name, e)
           case Some(a) => a
@@ -115,14 +127,15 @@ class ExecutorClassLoader(
     }
   }
 
+  // See org.apache.spark.network.server.TransportRequestHandler.processStreamRequest.
+  private val STREAM_NOT_FOUND_REGEX = s"Stream '.*' was not found.".r.pattern
+
   private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
-    val channel = env.rpcEnv.openChannel(s"$classUri/$path")
+    val channel = env.rpcEnv.openChannel(s"$classUri/${urlEncode(path)}")
     new FilterInputStream(Channels.newInputStream(channel)) {
 
       override def read(): Int = toClassNotFound(super.read())
 
-      override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
-
-      override def read(b: Array[Byte], offset: Int, len: Int) =
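The reasoning behind this change can be restated with a small sketch. The loader below is illustrative only — `ExampleClassLoader` and `fetchClassBytes` are made-up names, not Spark's API, and the error class mirrors the `RemoteClassLoaderError` used in the diff. It shows the pattern the commit relies on: a transient fetch failure is surfaced as an `Error` subclass rather than a `ClassNotFoundException`, because (per the commit message) the JVM only caches `ClassNotFoundException` and `LinkageError` as permanent negative results, so an `Error` leaves the next load attempt free to retry.

```scala
import scala.util.control.NonFatal

// Illustrative error type; the real class in the repl module is RemoteClassLoaderError.
class RemoteClassLoaderError(className: String, cause: Throwable)
  extends Error(s"Could not load class $className from the remote driver", cause)

class ExampleClassLoader(parent: ClassLoader) extends ClassLoader(parent) {

  // Stand-in for the driver RPC fetch that can fail transiently (task interruption, IO error).
  private def fetchClassBytes(name: String): Option[Array[Byte]] = None

  override def findClass(name: String): Class[_] = {
    val bytes =
      try fetchClassBytes(name) catch {
        case NonFatal(e) =>
          // Key point of the commit: throwing ClassNotFoundException here would be cached
          // by the JVM as a permanent negative result, so a transient failure is rethrown
          // as an Error and the class can still be loaded on a later attempt.
          throw new RemoteClassLoaderError(name, e)
      }
    bytes match {
      case Some(b) => defineClass(name, b, 0, b.length)
      case None    => throw new ClassNotFoundException(name) // genuinely missing class
    }
  }
}
```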
[spark] branch master updated: [SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4e61de4  [SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files
4e61de4 is described below

commit 4e61de4380ba8f589f202b889935c93338ea520f
Author: Yuming Wang
AuthorDate: Tue May 28 09:28:35 2019 -0700

    [SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files

    ## What changes were proposed in this pull request?

    [`DataSourceUtils.isDataPath(path)`](https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L95) should be `DataSourceUtils.isDataPath(status.getPath)`. This PR fixes the issue.

    ## How was this patch tested?

    Unit tests.

    Closes #24725 from wangyum/SPARK-27863.

    Authored-by: Yuming Wang
    Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/execution/command/CommandUtils.scala | 11 ---
 .../spark/sql/StatisticsCollectionSuite.scala      | 38 +-
 .../spark/sql/StatisticsCollectionTestBase.scala   |  4 +++
 .../apache/spark/sql/hive/StatisticsSuite.scala    |  2 +-
 4 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 70e7cd9..cac2519 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -64,9 +64,7 @@ object CommandUtils extends Logging {
       val paths = partitions.map(x => new Path(x.storage.locationUri.get))
       val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
       val pathFilter = new PathFilter with Serializable {
-        override def accept(path: Path): Boolean = {
-          DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
-        }
+        override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
       }
       val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
         paths, sessionState.newHadoopConf(), pathFilter, spark)
@@ -98,8 +96,7 @@ object CommandUtils extends Logging {
     val size = if (fileStatus.isDirectory) {
       fs.listStatus(path)
         .map { status =>
-          if (!status.getPath.getName.startsWith(stagingDir) &&
-              DataSourceUtils.isDataPath(path)) {
+          if (isDataPath(status.getPath, stagingDir)) {
             getPathSize(fs, status.getPath)
           } else {
             0L
@@ -343,4 +340,8 @@ object CommandUtils extends Logging {
       cs.copy(histogram = Some(histogram))
     }
   }
+
+  private def isDataPath(path: Path, stagingDir: String): Boolean = {
+    !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ba47b09..4c78f85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.io.File
+import java.io.{File, PrintWriter}
+import java.net.URI
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit
 
@@ -614,4 +615,39 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Metadata files and temporary files should not be counted as data files") {
+    withTempDir { tempDir =>
+      val tableName = "t1"
+      val stagingDirName = ".test-staging-dir"
+      val tableLocation = s"${tempDir.toURI}/$tableName"
+      withSQLConf(
+        SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true",
+        "hive.exec.stagingdir" -> stagingDirName) {
+        withTable("t1") {
+          sql(s"CREATE TABLE $tableName(c1 BIGINT) USING PARQUET LOCATION '$tableLocation'")
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val staging = new File(new URI(s"$tableLocation/$stagingDirName"))
+          Utils.tryWithResource(new PrintWriter(staging)) { stagingWriter =>
+            stagingWriter.write("12")
+          }
+
+          val metadata = new File(new URI(s"$tableLocation/_metadata"))
+          Utils.tryWithResource(new PrintWriter(metadata)) { metadataWriter =>
+            metadataWriter.write("1234")
+          }
+
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val stagingFileSize = staging.length()
+          val metadataFileSize =
[spark] branch branch-2.4 updated: [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFai…
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a4bbe02  [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFai…
a4bbe02 is described below

commit a4bbe02ffdabe103d5cd614fe80a19e744d0f1a6
Author: MJ Tang
AuthorDate: Tue May 28 09:29:46 2019 -0500

    [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFai…

    …lure

    ## What changes were proposed in this pull request?

    The failure log format is fixed according to the JDK implementation.

    ## How was this patch tested?

    Manual tests have been done. The new failure log format would be like:

    java.lang.RuntimeException: Failed to finish the task
        at com.xxx.Test.test(Test.java:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
        at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
        at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
        at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
        at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
        at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
        at org.testng.TestRunner.privateRun(TestRunner.java:648)
        at org.testng.TestRunner.run(TestRunner.java:505)
        at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
        at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
        at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
        at org.testng.SuiteRunner.run(SuiteRunner.java:364)
        at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
        at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
        at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
        at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
        at org.testng.TestNG.runSuites(TestNG.java:1028)
        at org.testng.TestNG.run(TestNG.java:996)
        at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
        at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123)
    Caused by: java.io.FileNotFoundException: File is not found
        at com.xxx.Test.test(Test.java:105)
        ... 24 more

    Closes #24684 from breakdawn/master.

    Authored-by: MJ Tang
    Signed-off-by: Sean Owen
    (cherry picked from commit 1824cbfa39c92d999e24173f2337f518aa5e3e9b)
    Signed-off-by: Sean Owen
---
 mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index 4965491..e7c1a50 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.util
 
+import java.io.{PrintWriter, StringWriter}
 import java.util.UUID
 
 import scala.util.{Failure, Success, Try}
@@ -160,8 +161,9 @@ private[spark] class Instrumentation private () extends Logging {
    * Logs an exception raised during a training session.
    */
   def logFailure(e: Throwable): Unit = {
-    val msg = e.getStackTrace.mkString("\n")
-    super.logError(msg)
+    val msg = new StringWriter()
+    e.printStackTrace(new PrintWriter(msg))
+    super.logError(msg.toString)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
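The difference between the two log formats is easy to reproduce outside of Spark. The sketch below contrasts the old and new behaviour of `logFailure`; the object and method names are illustrative, not part of the Instrumentation API.

```scala
import java.io.{PrintWriter, StringWriter}

object StackTraceFormatting {
  // Old approach: only the frames of `e` itself, with no exception class, no message,
  // and no "Caused by:" chain.
  def formatOld(e: Throwable): String =
    e.getStackTrace.mkString("\n")

  // New approach: delegate to the JDK's own printer, so the logged text matches what
  // printStackTrace() writes to stderr, including the message and the full cause chain.
  def formatNew(e: Throwable): String = {
    val sw = new StringWriter()
    e.printStackTrace(new PrintWriter(sw))
    sw.toString
  }

  def main(args: Array[String]): Unit = {
    val e = new RuntimeException("Failed to finish the task",
      new java.io.FileNotFoundException("File is not found"))
    // Includes the "Caused by: java.io.FileNotFoundException" section shown in the
    // commit message; formatOld(e) would print the top-level frames only.
    println(formatNew(e))
  }
}
```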
[spark] branch master updated: [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFai…
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1824cbf  [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFai…
1824cbf is described below

commit 1824cbfa39c92d999e24173f2337f518aa5e3e9b
Author: MJ Tang
AuthorDate: Tue May 28 09:29:46 2019 -0500

    [SPARK-27657][ML] Fix the log format of ml.util.Instrumentation.logFai…

    …lure

    ## What changes were proposed in this pull request?

    The failure log format is fixed according to the JDK implementation.

    ## How was this patch tested?

    Manual tests have been done. The new failure log format would be like:

    java.lang.RuntimeException: Failed to finish the task
        at com.xxx.Test.test(Test.java:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
        at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
        at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
        at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
        at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
        at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
        at org.testng.TestRunner.privateRun(TestRunner.java:648)
        at org.testng.TestRunner.run(TestRunner.java:505)
        at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
        at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
        at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
        at org.testng.SuiteRunner.run(SuiteRunner.java:364)
        at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
        at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
        at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
        at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
        at org.testng.TestNG.runSuites(TestNG.java:1028)
        at org.testng.TestNG.run(TestNG.java:996)
        at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72)
        at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123)
    Caused by: java.io.FileNotFoundException: File is not found
        at com.xxx.Test.test(Test.java:105)
        ... 24 more

    Closes #24684 from breakdawn/master.

    Authored-by: MJ Tang
    Signed-off-by: Sean Owen
---
 mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index 780650d..8cd4a7c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.util
 
+import java.io.{PrintWriter, StringWriter}
 import java.util.UUID
 
 import scala.util.{Failure, Success, Try}
@@ -161,8 +162,9 @@ private[spark] class Instrumentation private () extends Logging with MLEvents {
    * Logs an exception raised during a training session.
    */
   def logFailure(e: Throwable): Unit = {
-    val msg = e.getStackTrace.mkString("\n")
-    super.logError(msg)
+    val msg = new StringWriter()
+    e.printStackTrace(new PrintWriter(msg))
+    super.logError(msg.toString)
  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c30b529  [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
c30b529 is described below

commit c30b5297bc607ae33cc2fcf624b127942154e559
Author: gengjiaan
AuthorDate: Tue May 28 09:26:06 2019 -0500

    [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.

    ## What changes were proposed in this pull request?

    I checked the code of `org.apache.spark.sql.execution.datasources.DataSource`; it contains duplicate Java reflection. `sourceSchema`, `createSource`, `createSink`, `resolveRelation`, and `writeAndRead` all call `providingClass.getConstructor().newInstance()`. The instance of `providingClass` is stateless, for example: `KafkaSourceProvider`, `RateSourceProvider`, `TextSocketSourceProvider`, `JdbcRelationProvider`, `ConsoleSinkProvider`.

    AFAIK, Java reflection can cause a significant performance cost. The Oracle website [https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html) contains the following note about the performance of Java reflection:

    ```
    Performance Overhead
    Because reflection involves types that are dynamically resolved, certain Java virtual machine optimizations can not be performed. Consequently, reflective operations have slower performance than their non-reflective counterparts, and should be avoided in sections of code which are called frequently in performance-sensitive applications.
    ```

    I have found some performance cost tests of Java reflection:
    [https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/) contains a performance cost test.
    [https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance) has a discussion of Java reflection performance.

    So I think we should avoid duplicate Java reflection and reuse the instance of `providingClass`.

    ## How was this patch tested?

    Existing UTs.

    Closes #24647 from beliefer/optimize-DataSource.

    Authored-by: gengjiaan
    Signed-off-by: Sean Owen
---
 .../spark/sql/execution/datasources/DataSource.scala | 15 +--
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ef430f4..04ae528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -105,6 +105,9 @@ case class DataSource(
       case _ => cls
     }
   }
+
+  private def providingInstance() = providingClass.getConstructor().newInstance()
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -210,7 +213,7 @@ case class DataSource(
   /**
    * Returns the name and schema of the source that can be used to continually read data.
    */
   private def sourceSchema(): SourceInfo = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
           sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
@@ -264,7 +267,7 @@ case class DataSource(
   /** Returns a source that can be used to continually read data. */
   def createSource(metadataPath: String): Source = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSourceProvider =>
         s.createSource(
           sparkSession.sqlContext,
@@ -293,7 +296,7 @@ case class DataSource(
   /** Returns a sink that can be used to continually write data. */
   def createSink(outputMode: OutputMode): Sink = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSinkProvider =>
         s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
@@ -324,7 +327,7 @@ case class DataSource(
    *                        that files already exist, we don't need to check them again.
    */
   def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
-    val relation = (providingClass.getConstructor().newInstance(), userSpecifiedSchema) match {
+    val relation = (providingInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
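As shown in the diff, the new `providingInstance()` helper is a `def`, so its main effect is to collect the repeated reflective call sites in one place. If the provider instance really is stateless, the same idea can be pushed one step further and the instance itself cached, as in the hypothetical sketch below; `ProviderHolder` and `describe()` are made-up names and not part of `DataSource`.

```scala
// Sketch of caching a reflectively created instance: getConstructor().newInstance() runs
// at most once, and later calls reuse the result. Safe only for stateless classes.
class ProviderHolder(providingClass: Class[_]) {

  // Reflection happens on first access only.
  private lazy val providingInstance: AnyRef =
    providingClass.getConstructor().newInstance().asInstanceOf[AnyRef]

  // In DataSource the equivalent match arms test provider traits such as
  // StreamSourceProvider or StreamSinkProvider; here we branch purely for illustration.
  def describe(): String = providingInstance match {
    case s: CharSequence => s"char-sequence provider of length ${s.length}"
    case other           => s"provider of type ${other.getClass.getName}"
  }
}

object ProviderHolderExample {
  def main(args: Array[String]): Unit = {
    val holder = new ProviderHolder(classOf[java.lang.StringBuilder])
    // Both calls reuse the single cached instance instead of reflecting twice.
    println(holder.describe())
    println(holder.describe())
  }
}
```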