This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1fcd537a37b  [SPARK-43757][CONNECT] Change client compatibility from allow list to deny list
1fcd537a37b is described below

commit 1fcd537a37b2457092e20f8034f23917a8ae2ffa
Author: Zhen Li <zhenli...@users.noreply.github.com>
AuthorDate: Wed Jun 28 10:37:38 2023 -0400

    [SPARK-43757][CONNECT] Change client compatibility from allow list to deny list

    ### What changes were proposed in this pull request?
    Expand the client compatibility check to include all sql APIs.

    ### Why are the changes needed?
    Enhance the API compatibility coverage

    ### Does this PR introduce _any_ user-facing change?
    No, except it fixes a few wrong types and hides a few helper methods internally.

    ### How was this patch tested?
    Existing tests.

    Closes #41284 from zhenlineo/compatibility-check-allowlist.

    Authored-by: Zhen Li <zhenli...@users.noreply.github.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |   6 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  |   2 +-
 .../sql/streaming/StreamingQueryException.scala    |   3 +-
 .../sql/streaming/StreamingQueryManager.scala      |   3 +-
 .../CheckConnectJvmClientCompatibility.scala       | 327 ++++++++++++++-------
 5 files changed, 225 insertions(+), 116 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 20c130b83cb..e67ef1c0fa7 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode
  *
  * @since 3.5.0
  */
-abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {
+class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {

   /**
    * Returns a new [[KeyValueGroupedDataset]] where the type of the key has been mapped to the
@@ -462,7 +462,7 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
       UdfUtils.coGroupFunctionToScalaFunc(f))(encoder)
   }

-  protected def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder](
+  protected[sql] def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder](
       outputMode: Option[OutputMode],
       timeoutConf: GroupStateTimeout,
       initialState: Option[KeyValueGroupedDataset[K, S]],
@@ -923,7 +923,7 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
     agg(aggregator)
   }

-  override protected def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder](
+  override protected[sql] def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder](
       outputMode: Option[OutputMode],
       timeoutConf: GroupStateTimeout,
       initialState: Option[KeyValueGroupedDataset[K, S]],
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 45e7dca38d7..54e9102c55c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -429,7 +429,7 @@ class SparkSession private[sql] (
    *
    * @since 3.4.0
    */
-  object implicits extends SQLImplicits(this)
+  object implicits extends SQLImplicits(this) with Serializable
   // scalastyle:on

   def newSession(): SparkSession = {
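
The core of this change is easiest to see outside the diff: the old check only examined APIs that were explicitly listed, while the new one examines everything under org.apache.spark.sql and mutes known gaps. A minimal, self-contained sketch of that inversion, using plain string predicates instead of MiMa's Problem/ProblemFilter types (DenyListSketch and its rule names are illustrative only, not part of the commit):

    // Rules are plain predicates over fully-qualified API names.
    object DenyListSketch extends App {
      type Rule = String => Boolean
      def under(prefix: String): Rule = _.startsWith(prefix)

      val api = "org.apache.spark.sql.SparkSession.interruptAll"

      // Allow list (before): only APIs someone remembered to list get
      // checked, so a brand-new public API is silently skipped.
      val allowList: Seq[Rule] = Seq(under("org.apache.spark.sql.Dataset"))
      println(allowList.exists(_(api))) // false -> not checked

      // Deny list (after): everything under org.apache.spark.sql is checked
      // unless an exclude rule explicitly mutes it.
      val include: Rule = under("org.apache.spark.sql")
      val excludes: Seq[Rule] = Seq(under("org.apache.spark.sql.internal"))
      println(include(api) && !excludes.exists(_(api))) // true -> checked
    }

The real rules in CheckConnectJvmClientCompatibility.scala below follow the same shape, with MiMa's ProblemFilters supplying the exclude side.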
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index d5e9982dfbf..512c94f5c70 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -36,7 +36,8 @@ class StreamingQueryException private[sql] (
     message: String,
     errorClass: String,
     stackTrace: String)
-    extends SparkThrowable {
+    extends Exception(message)
+    with SparkThrowable {

   override def getErrorClass: String = errorClass
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 775921ff579..13bbf470639 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.connect.proto.Command
 import org.apache.spark.connect.proto.StreamingQueryManagerCommand
 import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession

 /**
@@ -33,7 +34,7 @@ import org.apache.spark.sql.SparkSession
  * @since 3.5.0
  */
 @Evolving
-class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {

   /**
    * Returns a list of active queries associated with this SQLContext
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index acc469672b4..f22baddc01e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -17,18 +17,14 @@
 package org.apache.spark.sql.connect.client

 import java.io.{File, Writer}
-import java.net.URLClassLoader
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util.regex.Pattern

-import scala.reflect.runtime.universe.runtimeMirror
-
-import com.typesafe.tools.mima.core.{Problem, ProblemFilter, ProblemFilters}
+import com.typesafe.tools.mima.core._
 import com.typesafe.tools.mima.lib.MiMaLib

 import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._
-import org.apache.spark.util.ChildFirstURLClassLoader

 /**
  * A tool for checking the binary compatibility of the connect client API against the spark SQL
@@ -70,6 +66,16 @@ object CheckConnectJvmClientCompatibility {
         sqlJar,
         "Sql")

+      val problemsWithClientModule =
+        checkMiMaCompatibilityWithReversedSqlModule(clientJar, sqlJar)
+      appendMimaCheckErrorMessageIfNeeded(
+        resultWriter,
+        problemsWithClientModule,
+        clientJar,
+        sqlJar,
+        "ReversedSql",
+        "Sql")
+
       val avroJar: File = findJar("connector/avro", "spark-avro", "spark-avro")
       val problemsWithAvroModule = checkMiMaCompatibilityWithAvroModule(clientJar, avroJar)
       appendMimaCheckErrorMessageIfNeeded(
@@ -89,9 +95,6 @@ object CheckConnectJvmClientCompatibility {
         clientJar,
         protobufJar,
         "Protobuf")
-
-      val incompatibleApis = checkDatasetApiCompatibility(clientJar, sqlJar)
-      appendIncompatibleDatasetApisErrorMessageIfNeeded(resultWriter, incompatibleApis)
     } catch {
       case e: Throwable =>
         println(e.getMessage)
@@ -122,65 +125,62 @@ object CheckConnectJvmClientCompatibility {
   private def checkMiMaCompatibilityWithSqlModule(
       clientJar: File,
       sqlJar: File): List[Problem] = {
-    val includedRules = Seq(
-      IncludeByName("org.apache.spark.sql.catalog.Catalog.*"),
-      IncludeByName("org.apache.spark.sql.catalog.CatalogMetadata.*"),
-      IncludeByName("org.apache.spark.sql.catalog.Column.*"),
-      IncludeByName("org.apache.spark.sql.catalog.Database.*"),
-      IncludeByName("org.apache.spark.sql.catalog.Function.*"),
-      IncludeByName("org.apache.spark.sql.catalog.Table.*"),
-      IncludeByName("org.apache.spark.sql.Column.*"),
-      IncludeByName("org.apache.spark.sql.ColumnName.*"),
-      IncludeByName("org.apache.spark.sql.DataFrame.*"),
-      IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
-      IncludeByName("org.apache.spark.sql.DataFrameNaFunctions.*"),
-      IncludeByName("org.apache.spark.sql.DataFrameStatFunctions.*"),
-      IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
-      IncludeByName("org.apache.spark.sql.DataFrameWriterV2.*"),
-      IncludeByName("org.apache.spark.sql.Dataset.*"),
-      IncludeByName("org.apache.spark.sql.functions.*"),
-      IncludeByName("org.apache.spark.sql.KeyValueGroupedDataset.*"),
-      IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
-      IncludeByName("org.apache.spark.sql.SparkSession.*"),
-      IncludeByName("org.apache.spark.sql.RuntimeConfig.*"),
-      IncludeByName("org.apache.spark.sql.TypedColumn.*"),
-      IncludeByName("org.apache.spark.sql.SQLImplicits.*"),
-      IncludeByName("org.apache.spark.sql.DatasetHolder.*"),
-      IncludeByName("org.apache.spark.sql.streaming.DataStreamReader.*"),
-      IncludeByName("org.apache.spark.sql.streaming.DataStreamWriter.*"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQuery.*"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryManager.active"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryManager.awaitAnyTermination"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryManager.resetTerminated"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryStatus.*"),
-      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryProgress.*"))
+    val includedRules = Seq(IncludeByName("org.apache.spark.sql.*"))

     val excludeRules = Seq(
       // Filter unsupported rules:
       // Note when muting errors for a method, checks on all overloading methods are also muted.
-      // Skip all shaded dependencies and proto files in the client.
-      ProblemFilters.exclude[Problem]("org.sparkproject.*"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
+      // Skip unsupported packages
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.api.*"), // Java, Python, R
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalyst.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.columnar.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.connector.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.execution.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.expressions.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.internal.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.jdbc.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.sources.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.streaming.ui.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.test.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.util.*"),
+
+      // Skip private[sql] constructors
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.*.this"),
+
+      // Skip unsupported classes
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ExperimentalMethods"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext$*"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSessionExtensions"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.SparkSessionExtensionsProvider"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDTFRegistration"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDFRegistration"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDFRegistration$"),

       // DataFrame Reader & Writer
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // deprecated
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // rdd

       // DataFrameNaFunctions
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.fillValue"),

       // DataFrameStatFunctions
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameStatFunctions.bloomFilter"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameStatFunctions.this"),

       // Dataset
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.Dataset$" // private[sql]
+      ),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.ofRows"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_TAG"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation$"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ObservationListener"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ObservationListener$"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
@@ -191,7 +191,6 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),

       // functions
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udf"),
@@ -203,11 +202,12 @@ object CheckConnectJvmClientCompatibility {
       // KeyValueGroupedDataset
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.KeyValueGroupedDataset.queryExecution"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.KeyValueGroupedDataset.this"),

       // RelationalGroupedDataset
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.RelationalGroupedDataset$*" // private[sql]
+      ),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.apply"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"),

       // SparkSession
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
@@ -226,16 +226,32 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
       // TODO(SPARK-44068): Support positional parameters in Scala connect client
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
+
+      // SparkSession#implicits
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#implicits._sqlContext"),
+
+      // SparkSession#Builder
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.appName"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.config"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.master"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.enableHiveSupport"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.withExtensions"),

       // RuntimeConfig
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RuntimeConfig.this"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"),

-      // TypedColumn
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
       // DataStreamWriter
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.DataStreamWriter$"),
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.streaming.DataStreamWriter.foreachBatch" // TODO(SPARK-42944)
       ),
@@ -243,27 +259,161 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are constant vals.
       ),

+      // StreamingQueryException
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryException.message"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryException.cause"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryException.time"),
+
+      // StreamingQueryManager
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryManager.addListener"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryManager.removeListener"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryManager.listListeners"),
+
+      // Classes missing from streaming API
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupState"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.TestGroupState"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.TestGroupState$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.PythonStreamingQueryListener"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$*"),
+
       // SQLImplicits
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits._sqlContext"))

     checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
   }

   /**
-   * MiMa takes an old jar (sql jar) and a new jar (client jar) as inputs and then reports all
-   * incompatibilities found in the new jar. The incompatibility result is then filtered using
-   * include and exclude rules. Include rules are first applied to find all client classes that
-   * need to be checked. Then exclude rules are applied to filter out all unsupported methods in
-   * the client classes.
+   * This check ensures the client jar does not expose any unwanted APIs by mistake.
    */
-  private def checkMiMaCompatibility(
+  private def checkMiMaCompatibilityWithReversedSqlModule(
       clientJar: File,
-      targetJar: File,
+      sqlJar: File): List[Problem] = {
+    val includedRules = Seq(IncludeByName("org.apache.spark.sql.*"))
+    val excludeRules = Seq(
+      // Skipped packages
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.avro.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.connect.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.*"),
+
+      // private[sql]
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.*.this"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameStatFunctions$"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.KeyValueGroupedDatasetImpl"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.KeyValueGroupedDatasetImpl$"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem](
+        "org.apache.spark.sql.SQLImplicits._sqlContext" // protected
+      ),
+
+      // New public APIs added in the client
+      // ScalarUserDefinedFunction
+      ProblemFilters
+        .exclude[MissingClassProblem](
+          "org.apache.spark.sql.expressions.ScalarUserDefinedFunction"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.expressions.ScalarUserDefinedFunction$"),
+
+      // Dataset
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.Dataset.plan"
+      ), // developer API
+      ProblemFilters.exclude[IncompatibleResultTypeProblem](
+        "org.apache.spark.sql.Dataset.encoder"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.Dataset.collectResult"),
+
+      // RuntimeConfig
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.RuntimeConfig" // Client version extends Logging
+      ),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.RuntimeConfig.*" // Mute missing Logging methods
+      ),
+      // ConnectRepl
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.application.ConnectRepl" // developer API
+      ),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.application.ConnectRepl$" // developer API
+      ),
+
+      // SparkSession
+      // developer API
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.newDataFrame"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.newDataset"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.execute"),
+      // Experimental
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.addArtifact"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.addArtifacts"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.registerClassFinder"),
+      // public
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession.interruptAll"),
+      // SparkSession#Builder
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.remote"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.client"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.build" // deprecated
+      ),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.SparkSession#Builder.create"),
+
+      // Streaming API
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.DataStreamWriter" // Client version extends Logging
+      ),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.DataStreamWriter.*" // Mute missing Logging methods
+      ),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.RemoteStreamingQuery"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.streaming.RemoteStreamingQuery$"))
+
+    checkMiMaCompatibility(sqlJar, clientJar, includedRules, excludeRules)
+  }
+
+  /**
+   * MiMa takes a new jar and an old jar as inputs and then reports all incompatibilities found in
+   * the new jar. The incompatibility result is then filtered using include and exclude rules.
+   * Include rules are first applied to find all client classes that need to be checked. Then
+   * exclude rules are applied to filter out all unsupported methods in the client classes.
+   */
+  private def checkMiMaCompatibility(
+      newJar: File,
+      oldJar: File,
       includedRules: Seq[IncludeByName],
       excludeRules: Seq[ProblemFilter]): List[Problem] = {
-    val mima = new MiMaLib(Seq(clientJar, targetJar))
-    val allProblems = mima.collectProblems(targetJar, clientJar, List.empty)
+    val mima = new MiMaLib(Seq(newJar, oldJar))
+    val allProblems = mima.collectProblems(oldJar, newJar, List.empty)
     val problems = allProblems
       .filter { p =>
         includedRules.exists(rule => rule(p))
@@ -274,46 +424,18 @@ object CheckConnectJvmClientCompatibility {
     problems
   }

-  private def checkDatasetApiCompatibility(clientJar: File, sqlJar: File): Seq[String] = {
-
-    def methods(jar: File, className: String): Seq[String] = {
-      val classLoader: URLClassLoader =
-        new ChildFirstURLClassLoader(Seq(jar.toURI.toURL).toArray, this.getClass.getClassLoader)
-      val mirror = runtimeMirror(classLoader)
-      // scalastyle:off classforname
-      val classSymbol =
-        mirror.classSymbol(Class.forName(className, false, classLoader))
-      // scalastyle:on classforname
-      classSymbol.typeSignature.members
-        .filter(_.isMethod)
-        .map(_.asMethod)
-        .filter(m => m.isPublic)
-        .map(_.fullName)
-        .toSeq
-    }
-
-    val className = "org.apache.spark.sql.Dataset"
-    val clientMethods = methods(clientJar, className)
-    val sqlMethods = methods(sqlJar, className)
-    // Exclude some public methods that must be added through `exceptionMethods`
-    val exceptionMethods =
-      Seq("org.apache.spark.sql.Dataset.collectResult", "org.apache.spark.sql.Dataset.plan")
-
-    // Find new public functions that are not in sql module `Dataset`.
-    clientMethods.diff(sqlMethods).diff(exceptionMethods)
-  }
-
   private def appendMimaCheckErrorMessageIfNeeded(
       resultWriter: Writer,
       problems: List[Problem],
       clientModule: File,
       targetModule: File,
-      targetName: String): Unit = {
+      targetName: String,
+      description: String = "client"): Unit = {
     if (problems.nonEmpty) {
       resultWriter.write(
         s"ERROR: Comparing Client jar: $clientModule and $targetName jar: $targetModule \n")
       resultWriter.write(s"problems with $targetName module: \n")
-      resultWriter.write(s"${problems.map(p => p.description("client")).mkString("\n")}")
+      resultWriter.write(s"${problems.map(p => p.description(description)).mkString("\n")}")
       resultWriter.write("\n")
       resultWriter.write(
         "Exceptions to binary compatibility can be added in " +
@@ -321,21 +443,6 @@ object CheckConnectJvmClientCompatibility {
     }
   }

-  private def appendIncompatibleDatasetApisErrorMessageIfNeeded(
-      resultWriter: Writer,
-      incompatibleApis: Seq[String]): Unit = {
-    if (incompatibleApis.nonEmpty) {
-      resultWriter.write(
-        "ERROR: The Dataset apis only exist in the connect client " +
-          "module and not belong to the sql module include: \n")
-      resultWriter.write(incompatibleApis.mkString("\n"))
-      resultWriter.write("\n")
-      resultWriter.write(
-        "Exceptions can be added to exceptionMethods in " +
-          "'CheckConnectJvmClientCompatibility#checkDatasetApiCompatibility'\n")
-    }
-  }
-
   private case class IncludeByName(name: String) extends ProblemFilter {
     private[this] val pattern =
       Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))
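
The wildcard handling in IncludeByName is worth spelling out: the rule name is split on '*', the literal pieces are regex-quoted, and the pieces are rejoined with ".*", so '*' is the only wildcard. A standalone sketch of just that behavior (WildcardDemo and toPattern are illustrative names, not part of the commit):

    import java.util.regex.Pattern

    object WildcardDemo extends App {
      // Same construction as IncludeByName: '*' is the only wildcard;
      // every other character, including '.', is matched literally.
      def toPattern(rule: String): Pattern =
        Pattern.compile(rule.split("\\*", -1).map(Pattern.quote).mkString(".*"))

      val include = toPattern("org.apache.spark.sql.*")
      // true: the trailing '*' expands to ".*" and matches any suffix
      println(include.matcher("org.apache.spark.sql.Dataset.collect").matches())
      // false: the quoted prefix must match exactly
      println(include.matcher("org.apache.sparkx.sql.Dataset.collect").matches())
    }

This is why a single IncludeByName("org.apache.spark.sql.*") rule is enough to pull every sql API into the deny-list check.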