[GitHub] spark pull request #22077: [SPARK-25084][SQL][BACKPORT-2.3] "distribute by" ...
Github user LantaoJin closed the pull request at: https://github.com/apache/spark/pull/22077 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21561: [SPARK-24555][ML] logNumExamples in KMeans/BiKM/GMM/AFT/...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21561 **[Test build #94669 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94669/testReport)** for PR 21561 at commit [`fb3ff2b`](https://github.com/apache/spark/commit/fb3ff2b1f6cfa3936f2aa3901be844555d33887e).
[GitHub] spark issue #21561: [SPARK-24555][ML] logNumExamples in KMeans/BiKM/GMM/AFT/...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21561 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2115/ Test PASSed.
[GitHub] spark issue #21561: [SPARK-24555][ML] logNumExamples in KMeans/BiKM/GMM/AFT/...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21561 Merged build finished. Test PASSed.
[GitHub] spark pull request #21561: [SPARK-24555][ML] logNumExamples in KMeans/BiKM/G...
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/21561#discussion_r209496789

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -157,11 +157,15 @@ class NaiveBayes @Since("1.5.0") (
     instr.logNumFeatures(numFeatures)
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val countAccum = dataset.sparkSession.sparkContext.longAccumulator
     // Aggregates term frequencies per label.
     // TODO: Calling aggregateByKey and collect creates two stages, we can implement something
     // TODO: similar to reduceByKeyLocally to save one stage.
     val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
-      .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
+      .map { row =>
+        countAccum.add(1L)
--- End diff --

This should work correctly; however, to guarantee correctness, I updated the PR to compute the number without an Accumulator.
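The caveat behind that comment is a documented Spark behavior: accumulator updates performed inside a transformation such as `map` can be applied more than once if a task is retried or speculatively re-executed, whereas a count derived from the aggregation result itself is exact. A minimal plain-Python sketch of the failure mode (no Spark involved; `process_partition` is a hypothetical stand-in for a task):

```python
# Sketch: a side-effecting "accumulator" bumped inside a task is replayed
# when the task runs twice, while a count derived from the data is exact.
accumulator = 0

def process_partition(rows):
    """Simulates one Spark task attempt over a partition."""
    global accumulator
    for _ in rows:
        accumulator += 1      # side effect: repeated on every attempt
    return len(rows)          # value derived from the data itself

partition = [1, 2, 3]

first_attempt = process_partition(partition)   # original attempt
retry_attempt = process_partition(partition)   # retried/speculative attempt

# The framework keeps only one attempt's *result*...
exact_count = first_attempt
# ...but the accumulator observed both attempts.
print(accumulator)   # 6: double-counted
print(exact_count)   # 3: correct
```

This is why the author switched to computing the example count from the aggregation itself rather than from an accumulator.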
[GitHub] spark pull request #21933: [SPARK-24917][CORE] make chunk size configurable
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21933#discussion_r209496412

--- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -70,6 +70,8 @@ private[spark] class SerializerManager(
     private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
     // Whether to compress shuffle output temporarily spilled to disk
     private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
+    // Size of the chunks to be used in the ChunkedByteBuffer
+    private[this] val chunkSizeMb = conf.getSizeAsMb("spark.memory.serializer.chunkSize", "4m").toInt
--- End diff --

@vincent-grosbois WDYT about this?
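For context, `getSizeAsMb` accepts human-readable size strings such as `"4m"` and returns the value in megabytes. A rough Python sketch of that kind of parsing (illustrative only, not Spark's actual implementation; the bare-number fallback is an assumption of this sketch):

```python
def size_as_mb(value: str) -> int:
    """Parse a size string like '512k', '4m', or '1g' into whole megabytes.
    Toy model of a getSizeAsMb-style config helper."""
    units = {"k": 1 / 1024, "m": 1, "g": 1024}  # unit -> megabyte factor
    value = value.strip().lower()
    if value and value[-1] in units:
        return int(float(value[:-1]) * units[value[-1]])
    return int(value)  # bare number: treated as megabytes in this sketch

print(size_as_mb("4m"))   # 4
print(size_as_mb("1g"))   # 1024
```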
[GitHub] spark issue #22025: SPARK-25043: print master and appId from spark-sql on st...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22025 @abellina, what does the output look like? Mind putting it in the PR description, just to make the review easier, if that's not hard?
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22082 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94660/ Test PASSed.
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22082 Merged build finished. Test PASSed.
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22082 **[Test build #94660 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94660/testReport)** for PR 22082 at commit [`2666500`](https://github.com/apache/spark/commit/266650006ed1f5d19d6eaf24d7058ca341457039).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Merged build finished. Test FAILed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94663/ Test FAILed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22075 **[Test build #94663 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94663/testReport)** for PR 22075 at commit [`4b48a39`](https://github.com/apache/spark/commit/4b48a39238ab80f2bd1ebb36fd653ecc6495e492).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209491244

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     dataOut.writeInt(partitionIndex)
     // Python version of driver
     PythonRDD.writeUTF(pythonVer, dataOut)
+    // Init a GatewayServer to port current BarrierTaskContext to Python side.
+    val isBarrier = context.isInstanceOf[BarrierTaskContext]
+    val secret = if (isBarrier) {
+      Utils.createSecret(env.conf)
+    } else {
+      ""
+    }
+    val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+      Some(new GatewayServer.GatewayServerBuilder()
+        .entryPoint(context.asInstanceOf[BarrierTaskContext])
+        .authToken(secret)
+        .javaPort(0)
+        .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
+          secret)
+        .build())
--- End diff --

If this necessarily has to target 2.4.0, don't let me block it, since it's a new feature and we could probably consider another approach later; but if we can avoid it, I would suggest avoiding it for now. Let me try to track the design doc and the changes about this. I think I need more time to check why it ended up like this and whether there's another way.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209491060

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     dataOut.writeInt(partitionIndex)
     // Python version of driver
     PythonRDD.writeUTF(pythonVer, dataOut)
+    // Init a GatewayServer to port current BarrierTaskContext to Python side.
+    val isBarrier = context.isInstanceOf[BarrierTaskContext]
+    val secret = if (isBarrier) {
+      Utils.createSecret(env.conf)
+    } else {
+      ""
+    }
+    val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+      Some(new GatewayServer.GatewayServerBuilder()
+        .entryPoint(context.asInstanceOf[BarrierTaskContext])
+        .authToken(secret)
+        .javaPort(0)
+        .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
+          secret)
+        .build())
--- End diff --

Yea, I read and understood that this is only initialised when the context is a `BarrierTaskContext`, but it is super weird that we start another Java gateway here. If it's a hard requirement, then I suspect a design issue. Should this be targeted to 2.4.0, @mengxr?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209490553

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     dataOut.writeInt(partitionIndex)
     // Python version of driver
     PythonRDD.writeUTF(pythonVer, dataOut)
+    // Init a GatewayServer to port current BarrierTaskContext to Python side.
+    val isBarrier = context.isInstanceOf[BarrierTaskContext]
+    val secret = if (isBarrier) {
+      Utils.createSecret(env.conf)
+    } else {
+      ""
+    }
+    val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+      Some(new GatewayServer.GatewayServerBuilder()
+        .entryPoint(context.asInstanceOf[BarrierTaskContext])
+        .authToken(secret)
+        .javaPort(0)
+        .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
+          secret)
+        .build())
--- End diff --

We have to port `BarrierTaskContext` from the Java side to the Python side, otherwise there is no way to call `BarrierTaskContext.barrier()` from Python. Thus, of course, the JavaGateway is only initiated when the context is a `BarrierTaskContext`.
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209489692

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -90,6 +90,20 @@ trait CheckAnalysis extends PredicateHelper {
     u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
     case operator: LogicalPlan =>
+    // Check argument data types of higher-order functions downwards first.
+    // If the arguments of the higher-order functions are resolved but the type check fails,
+    // the argument functions will not get resolved, but we should report the argument type
--- End diff --

ah i see, makes sense.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Merged build finished. Test PASSed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94657/ Test PASSed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22075 **[Test build #94657 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94657/testReport)** for PR 22075 at commit [`388c2d3`](https://github.com/apache/spark/commit/388c2d3d812bf749ddf9de029432eab729bcc932).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `trait HigherOrderFunction extends Expression with ExpectsInputTypes `
  * `trait SimpleHigherOrderFunction extends HigherOrderFunction `
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209488846

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -90,6 +90,20 @@ trait CheckAnalysis extends PredicateHelper {
     u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
     case operator: LogicalPlan =>
+    // Check argument data types of higher-order functions downwards first.
+    // If the arguments of the higher-order functions are resolved but the type check fails,
+    // the argument functions will not get resolved, but we should report the argument type
--- End diff --

I'm worried that if we say only `functions`, it might be confusing whether the "function" means the higher-order function or the function passed as an argument. WDYT?
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209488516

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -90,6 +90,20 @@ trait CheckAnalysis extends PredicateHelper {
     u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
     case operator: LogicalPlan =>
+    // Check argument data types of higher-order functions downwards first.
+    // If the arguments of the higher-order functions are resolved but the type check fails,
+    // the argument functions will not get resolved, but we should report the argument type
--- End diff --

actually, I think it's clearer to say `functions`, instead of `argument functions`. Sorry for the back and forth.
[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17520 Build finished. Test FAILed.
[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17520 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94661/ Test FAILed.
[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17520 **[Test build #94661 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94661/testReport)** for PR 17520 at commit [`b923bd5`](https://github.com/apache/spark/commit/b923bd5b2c79a84ada834a32b756ad0da80f12c6).
* This patch **fails Spark unit tests**.
* This patch **does not merge cleanly**.
* This patch adds the following public classes _(experimental)_:
  * `case class SparkListenerBlockManagerAdded(`
  * `class StorageStatus(`
  * `public final class JavaStructuredSessionization `
  * ` public static class LineWithTimestamp implements Serializable `
  * ` public static class Event implements Serializable `
  * ` public static class SessionInfo implements Serializable `
  * ` public static class SessionUpdate implements Serializable `
  * `case class Event(sessionId: String, timestamp: Timestamp)`
  * `case class SessionInfo(`
  * `case class SessionUpdate(`
  * `class Correlation(object):`
  * `case class UnresolvedMapObjects(`
  * `case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil)`
  * `case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper `
  * ` * Helper case class to hold (plan, rowCount) pairs.`
[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22009 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2114/ Test PASSed.
[GitHub] spark issue #22086: [SPARK-25096][SQL] Loosen nullability if the cast is for...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22086 good catch! LGTM
[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22009 Merged build finished. Test PASSed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22075 **[Test build #94668 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94668/testReport)** for PR 22075 at commit [`deee1dc`](https://github.com/apache/spark/commit/deee1dcef6d7cbde516fa082e4210261ff89b8ff).
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2113/ Test PASSed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Merged build finished. Test PASSed.
[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22009 **[Test build #94667 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94667/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22009 retest this please
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22075 LGTM
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209487687

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -90,9 +90,10 @@ trait CheckAnalysis extends PredicateHelper {
     u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
     case operator: LogicalPlan =>
-    // Check argument data types of higher-order functions downwards first because function
-    // arguments of the higher-order functions might be unresolved due to the unresolved
-    // argument data types, otherwise always claims the function arguments are unresolved.
+    // Check argument data types of higher-order functions downwards first.
+    // If the arguments of the higher-order functions are resolved but the type check fails,
+    // the argument functions will not get resolved, but we should report the argument type
+    // check failure instead of claiming the function arguments are unresolved.
--- End diff --

"function arguments" -> "argument functions"?
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r209486478

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
       sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
     }
   } else {
-    rdd
+    part match {
+      case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

Ok. I see.
[GitHub] spark issue #21439: [SPARK-24391][SQL] Support arrays of any types by from_j...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21439 LGTM too.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r209486199

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -294,7 +296,12 @@ object ShuffleExchangeExec {
       sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
     }
   } else {
-    rdd
+    part match {
+      case partitioner: RangePartitioner[InternalRow @unchecked, _]
--- End diff --

yes, but

```
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
```

the parallelize function needs this ClassTag, so we must match it here. I tried to match RangePartitioner[_, _], but there is an error:

```
Error:(302, 37) No ClassTag available for _
Error occurred in an application involving default arguments.
sparkContext.parallelize(partitioner.getSampledArray.toSeq, rdd.getNumPartitions)
```
[GitHub] spark issue #21439: [SPARK-24391][SQL] Support arrays of any types by from_j...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21439 **[Test build #94666 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94666/testReport)** for PR 21439 at commit [`74a7799`](https://github.com/apache/spark/commit/74a779964b666b36b36a65b2cdd4b47d9df1e04c).
[GitHub] spark issue #21439: [SPARK-24391][SQL] Support arrays of any types by from_j...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21439 retest this please.
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22079 **[Test build #94665 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94665/testReport)** for PR 22079 at commit [`8d2d558`](https://github.com/apache/spark/commit/8d2d5585b2c2832cd4d88b3851607ce15180cca5).
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 Hmmm... I somehow managed to break the SparkR tests by fixing a comment. It seems to have auto-retried and broken the second time too.
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209485487

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -231,6 +231,15 @@ object TypeCoercion {
     })
   }
+  /**
+   * Similar to [[findTightestCommonType]] but with string promotion.
+   */
+  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
--- End diff --

I submitted a pr #22086.
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 retest this please
[GitHub] spark issue #22086: [SPARK-25096][SQL] Loosen nullability if the cast is for...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22086 cc @cloud-fan @gatorsmile
[GitHub] spark issue #22086: [SPARK-25096][SQL] Loosen nullability if the cast is for...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22086 **[Test build #94664 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94664/testReport)** for PR 22086 at commit [`f480354`](https://github.com/apache/spark/commit/f48035479c248b85519f1bbc7f5b7661c66c0e67).
[GitHub] spark issue #22086: [SPARK-25096][SQL] Loosen nullability if the cast is for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22086 Merged build finished. Test PASSed.
[GitHub] spark issue #22086: [SPARK-25096][SQL] Loosen nullability if the cast is for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22086 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2112/ Test PASSed.
[GitHub] spark pull request #22086: [SPARK-25096][SQL] Loosen nullability if the cast...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/22086

[SPARK-25096][SQL] Loosen nullability if the cast is force-nullable.

## What changes were proposed in this pull request?

In type coercion for complex types, if the found type is force-nullable to cast, we should loosen the nullability to be able to cast. Also, for a map key type, we can't use such a type at all.

## How was this patch tested?

Added some tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-25096/fix_type_coercion

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22086.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22086

commit f48035479c248b85519f1bbc7f5b7661c66c0e67
Author: Takuya UESHIN
Date: 2018-08-13T03:28:21Z

    Loosen nullability if the cast is force-nullable.
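The rule described in that PR — if the common type found during coercion requires a cast that can itself produce null, the result type's nullability must be loosened, and such a type cannot be used for map keys, which may never be null — can be sketched as a toy Python model (illustrative only; the type names and helpers here are hypothetical, not Spark's actual `TypeCoercion` code):

```python
from dataclasses import dataclass
from typing import Optional

# Toy model of an array type with element nullability.
@dataclass(frozen=True)
class ArrayType:
    element: str
    contains_null: bool

def widen_array(a: ArrayType, b: ArrayType, widened_element: str,
                cast_forces_null: bool) -> ArrayType:
    # Loosen nullability: nulls can appear if either input allows them,
    # or if the cast to the widened element type can itself yield null.
    return ArrayType(
        widened_element,
        a.contains_null or b.contains_null or cast_forces_null)

def widen_map_key(widened_key: str, cast_forces_null: bool) -> Optional[str]:
    # Map keys may never be null, so a force-nullable cast rules the type out.
    return None if cast_forces_null else widened_key

arr = widen_array(ArrayType("int", False), ArrayType("string", False),
                  widened_element="string", cast_forces_null=True)
print(arr.contains_null)                        # True: nullability loosened
print(widen_map_key("string", cast_forces_null=True))   # None: rejected
```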
[GitHub] spark pull request #22084: [SPARK-25026][BUILD] Binary releases should conta...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22084#discussion_r209483839

--- Diff: dev/make-distribution.sh ---
@@ -188,6 +190,23 @@ if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar
   cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/yarn"
 fi
+# Only copy external jars if built
+if [ -f "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar ]; then
+  cp "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/"
+fi
+if [ -f "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar ]; then
+  cp "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/"
--- End diff --

I didn't want to include kinesis or ganglia because those would entail including OSS with licenses we can't redistribute. The existence of the modules is already a gray area. Let me look into what is built into these JARs. Some things like kafka we don't want to include, but we do want to include kafka-client, yeah. We don't want to include Spark either, for example. Yeah, it's a reasonable argument that nobody would use these directly anyway; the same could be said of some other JARs in the distro. Really, the purpose of jars/ here is to support running in standalone mode. That is, most vendor distros would have spark-streaming-kafka on the classpath for you anyway, but standalone doesn't. Standalone still won't pick up these new JARs because they're in external/jars/, but at least they're there at all, to be moved into jars/ if you cared to, for local deployment.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22075 **[Test build #94663 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94663/testReport)** for PR 22075 at commit [`4b48a39`](https://github.com/apache/spark/commit/4b48a39238ab80f2bd1ebb36fd653ecc6495e492).
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Merged build finished. Test PASSed.
[GitHub] spark issue #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to arguments...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22075 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2111/ Test PASSed.
[GitHub] spark issue #21439: [SPARK-24391][SQL] Support arrays of any types by from_j...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21439 Merged build finished. Test FAILed.
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22079 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94656/ Test FAILed.
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22079 Merged build finished. Test FAILed.
[GitHub] spark issue #21439: [SPARK-24391][SQL] Support arrays of any types by from_j...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21439 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94659/ Test FAILed.
[GitHub] spark issue #21439: [SPARK-24391][SQL] Support arrays of any types by from_j...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21439 **[Test build #94659 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94659/testReport)** for PR 21439 at commit [`74a7799`](https://github.com/apache/spark/commit/74a779964b666b36b36a65b2cdd4b47d9df1e04c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22079 **[Test build #94656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94656/testReport)** for PR 22079 at commit [`8d2d558`](https://github.com/apache/spark/commit/8d2d5585b2c2832cd4d88b3851607ce15180cca5). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #22005: [SPARK-16817][CORE][WIP] Use Alluxio to improve stabilit...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22005 I believe this kind of PR requires an SPIP and community discussion first.
[GitHub] spark pull request #22036: [SPARK-25028][SQL] Avoid NPE when analyzing parti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22036#discussion_r209481430 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala --- @@ -204,6 +204,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } + test("SPARK-25028: column stats collection for null partitioning columns") { +val table = "analyze_partition_with_null" +withTempDir { dir => + withTable(table) { +sql(s""" + |CREATE TABLE $table (name string, value string) + |USING PARQUET + |PARTITIONED BY (name) + |LOCATION '${dir.toURI}'""".stripMargin) +val df = Seq(("a", null), ("b", null)).toDF("value", "name") --- End diff -- when creating the table, we can put partition column at the end, to avoid this confusion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22077: [SPARK-25084][SQL][BACKPORT-2.3] "distribute by" on mult...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22077 This is already merged, @LantaoJin please close this PR, thanks!
[GitHub] spark pull request #22084: [SPARK-25026][BUILD] Binary releases should conta...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/22084#discussion_r209481255 --- Diff: dev/make-distribution.sh --- @@ -188,6 +190,23 @@ if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/yarn" fi +# Only copy external jars if built +if [ -f "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" +fi +if [ -f "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" --- End diff -- When building such an external jar, an assembly jar will also be built accordingly, and the assembly jar can be used directly. The jars provided here are still not so useful because they lack third-party dependencies like Kafka, so I'm not sure whether this is more convenient than pulling from the Maven repo directly.
[GitHub] spark pull request #22036: [SPARK-25028][SQL] Avoid NPE when analyzing parti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22036#discussion_r209481344 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala --- @@ -140,7 +140,13 @@ case class AnalyzePartitionCommand( val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count() df.collect().map { r => - val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString) + val partitionColumnValues = partitionColumns.indices.map { i => +if (r.isNullAt(i)) { + ExternalCatalogUtils.DEFAULT_PARTITION_NAME --- End diff -- do we need to change the read path, i.e. where we use these statistics?
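For context on the fix being discussed: the sentinel Spark substitutes for a null partition value follows Hive's convention (`ExternalCatalogUtils.DEFAULT_PARTITION_NAME` is `__HIVE_DEFAULT_PARTITION__`). A minimal sketch of that substitution, with a hypothetical helper name standing in for the patched mapping:

```scala
// Null partition values cannot be rendered with .toString (that is the NPE
// this PR fixes), so substitute the sentinel name Hive uses for null
// partitions.
val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

// Hypothetical helper mirroring what the patched code does per column value.
def partitionValueString(v: Any): String =
  if (v == null) DefaultPartitionName else v.toString
```

cloud-fan's question about the read path then amounts to: does anything that consumes these statistics need to map the sentinel back to null?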
[GitHub] spark pull request #22084: [SPARK-25026][BUILD] Binary releases should conta...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/22084#discussion_r209480817 --- Diff: dev/make-distribution.sh --- @@ -188,6 +190,23 @@ if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/yarn" fi +# Only copy external jars if built +if [ -f "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" +fi +if [ -f "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" --- End diff -- Also what about kinesis?
[GitHub] spark pull request #22084: [SPARK-25026][BUILD] Binary releases should conta...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22084#discussion_r209480769 --- Diff: dev/make-distribution.sh --- @@ -188,6 +190,23 @@ if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/yarn" fi +# Only copy external jars if built +if [ -f "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" +fi +if [ -f "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" --- End diff -- I didn't because the jars/ dir doesn't contain assemblies... well, it's kind of the contents of an assembly. I assume the end-user here is someone who wants to just consume the integration, to add to an app. The assembly would pull back in all of Spark (?)
[GitHub] spark pull request #22084: [SPARK-25026][BUILD] Binary releases should conta...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/22084#discussion_r209480628 --- Diff: dev/make-distribution.sh --- @@ -188,6 +190,23 @@ if [ -f "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/yarn" fi +# Only copy external jars if built +if [ -f "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/avro/target/spark-avro_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" +fi +if [ -f "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar ]; then + cp "$SPARK_HOME"/external/kafka-0-10/target/spark-streaming-kafka-0-10_${SCALA_VERSION}-${VERSION}.jar "$DISTDIR/external/jars/" --- End diff -- Shall we also copy the assembly jars for Kafka and Flume?
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21369 LGTM except some code style issues. Thanks for improving the test!
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480323 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert(it.isInstanceOf[CompletionIterator[_, _]]) +// org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns +// an instance of an annonymous Iterator class. + +val underlyingMapRef = WeakReference(map.currentMap) + +{ + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) +} + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(map.numSpills == 0) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +// assert(map.currentMap == null) +eventually{ + System.gc() + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(tmpIsNull) +} + + +val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(!it.hasNext) +val keys = (first50Keys ++ next50Keys).sorted +assert(keys == (0 until 100)) + } + + test("drop all references to the underlying map once the iterator is exhausted") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val underlyingMapRef = WeakReference(map.currentMap) + +{ + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) +} + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) + + +val keys = it.map{ + case (k, vs) => +val sortedVs = vs.sorted +assert(sortedVs.seq == (0 until 10).map(10 * k + _)) +k +} +.toList +.sorted + +assert(it.isEmpty) +assert(keys == (0 until 100)) + +assert(map.numSpills == 0) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) + +eventually{ --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
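The leak-check pattern used in the test above can be sketched without Spark: hold only a `WeakReference`, bind the null-check to a local so an assert macro does not itself keep a strong reference, then request a GC after dropping the strong reference. GC behavior is JVM-dependent, so the post-GC clearing is best-effort — which is why the test wraps it in `eventually`.

```scala
import scala.ref.WeakReference

// Allocate some data and keep only a weak reference alongside it.
var data: Array[Long] = Array.fill(1 << 16)(0L)
val ref = WeakReference(data)

// Bind the check to a local first: passing ref.get directly into an assert
// macro can generate code that holds a strong reference to the referent
// (the trick used in the test above).
val liveBeforeDrop = ref.get.isDefined

data = null   // drop the only strong reference
System.gc()   // request a GC; the JVM may or may not collect immediately
val liveAfterGc = ref.get.isDefined  // often false by now, but not guaranteed
```

Because `liveAfterGc` is not guaranteed by the JVM, a real test should poll it under `eventually` rather than assert it once.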
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480313 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { + CompletionIterator[(K, C), SpillableIterator](this, this.destroy ) --- End diff -- nit: no space before `)`
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480264 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert(it.isInstanceOf[CompletionIterator[_, _]]) +// org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns +// an instance of an annonymous Iterator class. + +val underlyingMapRef = WeakReference(map.currentMap) + +{ + // direct asserts introduced some macro generated code that held a reference to the map + val tmpIsNull = null == underlyingMapRef.get.orNull + assert(!tmpIsNull) +} + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(map.numSpills == 0) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. +// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +// assert(map.currentMap == null) +eventually{ --- End diff -- nit: add a space `eventually {` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480296 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { --- End diff -- nit: no space before `:`
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480190 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -568,13 +573,14 @@ class ExternalAppendOnlyMap[K, V, C]( context.addTaskCompletionListener[Unit](context => cleanup()) } - private[this] class SpillableIterator(var upstream: Iterator[(K, C)]) + /** + * Exposed for testing + */ + private[collection] class SpillableIterator(var upstream: Iterator[(K, C)]) --- End diff -- ditto
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209480155 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -114,7 +117,10 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() - @volatile private var readingIterator: SpillableIterator = null + /** + * Exposed for testing + */ + @volatile private[collection] var readingIterator: SpillableIterator = null --- End diff -- This is not exposed in the test.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209479417 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) --- End diff -- So MissingOffsetException is used only to signal that some offsets may be missing due to control messages, and nothing else; the higher-level function (i.e. `get`) just handles it by resetting the fetched offsets. Why not let this `fetchData` method handle the situation instead of creating a new exception just for this?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209479551 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala --- @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum --- End diff -- Good catch. That would never have occurred to me!
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209475048 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { --- End diff -- It's hard to understand this condition because it's hard to understand what offsetAfterPoll means. Does it refer to the offset that will be fetched next by the KafkaConsumer?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209476712 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -347,9 +391,12 @@ private[kafka010] case class InternalKafkaConsumer( } private def poll(pollTimeoutMs: Long): Unit = { +offsetBeforePoll = consumer.position(topicPartition) --- End diff -- This variable `offsetBeforePoll` seems to be used only to identify whether data was actually fetched in a poll, and nothing else. Rather than define another var (there are already many, and they are confusing), why not just return a boolean from poll that indicates whether the poll moved anything?
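The suggestion could look like the sketch below, where `PositionedConsumer` and `StubConsumer` are hypothetical stand-ins (not Spark's or Kafka's actual API): `pollMoved` reports whether the consumer's position advanced, replacing the `offsetBeforePoll`/`offsetAfterPoll` pair of mutable fields.

```scala
// Hypothetical minimal interface standing in for the Kafka consumer wrapper.
trait PositionedConsumer {
  def position: Long
  def poll(timeoutMs: Long): Unit
}

// Returns true iff the poll advanced the consumer's position, so callers
// need not track offsetBeforePoll / offsetAfterPoll themselves.
def pollMoved(c: PositionedConsumer, timeoutMs: Long): Boolean = {
  val before = c.position
  c.poll(timeoutMs)
  c.position != before
}

// A stub consumer for demonstration: each poll advances position by `step`.
class StubConsumer(var pos: Long, step: Long) extends PositionedConsumer {
  def position: Long = pos
  def poll(timeoutMs: Long): Unit = pos += step
}
```

A timed-out poll that fetched nothing leaves the position unchanged, so `pollMoved` returning false would distinguish the timeout case from the control-message case discussed above.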
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209473392 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread +/** + * An exception to indicate there is a missing offset in the records returned by Kafka consumer. + * This means it's either a transaction (commit or abort) marker, or an aborted message if + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are + * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. + */ +private[kafka010] class MissingOffsetException( --- End diff -- nit: Is this meant to be used outside this KafkaDataConsumer class? If not, then maybe make it an inner class to KafkaDataConsumer.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209477156 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) + } else { +seek(offset) +poll(pollTimeoutMs) + } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position + // will be changed and we can use it to determine this case. val range = getAvailableOffsetRange() if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else { + } else if (offsetBeforePoll == offsetAfterPoll) { --- End diff -- Just to be clear, can this happen only if there is a timeout?? 
And if so, why not push this condition and exception into the poll() method, thus simplifying this method?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209476548 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) + } else { +seek(offset) +poll(pollTimeoutMs) + } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position --- End diff -- date => data --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209478033 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) + } else { +seek(offset) +poll(pollTimeoutMs) + } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position + // will be changed and we can use it to determine this case. 
val range = getAvailableOffsetRange() if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else { + } else if (offsetBeforePoll == offsetAfterPoll) { throw new TimeoutException( s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + } else { +assert(offset <= offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") +throw new MissingOffsetException(offset, offsetAfterPoll) } } else { --- End diff -- Let's remove this `else` to reduce the condition nesting: the previous `if` statement always ends in an exception, so the `else` is unnecessary.
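For readers following the thread, the empty-fetch handling discussed above can be sketched as a simple classifier. This is an illustrative Python sketch only (the real code is Scala in `KafkaDataConsumer`); the function and parameter names here are simplified stand-ins, not the actual API:

```python
# Illustrative sketch of the three failure cases when a poll yields no
# usable record. Mirrors the diff's logic; names are simplified stand-ins.
def classify_empty_fetch(offset, earliest, latest,
                         offset_before_poll, offset_after_poll):
    if offset < earliest or offset >= latest:
        # Kafka returned nothing because the offset is out of range.
        return "OffsetOutOfRangeException"
    elif offset_before_poll == offset_after_poll:
        # The consumer position did not move: nothing arrived before timeout.
        return "TimeoutException"
    else:
        # The position moved past records that were all invalid (e.g.
        # transaction markers), so offsets [offset, offset_after_poll)
        # are missing and should be skipped.
        assert offset <= offset_after_poll
        return "MissingOffsetException"
```

Since every branch ends by raising (here, returning) an exception, the trailing `else` around the success path in the Scala code adds nothing, which is the nesting reduction tdas asks for.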
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209473432 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread +/** + * An exception to indicate there is a missing offset in the records returned by Kafka consumer. + * This means it's either a transaction (commit or abort) marker, or an aborted message if + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are + * missing. In other words, `nextOffsetToFetch` indicates the next offset to fetch. + */ +private[kafka010] class MissingOffsetException( +val offset: Long, --- End diff -- Maybe rename `offset` to something like `missingOffset`. It's weird to have a generically named field "offset" next to a specifically named field "nextOffsetToFetch".
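The half-open range semantics in the doc comment above can be made concrete with a small sketch (Python for illustration; the actual class is Scala, and `skipped_offsets` is a hypothetical helper, not part of the PR):

```python
class MissingOffsetException(Exception):
    """Offsets in [missing_offset, next_offset_to_fetch) are absent from the
    records Kafka returned, e.g. transaction (commit/abort) markers or
    aborted messages under isolation.level=read_committed."""

    def __init__(self, missing_offset, next_offset_to_fetch):
        super().__init__(
            f"offsets [{missing_offset}, {next_offset_to_fetch}) are missing")
        self.missing_offset = missing_offset
        self.next_offset_to_fetch = next_offset_to_fetch

    def skipped_offsets(self):
        # The range is half-open: next_offset_to_fetch itself is NOT missing;
        # it is the next offset the caller should fetch.
        return list(range(self.missing_offset, self.next_offset_to_fetch))
```

Note how a field named `missing_offset`, as the review suggests, reads more clearly next to `next_offset_to_fetch` than a generic `offset` would.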
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209474755 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, --- End diff -- Update the docs of this method to say that it can throw MissingOffsetException and what that means?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209473316 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -95,6 +106,10 @@ private[kafka010] case class InternalKafkaConsumer( ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET + @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET --- End diff -- Can you add some docs to explain what these two vars signify and why they are needed?
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21859 **[Test build #94662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94662/testReport)** for PR 21859 at commit [`46bab16`](https://github.com/apache/spark/commit/46bab165af68c1ef2dd1fc57e7f27f5d27c72015).
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21859 retest this please.
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209478694 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -1852,6 +1852,11 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { df.selectExpr("transform(i, x -> x)") } assert(ex2.getMessage.contains("data type mismatch: argument 1 requires array type")) + +val ex3 = intercept[AnalysisException] { + df.selectExpr("transform(a, x -> x)") --- End diff -- The previous behavior was the same. I just added a check that the behavior is as expected.
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user sddyljsx commented on the issue: https://github.com/apache/spark/pull/21859 Please help retest it. @kiszk @viirya
[GitHub] spark issue #21732: [SPARK-24762][SQL] Enable Option of Product encoders
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21732 Can you give a description of the new encoding rules regarding `Option`? e.g. ``` Option[Int] in normal encoder -> a nullable int column Option[Int] in agg encoder -> a nullable int column Option[Int] in tupled encoder -> a nullable int column Option[Product] in normal encoder => a nullable struct column Option[Product] in agg encoder => ? ... ```
[GitHub] spark issue #22066: [SPARK-25084][SQL] "distribute by" on multiple columns (...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22066 @cloud-fan, yeah, I will include it in 2.3.2.
[GitHub] spark issue #22084: [SPARK-25026][BUILD] Binary releases should contain some...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22084 +1
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21939 @BryanCutler, not a big deal, but why don't we link the Arrow JIRA for "Allow for adding BinaryType support" too?
[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17520 **[Test build #94661 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94661/testReport)** for PR 17520 at commit [`b923bd5`](https://github.com/apache/spark/commit/b923bd5b2c79a84ada834a32b756ad0da80f12c6).
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209476191 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- Wait... are you sure you want another Java gateway for each worker? (Or did I rush reading this code?) Can you elaborate on why this is needed? We should avoid this unless it's absolutely necessary.
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22082 **[Test build #94660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94660/testReport)** for PR 22082 at commit [`2666500`](https://github.com/apache/spark/commit/266650006ed1f5d19d6eaf24d7058ca341457039).
[GitHub] spark pull request #22007: [SPARK-25033] Bump Apache commons.{httpclient, ht...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22007
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22082 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2110/ Test PASSed.
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22082 Merged build finished. Test PASSed.
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209475881 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -1852,6 +1852,11 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { df.selectExpr("transform(i, x -> x)") } assert(ex2.getMessage.contains("data type mismatch: argument 1 requires array type")) + +val ex3 = intercept[AnalysisException] { + df.selectExpr("transform(a, x -> x)") --- End diff -- what's the previous behavior?