[spark] branch master updated (adac633 -> 0b647fe)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from adac633  [SPARK-33934][SQL] Add SparkFile's root dir to env property PATH
     add 0b647fe  [SPARK-33888][SQL] JDBC SQL TIME type represents incorrectly as TimestampType, it should be physical Int in millis

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 36 --
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 26 +++-
 2 files changed, 58 insertions(+), 4 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
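The SPARK-33888 summary above says a JDBC SQL TIME value should be carried as a physical Int of milliseconds rather than as a TimestampType. As a rough illustration of that encoding in plain Python (these helper names are made up for the sketch and are not Spark's actual JdbcUtils code):

```python
from datetime import time

def time_to_millis(t: time) -> int:
    # Encode a SQL TIME value as milliseconds since midnight (a physical int).
    return ((t.hour * 60 + t.minute) * 60 + t.second) * 1000 + t.microsecond // 1000

def millis_to_time(ms: int) -> time:
    # Decode a milliseconds-since-midnight int back into a time-of-day.
    seconds, millis = divmod(ms, 1000)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return time(hour, minute, second, millis * 1000)

print(time_to_millis(time(23, 59, 59, 999000)))  # 86399999, the largest valid value
```

The key property is that the encoding round-trips any time-of-day and fits comfortably in a 32-bit int (max 86,399,999).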
[spark] branch master updated (2a68ed7 -> adac633)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 2a68ed7  [SPARK-33954][SQL] Some operator missing rowCount when enable CBO
     add adac633  [SPARK-33934][SQL] Add SparkFile's root dir to env property PATH

No new revisions were added by this update.

Summary of changes:
 .../execution/BaseScriptTransformationExec.scala   |   8 +-
 sql/core/src/test/resources/test_script.py         |   2 +
 .../execution/BaseScriptTransformationSuite.scala  | 113 +
 3 files changed, 121 insertions(+), 2 deletions(-)
[spark] branch master updated: [SPARK-33954][SQL] Some operator missing rowCount when enable CBO
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2a68ed7  [SPARK-33954][SQL] Some operator missing rowCount when enable CBO
2a68ed7 is described below

commit 2a68ed71e4402c2864202aa78a54d9921c257990
Author: Yuming Wang
AuthorDate: Mon Jan 4 05:53:14 2021 +

    [SPARK-33954][SQL] Some operator missing rowCount when enable CBO

    ### What changes were proposed in this pull request?

    This PR fixes some operators missing rowCount when CBO is enabled, e.g.:

    ```scala
    spark.range(1000).selectExpr("id as a", "id as b").write.saveAsTable("t1")
    spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS")
    spark.sql("set spark.sql.cbo.enabled=true")
    spark.sql("set spark.sql.cbo.planStats.enabled=true")
    spark.sql("select * from (select * from t1 distribute by a limit 100) distribute by b").explain("cost")
    ```

    Before this PR:
    ```
    == Optimized Logical Plan ==
    RepartitionByExpression [b#2129L], Statistics(sizeInBytes=2.3 KiB)
    +- GlobalLimit 100, Statistics(sizeInBytes=2.3 KiB, rowCount=100)
       +- LocalLimit 100, Statistics(sizeInBytes=23.4 KiB)
          +- RepartitionByExpression [a#2128L], Statistics(sizeInBytes=23.4 KiB)
             +- Relation[a#2128L,b#2129L] parquet, Statistics(sizeInBytes=23.4 KiB, rowCount=1.00E+3)
    ```

    After this PR:
    ```
    == Optimized Logical Plan ==
    RepartitionByExpression [b#2129L], Statistics(sizeInBytes=2.3 KiB, rowCount=100)
    +- GlobalLimit 100, Statistics(sizeInBytes=2.3 KiB, rowCount=100)
       +- LocalLimit 100, Statistics(sizeInBytes=23.4 KiB, rowCount=1.00E+3)
          +- RepartitionByExpression [a#2128L], Statistics(sizeInBytes=23.4 KiB, rowCount=1.00E+3)
             +- Relation[a#2128L,b#2129L] parquet, Statistics(sizeInBytes=23.4 KiB, rowCount=1.00E+3)
    ```

    ### Why are the changes needed?

    [`JoinEstimation.estimateInnerOuterJoin`](https://github.com/apache/spark/blob/d6a68e0b67ff7de58073c176dd097070e88ac831/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala#L55-L156) needs the row count.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Unit test.

Closes #30987 from wangyum/SPARK-33954.

Authored-by: Yuming Wang
Signed-off-by: Wenchen Fan
---
 .../statsEstimation/BasicStatsPlanVisitor.scala | 24 +++---
 .../BasicStatsEstimationSuite.scala             |  7 +++
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
index ec0c100..34baf5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
@@ -27,13 +27,23 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

   /** Falls back to the estimation computed by [[SizeInBytesOnlyStatsPlanVisitor]]. */
   private def fallback(p: LogicalPlan): Statistics = SizeInBytesOnlyStatsPlanVisitor.visit(p)

-  override def default(p: LogicalPlan): Statistics = fallback(p)
+  override def default(p: LogicalPlan): Statistics = p match {
+    case p: LeafNode => p.computeStats()
+    case _: LogicalPlan =>
+      val stats = p.children.map(_.stats)
+      val rowCount = if (stats.exists(_.rowCount.isEmpty)) {
+        None
+      } else {
+        Some(stats.map(_.rowCount.get).filter(_ > 0L).product)
+      }
+      Statistics(sizeInBytes = stats.map(_.sizeInBytes).filter(_ > 0L).product, rowCount = rowCount)
+  }

   override def visitAggregate(p: Aggregate): Statistics = {
     AggregateEstimation.estimate(p).getOrElse(fallback(p))
   }

-  override def visitDistinct(p: Distinct): Statistics = fallback(p)
+  override def visitDistinct(p: Distinct): Statistics = default(p)

   override def visitExcept(p: Except): Statistics = fallback(p)

@@ -43,7 +53,7 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
     FilterEstimation(p).estimate.getOrElse(fallback(p))
   }

-  override def visitGenerate(p: Generate): Statistics = fallback(p)
+  override def visitGenerate(p: Generate): Statistics = default(p)

   override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)

@@ -55,19 +65,19 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

   override def visitLocalLimit(p: LocalLimit): Statistics = fallback(p)

-  override
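The reworked `default` above folds the children's statistics together: a rowCount is produced only when every child reports one, taking the product of the positive counts. A small Python model of that logic (simplified stand-ins, not Catalyst code):

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Statistics:
    size_in_bytes: int
    row_count: Optional[int] = None

def default_stats(children: List[Statistics]) -> Statistics:
    # Propagate rowCount only when every child reports one,
    # multiplying the positive values, as the new default() does.
    if any(s.row_count is None for s in children):
        row_count = None
    else:
        row_count = 1
        for s in children:
            if s.row_count > 0:
                row_count *= s.row_count
    size = 1
    for s in children:
        if s.size_in_bytes > 0:
            size *= s.size_in_bytes
    return Statistics(size, row_count)

# A single-child node such as RepartitionByExpression simply inherits its child's stats:
print(default_stats([Statistics(2355, 100)]))  # Statistics(size_in_bytes=2355, row_count=100)
```

This is why, in the "After" plan above, `RepartitionByExpression` now carries `rowCount=100` inherited from the `GlobalLimit` below it.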
[spark] branch master updated: [SPARK-33951][SQL] Distinguish the error between filter and distinct
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b037930  [SPARK-33951][SQL] Distinguish the error between filter and distinct
b037930 is described below

commit b037930952a341f4ed956a8f1839852992feaadc
Author: gengjiaan
AuthorDate: Mon Jan 4 05:44:00 2021 +

    [SPARK-33951][SQL] Distinguish the error between filter and distinct

    ### What changes were proposed in this pull request?

    The error messages for specifying FILTER and DISTINCT on an aggregate function are mixed together and should be separated. This increases readability and ease of use.

    ### Why are the changes needed?

    Increase readability and ease of use.

    ### Does this PR introduce _any_ user-facing change?

    'Yes'.

    ### How was this patch tested?

    Jenkins test

Closes #30982 from beliefer/SPARK-33951.

Lead-authored-by: gengjiaan
Co-authored-by: beliefer
Signed-off-by: Wenchen Fan
---
 .../apache/spark/sql/QueryCompilationErrors.scala  |  9 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 45 +-
 .../catalyst/analysis/higherOrderFunctions.scala   |  3 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |  8 ++--
 4 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
index e4a1f3f..f4c9132 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
@@ -263,13 +263,8 @@ object QueryCompilationErrors {
       s"its class is $classCanonicalName, which is not a generator.")
   }

-  def distinctOrFilterOnlyWithAggregateFunctionError(prettyName: String): Throwable = {
-    new AnalysisException("DISTINCT or FILTER specified, " +
-      s"but $prettyName is not an aggregate function")
-  }
-
-  def ignoreNullsWithUnsupportedFunctionError(prettyName: String): Throwable = {
-    new AnalysisException(s"Function $prettyName does not support IGNORE NULLS")
+  def functionWithUnsupportedSyntaxError(prettyName: String, syntax: String): Throwable = {
+    new AnalysisException(s"Function $prettyName does not support $syntax")
   }

   def nonDeterministicFilterInAggregateError(): Throwable = {

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5e86368..fdd1cd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2120,24 +2120,30 @@ class Analyzer(override val catalogManager: CatalogManager)
             // the context of a Window clause. They do not need to be wrapped in an
             // AggregateExpression.
             case wf: AggregateWindowFunction =>
-              if (isDistinct || filter.isDefined) {
-                throw QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError(
-                  wf.prettyName)
+              if (isDistinct) {
+                throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                  wf.prettyName, "DISTINCT")
+              } else if (filter.isDefined) {
+                throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                  wf.prettyName, "FILTER clause")
               } else if (ignoreNulls) {
                 wf match {
                   case nthValue: NthValue =>
                     nthValue.copy(ignoreNulls = ignoreNulls)
                   case _ =>
-                    throw QueryCompilationErrors.ignoreNullsWithUnsupportedFunctionError(
-                      wf.prettyName)
+                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                      wf.prettyName, "IGNORE NULLS")
                 }
               } else {
                 wf
               }
             case owf: FrameLessOffsetWindowFunction =>
-              if (isDistinct || filter.isDefined) {
-                throw QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError(
-                  owf.prettyName)
+              if (isDistinct) {
+                throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                  owf.prettyName, "DISTINCT")
+              } else if (filter.isDefined) {
+                throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+
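The refactoring above collapses two error helpers into one parameterized constructor, then reports DISTINCT, FILTER, and IGNORE NULLS separately instead of mixing the first two into one message. The shape of the change, sketched in Python with made-up names:

```python
class AnalysisException(Exception):
    pass

def function_with_unsupported_syntax_error(pretty_name: str, syntax: str) -> AnalysisException:
    # One parameterized constructor replaces the two previous error helpers.
    return AnalysisException(f"Function {pretty_name} does not support {syntax}")

def check_window_function(pretty_name, is_distinct=False, has_filter=False, ignore_nulls=False):
    # Raise a distinct message for each unsupported piece of syntax,
    # instead of one message covering both DISTINCT and FILTER.
    if is_distinct:
        raise function_with_unsupported_syntax_error(pretty_name, "DISTINCT")
    if has_filter:
        raise function_with_unsupported_syntax_error(pretty_name, "FILTER clause")
    if ignore_nulls:
        raise function_with_unsupported_syntax_error(pretty_name, "IGNORE NULLS")

try:
    check_window_function("rank", is_distinct=True)
except AnalysisException as e:
    print(e)  # Function rank does not support DISTINCT
```

The user now learns exactly which clause was rejected, which is the readability improvement the commit message claims.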
[spark] branch branch-3.1 updated: [SPARK-33945][SQL][3.1] Handles a random seed consisting of an expr tree
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9268392  [SPARK-33945][SQL][3.1] Handles a random seed consisting of an expr tree
9268392 is described below

commit 9268392b957b263692e13fecaf9adec2136e1865
Author: Takeshi Yamamuro
AuthorDate: Sun Jan 3 21:36:25 2021 -0800

    [SPARK-33945][SQL][3.1] Handles a random seed consisting of an expr tree

    ### What changes were proposed in this pull request?

    This PR intends to fix a minor bug that throws an analysis exception when a seed param in `rand`/`randn` is an expr tree (e.g., `rand(1 + 1)`) and constant folding (`ConstantFolding` and `ReorderAssociativeOperator`) is disabled. A query to reproduce this issue is as follows;

    ```
    // v3.1.0, v3.0.2, and v2.4.8
    $ ./bin/spark-shell

    scala> sql("select rand(1 + 2)").show()
    +-------------------+
    |      rand((1 + 2))|
    +-------------------+
    |0.25738143505962285|
    +-------------------+

    $ ./bin/spark-shell --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConstantFolding,org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator

    scala> sql("select rand(1 + 2)").show()
    org.apache.spark.sql.AnalysisException: Input argument to rand must be an integer, long or null literal.;
      at org.apache.spark.sql.catalyst.expressions.RDG.seed$lzycompute(randomExpressions.scala:49)
      at org.apache.spark.sql.catalyst.expressions.RDG.seed(randomExpressions.scala:46)
      at org.apache.spark.sql.catalyst.expressions.Rand.doGenCode(randomExpressions.scala:98)
      at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
      at scala.Option.getOrElse(Option.scala:189)
      ...
    ```

    A root cause is that the match-case code below cannot handle the case described above:
    https://github.com/apache/spark/blob/42f5e62403469cec6da680b9fbedd0aa508dcbe5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala#L46-L51

    ### Why are the changes needed?

    Bugfix.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Checking if GA/Jenkins can pass

Closes #30977 from maropu/FixRandSeedIssue.

Authored-by: Takeshi Yamamuro
Signed-off-by: Dongjoon Hyun
---
 .../sql/catalyst/expressions/randomExpressions.scala    |  6 +++---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 -
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
index 6a94517..a14b1fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
@@ -44,10 +44,10 @@ abstract class RDG extends UnaryExpression with ExpectsInputTypes with Stateful {

   @transient protected lazy val seed: Long = child match {
-    case Literal(s, IntegerType) => s.asInstanceOf[Int]
-    case Literal(s, LongType) => s.asInstanceOf[Long]
+    case e if child.foldable && e.dataType == IntegerType => e.eval().asInstanceOf[Int]
+    case e if child.foldable && e.dataType == LongType => e.eval().asInstanceOf[Long]
     case _ => throw new AnalysisException(
-      s"Input argument to $prettyName must be an integer, long or null literal.")
+      s"Input argument to $prettyName must be an integer, long, or null constant.")
   }

   override def nullable: Boolean = false

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 237d2c3..a003275 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
-import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
+import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, ConvertToLocalRelation, NestedColumnAliasingSuite, ReorderAssociativeOperator}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@
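The fix above replaces a match on bare literals with a check that the seed expression is foldable, then evaluates it once. A loose Python analogue (a flag plus a zero-argument callable stand in for a foldable Catalyst expression; this is illustrative, not Catalyst's API):

```python
def resolve_seed(foldable: bool, eval_expr) -> int:
    # Post-fix behavior: any constant-foldable int-valued expression is
    # accepted as a seed, not just a bare literal.
    if foldable:
        value = eval_expr()
        if isinstance(value, int):
            return value
    raise ValueError("Input argument to rand must be an integer, long, or null constant.")

# `rand(1 + 2)` works even when constant folding is disabled, because the
# seed expression is still foldable and can be evaluated at analysis time:
print(resolve_seed(True, lambda: 1 + 2))  # 3
```

The old code pattern-matched only `Literal(s, IntegerType)` / `Literal(s, LongType)`, so an unfolded `1 + 2` subtree fell through to the exception branch.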
svn commit: r45163 - /dev/spark/KEYS
Author: gurwls223
Date: Mon Jan 4 05:16:14 2021
New Revision: 45163

Log:
Update KEYS

Modified:
    dev/spark/KEYS

Modified: dev/spark/KEYS
==
--- dev/spark/KEYS (original)
+++ dev/spark/KEYS Mon Jan 4 05:16:14 2021
@@ -1470,3 +1470,51 @@
 HzpIypF5A8FUA+gcNsUUPkbm4JeTTxTxb0AEb6iB
 eGh3VPV1RM3YCRkGY7/1fheg
 =/4cF
 -----END PGP PUBLIC KEY BLOCK-----
+
+pub   rsa3072 2021-01-04 [SC]
+      3D2F69CEED5E30F6252935903FF7C4E834D9EA44
+uid           [ultimate] Hyukjin Kwon
+sub   rsa3072 2021-01-04 [E]
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQGNBF/ylZIBDADD84eY/eWTnoBQYYIf3yEE0xGOa6giP7b5edUNMfOvbVqZ0fd4
+i2BXDf4C+09p7iSDEn5PZhIUmboVIoSZPxuncTd4sUaqBvp1KQ3+9FT9Cx6W9pPF
+MGryMN8I3qOLLoaXsbcr4QWxFNGRi0UFsWAhAduSoc3U5GLxVHv1hio6Va4A3eTk
+9w42nYdOGSXYCuQuEo6nN8wWoOqHuccrTdp5+ehtv9kXf10unQh5+vRYn3HqG9zF
+fPe+de9KhC3LmNe3o5Eca6TCfFtZE9N883CGSVP+APtvvFjpaL1e9UlFXKjHvtUJ
+bCE7BmsEJOMUGuu1LOonkEbK69EWutLsj7skxOOzOdVidSgMTu1Dwli9nXC/kFFd
+3W0GSOi7muBmvaSDOjoEDcB5SrMR9ccss7x+fQeRvtwYWOh7eruiH35dctryyUGR
+GLZrIfuKzOydVAaUzmOaBZoZ1yK3BPkpa5Qd7ZFT0aD5q59fgU9uC209meOzIDaT
+iQVzeTBPrvy5R9MAEQEAAbQjSHl1a2ppbiBLd29uIDxndXJ3bHMyMjNAYXBhY2hl
+Lm9yZz6JAc4EEwEIADgCGwMFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AWIQQ9L2nO
+7V4w9iUpNZA/98ToNNnqRAUCX/KdQAAKCRA/98ToNNnqRF+7C/93gz9YfV8J8jVp
+5/hFJXo/SfYqvjeASoy+Adpvn9FIw2D0wxDXrY8EVHpTQ/QblbO00HErM4OwcY9h
+jDFLm0heMODKOaAWscc90FM81yDwCaGhx1G2MzSPjKEj+JjTo7mStJm6Vr1FJ0Ix
+MQFKNx8axwTiXH+z9+CDi6WQh726JvD/T+bXiF9VRriKCNl1xxQmLJVJFmZB57yH
+oPXSzlIwuTAbWa2v19Fzw0ssUR8x7BFhM0eFGINghv7KL8JGMPNhgkgdeKx6Fail
+UxaVxMdoyoiBllL0XCKakpFQ1ZRuVHK+f9l4GUoJ7TgvmXUXttvBYRBQ1KGIPLtZ
+REVgBVqAEE6y/n3rMA6NB2kLNVz1TEFnDMIz1B7Hgxq8AI2kR8lAxLVviQRVnDwi
+hUigjIsqwNejYhvZGxT864Ug8G/RxlcVRPrxKxEdRgg8hMs3MFz+MbuLlOxCujun
+qtCRgkq6Q/uxpyo2B9frVhZLUCyzbdw+WhHTingqLpB0NSLeM+S5AY0EX/KVkgEM
+ANxsyRZyjKRF/otrEjfOwAogFALVNCx0s2bRbzFBTAvLvFxRTOCXn6kIz3WEysWH
+ae7b+fDyhpoJYUYDEWlQNtNifHUtzAIttnUIV1/hQkCSIjaI/Q094fl5cPtI4g14
+6qRzVmDuXW7m2xoQocpILtXiXMceFJY4wwD73fg0+lwARsz+6QgyoJ8IG1x7RlEX
+uUIQCXO77o4zXGAVHpOBlz3dlFUxKwieZmz4r7H7WBbUqlnXL+pcNhalSqXeF+m3
+0d6JiKaVcZvc3fsZHCNr7IGiDhLzIsSudT+EdmSr8RqXgjaL+A+1X1zvgwg3waPM
+ZpGR1SF+Xqbz5qYAbdGMZ7rdgV8QggHOklLzwxkPsjlo1VVC7oAKnE3wdH0zgG1q
+TjC15FUIZioQ2pH2DEZYKsOe3ud8vQ+hoFOTjZ76PyVa2W/lEEs4dnTraNgvWehv
+uU9GVecs8Gib0YOY+dnExgwePD09bIB900wBU4Mrzeny+ooh4fAruScs0ioVFEY5
+XQARAQABiQG2BBgBCAAgAhsMFiEEPS9pzu1eMPYlKTWQP/fE6DTZ6kQFAl/ynU0A
+CgkQP/fE6DTZ6kSCCAv9FPa0vbivYOmekrtgYrwmZ5ZYb7cfO41JU1N/021xmSRt
+pl0bem38ln6u5zNlHeI7emQCsrWDssgbJe4GUQUqiR6dVqKtKIx0HYHqGbEoKh7Z
+57UUn8A0tM7TH9gGy8tNaVLM9tFSGfkiZU4ssqgkkMNK0Icnr+k5ftBzDXNxrMdz
+kwFwbq9SB/u5QJ0DFRjEpX1k2zUKdfOfbfNUbZVcszFzxSSUG5yJEulCA++Dt4Li
+Gk+jz4CYK7ibXmFH7on6OjcPAJYkRYiqE6cBkRLX8vn1U8xZ7rjWDFvzCxmGMZEl
+wTE0joYpIWOVXlcTcn9p9waGhxesflo4Ggj7EH7SskU/DLUIGaJroylkGUkPyzhx
+Ma1nv2S6k2mAsqt6TwlOnzq4ZxnQy+lQPU4aqqVR036TjU3I90+IQzmDjHsyfkiI
+6zEp93VXoIM/ol7npyrjnPum3AeIggXpYgypLuHgpWiS4O4Lo5kjh0nWnRN2bbdS
+JJD+I5Xl6Az9BLsQ9BHB
+=VYYl
+-----END PGP PUBLIC KEY BLOCK-----
+
[spark] branch master updated (6b7527e -> 67195d0)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 6b7527e  [SPARK-33398] Fix loading tree models prior to Spark 3.0
     add 67195d0  [SPARK-33950][SQL] Refresh cache in v1 `ALTER TABLE .. DROP PARTITION`

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/command/ddl.scala |  1 +
 .../command/v1/AlterTableDropPartitionSuite.scala    | 17 -
 2 files changed, 17 insertions(+), 1 deletion(-)
[spark] branch branch-3.0 updated: [SPARK-33398] Fix loading tree models prior to Spark 3.0
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9f1bf4e  [SPARK-33398] Fix loading tree models prior to Spark 3.0
9f1bf4e is described below

commit 9f1bf4e47c4978be7d55cfadb7da6b7863942bc8
Author: Ruifeng Zheng
AuthorDate: Sun Jan 3 11:52:46 2021 -0600

    [SPARK-33398] Fix loading tree models prior to Spark 3.0

    ### What changes were proposed in this pull request?

    In https://github.com/apache/spark/pull/21632/files#diff-0fdae8a6782091746ed20ea43f77b639f9c6a5f072dd2f600fcf9a7b37db4f47, a new field `rawCount` was added into `NodeData`, which causes that a tree model trained in 2.4 can not be loaded in 3.0/3.1/master; field `rawCount` is only used in training, and not used in `transform`/`predict`/`featureImportance`. So I just set it to -1L.

    ### Why are the changes needed?

    To support loading old tree models in 3.0/3.1/master.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    added testsuites

Closes #30889 from zhengruifeng/fix_tree_load.

Authored-by: Ruifeng Zheng
Signed-off-by: Sean Owen
(cherry picked from commit 6b7527e381591bcd51be205853aea3e349893139)
Signed-off-by: Sean Owen
---
 .../org/apache/spark/ml/tree/treeModels.scala      | 48 ++---
 .../ml-models/dtc-2.4.7/data/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-406c-894c-ca4eac67c690-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../resources/ml-models/dtc-2.4.7/data/_SUCCESS    | 0
 ...c890-406c-894c-ca4eac67c690-c000.snappy.parquet | Bin 0 -> 3242 bytes
 .../ml-models/dtc-2.4.7/metadata/._SUCCESS.crc     | Bin 0 -> 8 bytes
 .../ml-models/dtc-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/dtc-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/dtc-2.4.7/metadata/part-0            | 1 +
 .../ml-models/dtr-2.4.7/data/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4b3d-84af-d861adcb9ca8-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../resources/ml-models/dtr-2.4.7/data/_SUCCESS    | 0
 ...a437-4b3d-84af-d861adcb9ca8-c000.snappy.parquet | Bin 0 -> 3264 bytes
 .../ml-models/dtr-2.4.7/metadata/._SUCCESS.crc     | Bin 0 -> 8 bytes
 .../ml-models/dtr-2.4.7/metadata/.part-0.crc       | Bin 0 -> 12 bytes
 .../ml-models/dtr-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/dtr-2.4.7/metadata/part-0            | 1 +
 .../ml-models/gbtc-2.4.7/data/._SUCCESS.crc        | Bin 0 -> 8 bytes
 ...-41c7-91c0-6da8cc01fb43-c000.snappy.parquet.crc | Bin 0 -> 44 bytes
 .../resources/ml-models/gbtc-2.4.7/data/_SUCCESS   | 0
 ...c861-41c7-91c0-6da8cc01fb43-c000.snappy.parquet | Bin 0 -> 4542 bytes
 .../ml-models/gbtc-2.4.7/metadata/._SUCCESS.crc    | Bin 0 -> 8 bytes
 .../ml-models/gbtc-2.4.7/metadata/.part-0.crc      | Bin 0 -> 16 bytes
 .../ml-models/gbtc-2.4.7/metadata/_SUCCESS         | 0
 .../ml-models/gbtc-2.4.7/metadata/part-0           | 1 +
 .../gbtc-2.4.7/treesMetadata/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4a90-813c-ddc394101e21-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../ml-models/gbtc-2.4.7/treesMetadata/_SUCCESS    | 0
 ...31e3-4a90-813c-ddc394101e21-c000.snappy.parquet | Bin 0 -> 3075 bytes
 .../ml-models/gbtr-2.4.7/data/._SUCCESS.crc        | Bin 0 -> 8 bytes
 ...-4511-9aab-639288bfae6d-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/gbtr-2.4.7/data/_SUCCESS   | 0
 ...d346-4511-9aab-639288bfae6d-c000.snappy.parquet | Bin 0 -> 3740 bytes
 .../ml-models/gbtr-2.4.7/metadata/._SUCCESS.crc    | Bin 0 -> 8 bytes
 .../ml-models/gbtr-2.4.7/metadata/.part-0.crc      | Bin 0 -> 16 bytes
 .../ml-models/gbtr-2.4.7/metadata/_SUCCESS         | 0
 .../ml-models/gbtr-2.4.7/metadata/part-0           | 1 +
 .../gbtr-2.4.7/treesMetadata/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4fd8-ad9c-4be239c2215a-c000.snappy.parquet.crc | Bin 0 -> 32 bytes
 .../ml-models/gbtr-2.4.7/treesMetadata/_SUCCESS    | 0
 ...87fe-4fd8-ad9c-4be239c2215a-c000.snappy.parquet | Bin 0 -> 3038 bytes
 .../ml-models/rfc-2.4.7/data/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4485-b112-25b4b11c9009-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/rfc-2.4.7/data/_SUCCESS    | 0
 ...91f8-4485-b112-25b4b11c9009-c000.snappy.parquet | Bin 0 -> 3836 bytes
 .../ml-models/rfc-2.4.7/metadata/._SUCCESS.crc     | Bin 0 -> 8 bytes
 .../ml-models/rfc-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/rfc-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/rfc-2.4.7/metadata/part-0            | 1 +
 .../rfc-2.4.7/treesMetadata/._SUCCESS.crc          | Bin 0 -> 8 bytes
 ...-4c4e-a823-70c7afdcbdc5-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../ml-models/rfc-2.4.7/treesMetadata/_SUCCESS     | 0
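The tree-model fix above boils down to tolerating metadata rows written before `rawCount` existed and defaulting the field to -1, since it is only needed at training time. A minimal sketch of that idea (plain Python dicts stand in for the parquet `NodeData` rows; the function name is made up):

```python
def node_data_from_row(row: dict) -> dict:
    # Tolerate rows written by Spark 2.4, which lack the rawCount field
    # added in 3.0; default it to -1, since rawCount is only used during
    # training and never by transform/predict/featureImportance.
    return {
        "id": row["id"],
        "prediction": row["prediction"],
        "impurity": row["impurity"],
        "rawCount": row.get("rawCount", -1),
    }

old_row = {"id": 0, "prediction": 1.0, "impurity": 0.5}      # 2.4-era row, no rawCount
new_row = {"id": 0, "prediction": 1.0, "impurity": 0.5, "rawCount": 42}
print(node_data_from_row(old_row)["rawCount"])  # -1
print(node_data_from_row(new_row)["rawCount"])  # 42
```

Rows written by 3.0+ keep their real `rawCount`; only legacy rows receive the sentinel, which is why the change is safe for inference-time code paths.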
[spark] branch branch-3.1 updated: [SPARK-33398] Fix loading tree models prior to Spark 3.0
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1fa052f  [SPARK-33398] Fix loading tree models prior to Spark 3.0
1fa052f is described below

commit 1fa052fa589daa7d9e5218296cd2ef7143bae443
Author: Ruifeng Zheng
AuthorDate: Sun Jan 3 11:52:46 2021 -0600

    [SPARK-33398] Fix loading tree models prior to Spark 3.0

    ### What changes were proposed in this pull request?

    In https://github.com/apache/spark/pull/21632/files#diff-0fdae8a6782091746ed20ea43f77b639f9c6a5f072dd2f600fcf9a7b37db4f47, a new field `rawCount` was added into `NodeData`, which causes that a tree model trained in 2.4 can not be loaded in 3.0/3.1/master; field `rawCount` is only used in training, and not used in `transform`/`predict`/`featureImportance`. So I just set it to -1L.

    ### Why are the changes needed?

    To support loading old tree models in 3.0/3.1/master.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    added testsuites

Closes #30889 from zhengruifeng/fix_tree_load.

Authored-by: Ruifeng Zheng
Signed-off-by: Sean Owen
(cherry picked from commit 6b7527e381591bcd51be205853aea3e349893139)
Signed-off-by: Sean Owen
---
 .../org/apache/spark/ml/tree/treeModels.scala      | 48 ++---
 .../ml-models/dtc-2.4.7/data/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-406c-894c-ca4eac67c690-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../resources/ml-models/dtc-2.4.7/data/_SUCCESS    | 0
 ...c890-406c-894c-ca4eac67c690-c000.snappy.parquet | Bin 0 -> 3242 bytes
 .../ml-models/dtc-2.4.7/metadata/._SUCCESS.crc     | Bin 0 -> 8 bytes
 .../ml-models/dtc-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/dtc-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/dtc-2.4.7/metadata/part-0            | 1 +
 .../ml-models/dtr-2.4.7/data/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4b3d-84af-d861adcb9ca8-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../resources/ml-models/dtr-2.4.7/data/_SUCCESS    | 0
 ...a437-4b3d-84af-d861adcb9ca8-c000.snappy.parquet | Bin 0 -> 3264 bytes
 .../ml-models/dtr-2.4.7/metadata/._SUCCESS.crc     | Bin 0 -> 8 bytes
 .../ml-models/dtr-2.4.7/metadata/.part-0.crc       | Bin 0 -> 12 bytes
 .../ml-models/dtr-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/dtr-2.4.7/metadata/part-0            | 1 +
 .../ml-models/gbtc-2.4.7/data/._SUCCESS.crc        | Bin 0 -> 8 bytes
 ...-41c7-91c0-6da8cc01fb43-c000.snappy.parquet.crc | Bin 0 -> 44 bytes
 .../resources/ml-models/gbtc-2.4.7/data/_SUCCESS   | 0
 ...c861-41c7-91c0-6da8cc01fb43-c000.snappy.parquet | Bin 0 -> 4542 bytes
 .../ml-models/gbtc-2.4.7/metadata/._SUCCESS.crc    | Bin 0 -> 8 bytes
 .../ml-models/gbtc-2.4.7/metadata/.part-0.crc      | Bin 0 -> 16 bytes
 .../ml-models/gbtc-2.4.7/metadata/_SUCCESS         | 0
 .../ml-models/gbtc-2.4.7/metadata/part-0           | 1 +
 .../gbtc-2.4.7/treesMetadata/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4a90-813c-ddc394101e21-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../ml-models/gbtc-2.4.7/treesMetadata/_SUCCESS    | 0
 ...31e3-4a90-813c-ddc394101e21-c000.snappy.parquet | Bin 0 -> 3075 bytes
 .../ml-models/gbtr-2.4.7/data/._SUCCESS.crc        | Bin 0 -> 8 bytes
 ...-4511-9aab-639288bfae6d-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/gbtr-2.4.7/data/_SUCCESS   | 0
 ...d346-4511-9aab-639288bfae6d-c000.snappy.parquet | Bin 0 -> 3740 bytes
 .../ml-models/gbtr-2.4.7/metadata/._SUCCESS.crc    | Bin 0 -> 8 bytes
 .../ml-models/gbtr-2.4.7/metadata/.part-0.crc      | Bin 0 -> 16 bytes
 .../ml-models/gbtr-2.4.7/metadata/_SUCCESS         | 0
 .../ml-models/gbtr-2.4.7/metadata/part-0           | 1 +
 .../gbtr-2.4.7/treesMetadata/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4fd8-ad9c-4be239c2215a-c000.snappy.parquet.crc | Bin 0 -> 32 bytes
 .../ml-models/gbtr-2.4.7/treesMetadata/_SUCCESS    | 0
 ...87fe-4fd8-ad9c-4be239c2215a-c000.snappy.parquet | Bin 0 -> 3038 bytes
 .../ml-models/rfc-2.4.7/data/._SUCCESS.crc         | Bin 0 -> 8 bytes
 ...-4485-b112-25b4b11c9009-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/rfc-2.4.7/data/_SUCCESS    | 0
 ...91f8-4485-b112-25b4b11c9009-c000.snappy.parquet | Bin 0 -> 3836 bytes
 .../ml-models/rfc-2.4.7/metadata/._SUCCESS.crc     | Bin 0 -> 8 bytes
 .../ml-models/rfc-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/rfc-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/rfc-2.4.7/metadata/part-0            | 1 +
 .../rfc-2.4.7/treesMetadata/._SUCCESS.crc          | Bin 0 -> 8 bytes
 ...-4c4e-a823-70c7afdcbdc5-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../ml-models/rfc-2.4.7/treesMetadata/_SUCCESS     | 0
[spark] branch master updated (963c60f -> 6b7527e)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 963c60f  [SPARK-33955][SS] Add latest offsets to source progress
     add 6b7527e  [SPARK-33398] Fix loading tree models prior to Spark 3.0

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/ml/tree/treeModels.scala      | 48 ++---
 .../metadata => dtc-2.4.7/data}/._SUCCESS.crc      | Bin
 ...-406c-894c-ca4eac67c690-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../resources/ml-models/dtc-2.4.7/data/_SUCCESS    | 0
 ...c890-406c-894c-ca4eac67c690-c000.snappy.parquet | Bin 0 -> 3242 bytes
 .../metadata/._SUCCESS.crc                         | Bin
 .../ml-models/dtc-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/dtc-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/dtc-2.4.7/metadata/part-0            | 1 +
 .../metadata => dtr-2.4.7/data}/._SUCCESS.crc      | Bin
 ...-4b3d-84af-d861adcb9ca8-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../resources/ml-models/dtr-2.4.7/data/_SUCCESS    | 0
 ...a437-4b3d-84af-d861adcb9ca8-c000.snappy.parquet | Bin 0 -> 3264 bytes
 .../metadata/._SUCCESS.crc                         | Bin
 .../ml-models/dtr-2.4.7/metadata/.part-0.crc       | Bin 0 -> 12 bytes
 .../ml-models/dtr-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/dtr-2.4.7/metadata/part-0            | 1 +
 .../metadata => gbtc-2.4.7/data}/._SUCCESS.crc     | Bin
 ...-41c7-91c0-6da8cc01fb43-c000.snappy.parquet.crc | Bin 0 -> 44 bytes
 .../resources/ml-models/gbtc-2.4.7/data/_SUCCESS   | 0
 ...c861-41c7-91c0-6da8cc01fb43-c000.snappy.parquet | Bin 0 -> 4542 bytes
 .../metadata/._SUCCESS.crc                         | Bin
 .../ml-models/gbtc-2.4.7/metadata/.part-0.crc      | Bin 0 -> 16 bytes
 .../ml-models/gbtc-2.4.7/metadata/_SUCCESS         | 0
 .../ml-models/gbtc-2.4.7/metadata/part-0           | 1 +
 .../treesMetadata}/._SUCCESS.crc                   | Bin
 ...-4a90-813c-ddc394101e21-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../ml-models/gbtc-2.4.7/treesMetadata/_SUCCESS    | 0
 ...31e3-4a90-813c-ddc394101e21-c000.snappy.parquet | Bin 0 -> 3075 bytes
 .../metadata => gbtr-2.4.7/data}/._SUCCESS.crc     | Bin
 ...-4511-9aab-639288bfae6d-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/gbtr-2.4.7/data/_SUCCESS   | 0
 ...d346-4511-9aab-639288bfae6d-c000.snappy.parquet | Bin 0 -> 3740 bytes
 .../metadata/._SUCCESS.crc                         | Bin
 .../ml-models/gbtr-2.4.7/metadata/.part-0.crc      | Bin 0 -> 16 bytes
 .../ml-models/gbtr-2.4.7/metadata/_SUCCESS         | 0
 .../ml-models/gbtr-2.4.7/metadata/part-0           | 1 +
 .../treesMetadata}/._SUCCESS.crc                   | Bin
 ...-4fd8-ad9c-4be239c2215a-c000.snappy.parquet.crc | Bin 0 -> 32 bytes
 .../ml-models/gbtr-2.4.7/treesMetadata/_SUCCESS    | 0
 ...87fe-4fd8-ad9c-4be239c2215a-c000.snappy.parquet | Bin 0 -> 3038 bytes
 .../metadata => rfc-2.4.7/data}/._SUCCESS.crc      | Bin
 ...-4485-b112-25b4b11c9009-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/rfc-2.4.7/data/_SUCCESS    | 0
 ...91f8-4485-b112-25b4b11c9009-c000.snappy.parquet | Bin 0 -> 3836 bytes
 .../metadata/._SUCCESS.crc                         | Bin
 .../ml-models/rfc-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/rfc-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/rfc-2.4.7/metadata/part-0            | 1 +
 .../treesMetadata}/._SUCCESS.crc                   | Bin
 ...-4c4e-a823-70c7afdcbdc5-c000.snappy.parquet.crc | Bin 0 -> 36 bytes
 .../ml-models/rfc-2.4.7/treesMetadata/_SUCCESS     | 0
 ...b666-4c4e-a823-70c7afdcbdc5-c000.snappy.parquet | Bin 0 -> 3391 bytes
 .../metadata => rfr-2.4.7/data}/._SUCCESS.crc      | Bin
 ...-40fc-b681-981caaeca996-c000.snappy.parquet.crc | Bin 0 -> 40 bytes
 .../resources/ml-models/rfr-2.4.7/data/_SUCCESS    | 0
 ...6edb-40fc-b681-981caaeca996-c000.snappy.parquet | Bin 0 -> 3797 bytes
 .../metadata/._SUCCESS.crc                         | Bin
 .../ml-models/rfr-2.4.7/metadata/.part-0.crc       | Bin 0 -> 16 bytes
 .../ml-models/rfr-2.4.7/metadata/_SUCCESS          | 0
 .../ml-models/rfr-2.4.7/metadata/part-0            | 1 +
 .../treesMetadata}/._SUCCESS.crc                   | Bin
 ...-447a-9b86-d95edaabcde8-c000.snappy.parquet.crc | Bin 0 -> 32 bytes
 .../ml-models/rfr-2.4.7/treesMetadata/_SUCCESS     | 0
 ...d349-447a-9b86-d95edaabcde8-c000.snappy.parquet | Bin 0 -> 3055 bytes
 .../DecisionTreeClassifierSuite.scala              | 12 ++
 .../ml/classification/GBTClassifierSuite.scala     | 14 ++
 .../MultilayerPerceptronClassifierSuite.scala      |  2 +-
 .../RandomForestClassifierSuite.scala              | 16 ++-
 .../apache/spark/ml/feature/HashingTFSuite.scala   |  2 +-
 .../spark/ml/feature/StringIndexerSuite.scala      |  2 +-
[spark] branch master updated (cfd4a08 -> 963c60f)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

from cfd4a08  [SPARK-33962][SS] Fix incorrect min partition condition
add 963c60f  [SPARK-33955][SS] Add latest offsets to source progress

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 8 ++-
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 10 +
 project/MimaExcludes.scala                         | 5 -
 .../read/streaming/SupportsAdmissionControl.java   | 8 +++
 .../execution/streaming/MicroBatchExecution.scala  | 25 --
 .../sql/execution/streaming/ProgressReporter.scala | 13 +--
 .../sql/execution/streaming/StreamExecution.scala  | 9
 .../streaming/continuous/ContinuousExecution.scala | 2 +-
 .../org/apache/spark/sql/streaming/progress.scala  | 3 +++
 .../StreamingQueryStatusAndProgressSuite.scala     | 4
 10 files changed, 75 insertions(+), 12 deletions(-)

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-33962][SS] Fix incorrect min partition condition
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new cfd4a08  [SPARK-33962][SS] Fix incorrect min partition condition
cfd4a08 is described below

commit cfd4a083987f985da4659333c718561c19e0cbfe
Author: Liang-Chi Hsieh
AuthorDate: Sun Jan 3 01:29:12 2021 -0800

    [SPARK-33962][SS] Fix incorrect min partition condition

    ### What changes were proposed in this pull request?

    This patch fixes an incorrect condition used when comparing the offset range count against the min partition config.

    ### Why are the changes needed?

    When calculating offset ranges, we consider the `minPartitions` configuration. If `minPartitions` is not set, or is less than or equal to the number of given ranges, there are already enough partitions at Kafka, so we don't need to split offsets to satisfy the min partition requirement. But the current condition, `offsetRanges.size > minPartitions.get`, is not correct: `getRanges` currently splits offsets even when no split is necessary. Moreover, in the non-split case we can assign preferred executor locations and reuse the `KafkaConsumer`, so unnecessarily splitting an offset range loses the chance to reuse the `KafkaConsumer`.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Unit test. Manual test in a Spark cluster with Kafka.

    Closes #30994 from viirya/ss-minor4.
Authored-by: Liang-Chi Hsieh
Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/kafka010/KafkaOffsetRangeCalculator.scala |  2 +-
 .../sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala  | 14 ++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index f7183f7..1e9a62e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -46,7 +46,7 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     val offsetRanges = ranges.filter(_.size > 0)

     // If minPartitions not set or there are enough partitions to satisfy minPartitions
-    if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+    if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
       // Assign preferred executor locations to each range such that the same topic-partition is
       // preferentially read from the same executor and the KafkaConsumer can be reused.
       offsetRanges.map { range =>
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 5d010cd..751b877 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -71,6 +71,20 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
         KafkaOffsetRange(tp3, 1, 2, None)))
   }

+  testWithMinPartitions("N TopicPartitions to N offset ranges with executors", 3) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 2),
+          KafkaOffsetRange(tp3, 1, 2)),
+        Seq("exec1", "exec2", "exec3")) ===
+      Seq(
+        KafkaOffsetRange(tp1, 1, 2, Some("exec3")),
+        KafkaOffsetRange(tp2, 1, 2, Some("exec1")),
+        KafkaOffsetRange(tp3, 1, 2, Some("exec2"))))
+  }
+
   testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
     assert(
       calc.getRanges(

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
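The one-character fix above (`>` changed to `>=`) is easy to misread, so here is the decision it encodes as a minimal Python sketch. The helper `needs_split` is hypothetical and not part of Spark; it only restates the corrected comparison between the number of offset ranges and `minPartitions`:

```python
from typing import Optional

# Hypothetical helper (not Spark's Scala code): decide whether Kafka offset
# ranges must be split further to satisfy a minPartitions requirement.
def needs_split(num_ranges: int, min_partitions: Optional[int]) -> bool:
    # Corrected logic: split only when minPartitions is set AND there are
    # strictly fewer ranges than requested partitions. The pre-fix condition
    # treated num_ranges == min_partitions as "not enough" and forced a split.
    return min_partitions is not None and num_ranges < min_partitions

print(needs_split(3, 3))     # False: 3 ranges already satisfy minPartitions=3
print(needs_split(2, 4))     # True: must split to reach 4 partitions
print(needs_split(5, None))  # False: minPartitions unset, never split
```

When no split is needed, each range keeps its preferred executor location, which is what allows the same `KafkaConsumer` to be reused across batches.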