[GitHub] spark issue #19287: [SPARK-22074][Core] Task killed by other attempt task sh...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19287 **[Test build #81969 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81969/testReport)** for PR 19287 at commit [`a28daa2`](https://github.com/apache/spark/commit/a28daa2c3283ad31659f840e6d401ab48a42ad88). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17819 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17819 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81963/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17819 **[Test build #81963 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81963/testReport)** for PR 17819 at commit [`60d3ba1`](https://github.com/apache/spark/commit/60d3ba1ec3c2c9d767e8f63f43aadda2de4c4e28). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/19287 [SPARK-22074][Core] Task killed by other attempt task should not be resubmitted ## What changes were proposed in this pull request? As described in detail in [SPARK-22074](https://issues.apache.org/jira/browse/SPARK-22074), unnecessary resubmission can cause a stage to hang in the current release versions. This patch adds a new var in TaskInfo to mark whether the task was killed by another attempt. ## How was this patch tested? Added a new UT `[SPARK-22074] Task killed by other attempt task should not be resubmitted` in TaskSetManagerSuite. The UT recreates the scenario from the JIRA description; it fails without the changes in this PR and passes with them. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-22074 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19287.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19287 commit a28daa2c3283ad31659f840e6d401ab48a42ad88 Author: Yuanjian Li Date: 2017-09-20T05:35:35Z [SPARK-22074][Core] Task killed by other attempt task should not be resubmitted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
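A minimal sketch of the behavior this PR describes, assuming a boolean flag in the task bookkeeping; the names `TaskAttempt` and `killedByOtherAttempt` are illustrative and not necessarily the identifiers used in the actual patch:

```scala
// When tasks are lost (e.g. with their executor), only resubmit those that were
// not killed because another attempt of the same task had already succeeded.
case class TaskAttempt(index: Int, killedByOtherAttempt: Boolean)

object ResubmitSketch extends App {
  def tasksToResubmit(lost: Seq[TaskAttempt]): Seq[TaskAttempt] =
    lost.filterNot(_.killedByOtherAttempt)

  val lost = Seq(TaskAttempt(0, killedByOtherAttempt = true), TaskAttempt(1, killedByOtherAttempt = false))
  println(tasksToResubmit(lost).map(_.index)) // List(1): task 0 is not resubmitted
}
```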
[GitHub] spark issue #18193: [SPARK-15616] [SQL] CatalogRelation should fallback to H...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18193 **[Test build #81968 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81968/testReport)** for PR 18193 at commit [`72f63fa`](https://github.com/apache/spark/commit/72f63fafbe23d31e831edfc9ca832c5d62aefde1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/18193#discussion_r139879632 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -140,6 +141,62 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } /** + * + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions( +session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned => + val predicates = splitConjunctivePredicates(condition) + val normalizedFilters = predicates.map { e => +e transform { + case a: AttributeReference => +a.withName(relation.output.find(_.semanticEquals(a)).get.name) +} + } + val partitionSet = AttributeSet(relation.partitionCols) + val pruningPredicates = normalizedFilters.filter { predicate => +!predicate.references.isEmpty && + predicate.references.subsetOf(partitionSet) + } + if (pruningPredicates.nonEmpty && session.sessionState.conf.fallBackToHdfsForStatsEnabled && +session.sessionState.conf.metastorePartitionPruning) { +val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + pruningPredicates, + session.sessionState.conf.sessionLocalTimeZone) +val sizeInBytes = try { + prunedPartitions.map { part => +val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +if (totalSize.isDefined && totalSize.get > 0L) { --- End diff -- @cenyuhai Yes,I think what you said is right.Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19211 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19229 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19229 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81964/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19211 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19229 **[Test build #81964 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81964/testReport)** for PR 19229 at commit [`2086900`](https://github.com/apache/spark/commit/2086900168bb1595de7e68efdebfecc9fb38314b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81961/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19285 **[Test build #81961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81961/testReport)** for PR 19285 at commit [`d2b8ccd`](https://github.com/apache/spark/commit/d2b8ccd500f0076d281cc402b4a9633fb38562ed). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19286#discussion_r139878221 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -747,6 +747,19 @@ class JDBCSuite extends SparkFunSuite assert(agg.getCatalystType(0, "", 1, null) === Some(LongType)) assert(agg.getCatalystType(1, "", 1, null) === Some(StringType)) assert(agg.isCascadingTruncateTable() === Some(true)) + +val agg2 = new AggregatedDialect(List(new JdbcDialect { + override def canHandle(url: String) : Boolean = url.startsWith("jdbc:h2:") + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = +if (sqlType % 2 == 0) { + Some(LongType) +} else { + None +} + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) +}, testH2Dialect)) +assert(agg2.isCascadingTruncateTable() === None) --- End diff -- Let us add test cases to enumerate all the combinations of `None`, `true` and `false` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19277: [SPARK-22058][CORE]the BufferedInputStream will n...
Github user zuotingbing commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139878136 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging { // Since we sanitize the app ID to not include periods, it is safe to split on it val logName = log.getName.stripSuffix(IN_PROGRESS) val codecName: Option[String] = logName.split("\\.").tail.lastOption -val codec = codecName.map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) -} - + try { + val codec = codecName.map { c => --- End diff -- Because CompressionCodec.createCodec can throw an exception with "Codec [$codecName] is not available", so without the try the BufferedInputStream opened above would never be closed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
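The close-on-failure pattern being added here, shown as a self-contained sketch (this is not the EventLoggingListener code itself; `openWrapped` and its `wrap` parameter are made-up names for illustration):

```scala
import java.io.{BufferedInputStream, FileInputStream, InputStream}

// If constructing the wrapping (e.g. decompressing) stream throws, close the
// underlying stream before rethrowing so it is not leaked.
def openWrapped(path: String)(wrap: InputStream => InputStream): InputStream = {
  val in = new BufferedInputStream(new FileInputStream(path))
  try {
    wrap(in) // may throw, e.g. when the requested codec is not available
  } catch {
    case e: Throwable =>
      in.close()
      throw e
  }
}
```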
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139878051 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!endpointsExpression.foldable) { + TypeCheckFailure("The intervals provided must be constant literals") +} else if (endpoints.length < 2) { + TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals") +} else { + TypeCheckSuccess +} + } + + // N endpoints construct N-1 intervals, creating a HLLPP for each interval + private lazy val hllppArray = { +val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1) +for (i <- array.indices) { + array(i) = new
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139877802 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => --- End diff -- The type of `child` can be `TimestampType` and `DateType`, but endpoints can only be `ArrayType` of `NumericType`. It may not be convenient to set up numeric endpoints for a timestamp or date child column. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139877613 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -39,20 +41,13 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { - - self => +private[spark] class LiveListenerBus(conf: SparkConf) { --- End diff -- oh sorry I missed that, `LiveListenerBus` doesn't extend `SparkListenerBus` anymore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calculate i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19281 **[Test build #81966 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81966/testReport)** for PR 19281 at commit [`7893935`](https://github.com/apache/spark/commit/7893935d694663316575a7485ea833fab998d108). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18805 **[Test build #81967 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81967/testReport)** for PR 18805 at commit [`029a753`](https://github.com/apache/spark/commit/029a753ad4be6881c4e1721eecdfaad0f8b158bd). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18659 ok let's work around the type casting issue and discuss arrow upgrading later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139877421 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!endpointsExpression.foldable) { + TypeCheckFailure("The intervals provided must be constant literals") --- End diff -- Should we also check element type of `endpointsExpression`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calculate i...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19281 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calculate i...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/19281 bq. This is not accurate. It depends on the length of required ordering and the length of child ordering. You are right. I did it right in the code but made a mistake in the description here. Thanks for pointing it out, @gatorsmile! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139876729 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. --- End diff -- `An array expression with `NumericType` element to construct the intervals Must be foldable.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139876548 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, --- End diff -- `endpointsExpression` is foldable? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calculate i...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19281 > If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering This is not accurate. It depends on the length of required ordering and the length of child ordering. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
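A simplified, self-contained model of the prefix check under discussion; it drops `SortOrder` and `sameOrderExpressions` entirely and only illustrates why the relative lengths of the two orderings matter:

```scala
// The required ordering is satisfied only if it is no longer than the actual
// ordering and matches it element by element, i.e. it is a prefix of it.
def orderingSatisfies[A](actual: Seq[A], required: Seq[A]): Boolean =
  required.length <= actual.length &&
    required.zip(actual).forall { case (r, a) => a == r }

assert(orderingSatisfies(Seq("a", "b", "c"), Seq("a", "b")))   // required is a prefix of actual
assert(!orderingSatisfies(Seq("a", "b"), Seq("a", "b", "c")))  // required is longer than actual
```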
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19229 @viirya Thanks very much! Although the perf gap exists (when numCols is large), it won't block this PR. I will create a JIRA to track this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15544 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81960/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15544 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15544 **[Test build #81960 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81960/testReport)** for PR 15544 at commit [`cadb31b`](https://github.com/apache/spark/commit/cadb31b4fe9d2a5a061e35d37629dd5bd7b3b96e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadingTrunc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19286 **[Test build #81965 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81965/testReport)** for PR 19286 at commit [`803b196`](https://github.com/apache/spark/commit/803b1961a34d4d9f4c8ebcbe5544dd23fbaa720a). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadingTrunc...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19286 cc @gatorsmile @huaxingao --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139875187 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -101,14 +101,15 @@ case class SortMergeJoinExec( s"${getClass.getSimpleName} should not take $x as the JoinType") } - /** - * For SMJ, child's output must have been sorted on key or expressions with the same order as - * key, so we can get ordering for key from child's output ordering. - */ private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder]) : Seq[SortOrder] = { -keys.zip(childOutputOrdering).map { case (key, childOrder) => - SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key) +val requiredOrdering = requiredOrders(keys) +if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) { --- End diff -- Please add a comment here to explain the reason. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139875333 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala --- @@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext { } } + def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = { --- End diff -- Please add function comments to explain what it does --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadi...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/19286 [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadingTruncateTable() method in AggregatedDialect ## What changes were proposed in this pull request? The current implementation of `isCascadingTruncateTable` in `AggregatedDialect` is wrong. When no dialect claims cascading but at least one dialect's cascading-truncate behavior is unknown, we should return unknown instead of false. ## How was this patch tested? Added a test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-21338-followup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19286.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19286 commit 803b1961a34d4d9f4c8ebcbe5544dd23fbaa720a Author: Liang-Chi Hsieh Date: 2017-09-20T04:52:08Z Fix isCascadingTruncateTable for AggregatedDialect. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
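A minimal sketch of the semantics described above, with each dialect's answer modeled as a plain `Option[Boolean]`; this is illustrative rather than the actual `AggregatedDialect` code, and the `Some(true)` branch is an assumption about the unchanged positive case:

```scala
// Some(true)  -> at least one dialect reports cascading truncate
// None        -> no dialect reports true, but at least one cannot tell
// Some(false) -> every dialect explicitly reports no cascading truncate
def aggregatedIsCascadingTruncateTable(answers: Seq[Option[Boolean]]): Option[Boolean] =
  if (answers.contains(Some(true))) Some(true)
  else if (answers.contains(None)) None
  else Some(false)

assert(aggregatedIsCascadingTruncateTable(Seq(Some(false), None)).isEmpty)               // unknown, not false
assert(aggregatedIsCascadingTruncateTable(Seq(Some(false), Some(false))) == Some(false))
```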
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139873950 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ object SparkPlan { private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) + + /** + * Returns if the actual ordering satisfies the required ordering. + * + * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix. + */ + def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = { --- End diff -- Let us first move it to `SortOrder`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139873547 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ object SparkPlan { private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) + + /** + * Returns if the actual ordering satisfies the required ordering. + * + * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix. + */ + def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = { +if (required.nonEmpty) { + if (required.length > actual.length) { +false + } else { +required.zip(actual).forall { + case (requiredOrder, actualOrder) => +actualOrder.satisfies(requiredOrder) +} + } +} else { + true +} --- End diff -- Please simplify it to
```Scala
if (required.isEmpty) {
  true
} else if (required.length > actual.length) {
  false
} else {
  required.zip(actual).forall {
    case (requiredOrder, actualOrder) =>
      actualOrder.satisfies(requiredOrder)
  }
}
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19246: [SPARK-22025][PySpark] Speeding up fromInternal for Stru...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19246 I'd close this PR if there is no objection, @maver1ck, and if I didn't miss anything. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19246: [SPARK-22025][PySpark] Speeding up fromInternal f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19246#discussion_r139874732 --- Diff: python/pyspark/sql/types.py --- @@ -410,6 +410,24 @@ def __init__(self, name, dataType, nullable=True, metadata=None): self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} +self.needConversion = dataType.needConversion +self.toInternal = dataType.toInternal +self.fromInternal = dataType.fromInternal + +def __getstate__(self): +"""Return state values to be pickled.""" +return (self.name, self.dataType, self.nullable, self.metadata) + +def __setstate__(self, state): +"""Restore state from the unpickled state values.""" +name, dataType, nullable, metadata = state +self.name = name +self.dataType = dataType +self.nullable = nullable +self.metadata = metadata +self.needConversion = dataType.needConversion --- End diff -- At the current master, https://github.com/apache/spark/commit/718bbc939037929ef5b8f4b4fe10aadfbab4408e **Before** ``` ./build/mvn -DskipTests -Psparkr -Phive -Phive-thriftserver clean package find . -name "*.pyc" -exec rm -f {} \; sync && sudo purge ./bin/pyspark --conf spark.python.profile=true ``` ```python df = spark.range(1000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache() df.count() df.rdd.map(lambda x: x).count() sc.show_profiles() ``` ``` Profile of
[GitHub] spark pull request #18754: [WIP][SPARK-21552][SQL] Add DecimalType support t...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18754#discussion_r139872489 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala --- @@ -224,6 +226,25 @@ private[arrow] class DoubleWriter(val valueVector: NullableFloat8Vector) extends } } +private[arrow] class DecimalWriter( +val valueVector: NullableDecimalVector, +precision: Int, +scale: Int) extends ArrowFieldWriter { + + override def valueMutator: NullableDecimalVector#Mutator = valueVector.getMutator() + + override def setNull(): Unit = { +valueMutator.setNull(count) + } + + override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { +valueMutator.setIndexDefined(count) +val decimal = input.getDecimal(ordinal, precision, scale) +decimal.changePrecision(precision, scale) +DecimalUtility.writeBigDecimalToArrowBuf(decimal.toJavaBigDecimal, valueVector.getBuffer, count) --- End diff -- I've confirmed it fixes the failure. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19246: [SPARK-22025][PySpark] Speeding up fromInternal f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19246#discussion_r139872337 --- Diff: python/pyspark/sql/types.py --- @@ -410,6 +410,24 @@ def __init__(self, name, dataType, nullable=True, metadata=None): self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} +self.needConversion = dataType.needConversion +self.toInternal = dataType.toInternal +self.fromInternal = dataType.fromInternal + +def __getstate__(self): +"""Return state values to be pickled.""" +return (self.name, self.dataType, self.nullable, self.metadata) + +def __setstate__(self, state): +"""Restore state from the unpickled state values.""" +name, dataType, nullable, metadata = state +self.name = name +self.dataType = dataType +self.nullable = nullable +self.metadata = metadata +self.needConversion = dataType.needConversion --- End diff -- Ah, I think @maver1ck did this with https://github.com/apache/spark/pull/19249 I guess. I ran the same code in the PR description. Will double check and be back with some commands I ran. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19229 Benchmark results (average time in seconds over 10 runs):

numColumns | RDD Mean | RDD Median | DataFrame Mean | DataFrame Median
-- | -- | -- | -- | --
1 | 0.1642173481 | 0.199774305 | 0.4260180671006 | 0.2025112919
10 | 0.3713707549 | 0.529010404301 | 0.4362606840996 | 0.4952177834006
100 | 6.8645389335 | 8.83867498289 | 1.6645560224 | 2.921396424397

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19246: [SPARK-22025][PySpark] Speeding up fromInternal f...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19246#discussion_r139871986 --- Diff: python/pyspark/sql/types.py --- @@ -410,6 +410,24 @@ def __init__(self, name, dataType, nullable=True, metadata=None): self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} +self.needConversion = dataType.needConversion +self.toInternal = dataType.toInternal +self.fromInternal = dataType.fromInternal + +def __getstate__(self): +"""Return state values to be pickled.""" +return (self.name, self.dataType, self.nullable, self.metadata) + +def __setstate__(self, state): +"""Restore state from the unpickled state values.""" +name, dataType, nullable, metadata = state +self.name = name +self.dataType = dataType +self.nullable = nullable +self.metadata = metadata +self.needConversion = dataType.needConversion --- End diff -- What's the difference between your benchmark and @maver1ck's? Why are the improvements so different? If the improvement is not significant, we shouldn't take this patch, because it confuses developers, as you said. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19229 I ran the test codes to benchmark RDD-version and DataFrame version with this `ImputerModel` change:

    import org.apache.spark.ml.feature._
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types._
    import spark.implicits._
    import scala.util.Random

    def genData(): DataFrame = {
      val seed = 123l
      val random = new Random(seed)
      val n = 1
      val m = 100
      val rows = sc.parallelize(1 to n).map(i => Row(Array.fill(m)(random.nextDouble): _*))
      val struct = new StructType(Array.range(0, m, 1).map(i => StructField(s"c$i", DoubleType, true)))
      val df = spark.createDataFrame(rows, struct)
      df.cache()
      df.count()
      df
    }

    for (strategy <- Seq("mean", "median"); k <- Seq(1, 10, 100)) {
      val imputer = new Imputer().setStrategy(strategy).setInputCols(Array.range(0, k, 1).map(i => s"c$i")).setOutputCols(Array.range(0, k, 1).map(i => s"o$i"))
      var duration = 0.0
      for (i <- 0 until 10) {
        val df = genData()
        val start = System.nanoTime()
        val model = imputer.fit(df)
        val end = System.nanoTime()
        val df2 = genData()
        val start2 = System.nanoTime()
        model.transform(df2).count
        val end2 = System.nanoTime()
        duration += ((end - start) + (end2 - start2)) / 1e9
      }
      println((strategy, k, duration / 10))
    }

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19243: [SPARK-21780][R] Simpler Dataset.sample API in R
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19243#discussion_r139868488 --- Diff: R/pkg/R/DataFrame.R --- @@ -998,33 +998,44 @@ setMethod("unique", #' sparkR.session() #' path <- "path/to/file.json" #' df <- read.json(path) +#' collect(sample(df, fraction = 0.5)) #' collect(sample(df, FALSE, 0.5)) -#' collect(sample(df, TRUE, 0.5)) +#' collect(sample(df, TRUE, 0.5, seed = 3)) #'} #' @note sample since 1.4.0 setMethod("sample", - signature(x = "SparkDataFrame", withReplacement = "logical", -fraction = "numeric"), - function(x, withReplacement, fraction, seed) { -if (fraction < 0.0) stop(cat("Negative fraction value:", fraction)) + signature(x = "SparkDataFrame"), + function(x, withReplacement = FALSE, fraction, seed) { +if (!is.numeric(fraction)) { + stop(paste("fraction must be numeric; however, got", class(fraction))) +} +if (!is.logical(withReplacement)) { + stop(paste("withReplacement must be logical; however, got", class(withReplacement))) +} + if (!missing(seed)) { + if (is.null(seed) || is.na(seed)) { +stop(paste("seed must not be NULL or NA; however, got", class(seed))) --- End diff -- this actually doesn't work for NA ``` > class(NULL) [1] "NULL" > class(NA) [1] "logical" ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19276: [SPARK-22049][DOCS] Confusing behavior of from_ut...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19276#discussion_r139868265 --- Diff: R/pkg/R/functions.R --- @@ -2286,8 +2286,8 @@ setMethod("next_day", signature(y = "Column", x = "character"), }) #' @details -#' \code{to_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day -#' in the given timezone, returns another timestamp that corresponds to the same time of day in UTC. +#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in the given +# time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' would yield '2017-07-14 01:40:00.0'. --- End diff -- same here with `#` -> `#'` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19277: [SPARK-22058][CORE]the BufferedInputStream will n...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139867429 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging { // Since we sanitize the app ID to not include periods, it is safe to split on it val logName = log.getName.stripSuffix(IN_PROGRESS) val codecName: Option[String] = logName.split("\\.").tail.lastOption -val codec = codecName.map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) -} - + try { + val codec = codecName.map { c => +codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) + } codec.map(_.compressedInputStream(in)).getOrElse(in) --- End diff -- Is it better to move this line ` val in = new BufferedInputStream(fs.open(log))` to here to solve your problem? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
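A sketch of the reordering being suggested (illustrative only; `resolveCodec` is a stand-in for the codec lookup and not a real Spark API): perform the step that can throw before the file is opened, so there is no stream to leak.

```scala
import java.io.{BufferedInputStream, FileInputStream, InputStream}

def openEventLog(path: String, resolveCodec: () => Option[InputStream => InputStream]): InputStream = {
  val codec = resolveCodec()                              // may throw; no stream is open yet
  val in = new BufferedInputStream(new FileInputStream(path))
  codec.map(wrap => wrap(in)).getOrElse(in)               // wrap with decompression if a codec was found
}
```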
[GitHub] spark pull request #19277: [SPARK-22058][CORE]the BufferedInputStream will n...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r139867369 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging { // Since we sanitize the app ID to not include periods, it is safe to split on it val logName = log.getName.stripSuffix(IN_PROGRESS) val codecName: Option[String] = logName.split("\\.").tail.lastOption -val codec = codecName.map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) -} - + try { + val codec = codecName.map { c => --- End diff -- Why would this throw an exception here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19229 **[Test build #81964 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81964/testReport)** for PR 19229 at commit [`2086900`](https://github.com/apache/spark/commit/2086900168bb1595de7e68efdebfecc9fb38314b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/17819 @MLnick Yeah, you're right, only moving `setXXX` to the concrete class also works fine. The root cause is the `setXXX` return type. But I think the multi/single logic can be merged, because a single input column is a special case of multiple input columns. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17819 **[Test build #81963 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81963/testReport)** for PR 17819 at commit [`60d3ba1`](https://github.com/apache/spark/commit/60d3ba1ec3c2c9d767e8f63f43aadda2de4c4e28). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19285 **[Test build #81961 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81961/testReport)** for PR 19285 at commit [`d2b8ccd`](https://github.com/apache/spark/commit/d2b8ccd500f0076d281cc402b4a9633fb38562ed). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory usage to...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19160 **[Test build #81962 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81962/testReport)** for PR 19160 at commit [`9a080dd`](https://github.com/apache/spark/commit/9a080dd1f13dc9c6ad7d171f4cbe9f077be867eb). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19285 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81958/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19278 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81959/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #81958 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81958/testReport)** for PR 19271 at commit [`8e13959`](https://github.com/apache/spark/commit/8e139594ce164a18fa54df680afbc213691da081). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19278 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19278 **[Test build #81959 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81959/testReport)** for PR 19278 at commit [`cc30578`](https://github.com/apache/spark/commit/cc30578d2d25d3345821793fcf2ce030cf991a92). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18648: [SPARK-21428] Turn IsolatedClientLoader off while using ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18648 The description is not clear; I only understood it after diving into the code changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81957/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #81957 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81957/testReport)** for PR 19271 at commit [`8a551d7`](https://github.com/apache/spark/commit/8a551d7fc045fd633cc15c14b27133a13eacb727). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17819 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81956/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17819 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17819 **[Test build #81956 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81956/testReport)** for PR 17819 at commit [`92ef9bd`](https://github.com/apache/spark/commit/92ef9bde1e048eef7e3b530286723cad5773debc). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18685: [SPARK-21439][PySpark] Support for ABCMeta in PySpark
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18685 This should be good to go as soon as updated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19234: [SPARK-22010][PySpark] Change fromInternal method of Tim...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19234 Hm, BTW, do we handle https://github.com/python/cpython/blob/018d353c1c8c87767d2335cd884017c2ce12e045/Lib/datetime.py#L1443-L1455:

```python
if tz is None:
    # As of version 2015f max fold in IANA database is
    # 23 hours at 1969-09-30 13:00:00 in Kwajalein.
    # Let's probe 24 hours in the past to detect a transition:
    max_fold_seconds = 24 * 3600
    y, m, d, hh, mm, ss = converter(t - max_fold_seconds)[:6]
    probe1 = cls(y, m, d, hh, mm, ss, us, tz)
    trans = result - probe1 - timedelta(0, max_fold_seconds)
    if trans.days < 0:
        y, m, d, hh, mm, ss = converter(t + trans // timedelta(0, 1))[:6]
        probe2 = cls(y, m, d, hh, mm, ss, us, tz)
        if probe2 == result:
            result._fold = 1
```

Or do you think it can be ignored, since it was fixed quite recently (in 3.6.x) and is a corner case compared with the improvement? Looking at the performance improvement in the PR description, it sounds pretty trivial. If it is safe to go, I am okay with it, but if we miss anything, I doubt it is worth fixing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139861892 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java --- @@ -117,6 +118,12 @@ public void fetchBlocks( } } + @Override + public MetricSet shuffleMetrics() { +checkInit(); --- End diff -- Seems it should be, but it looks like we never touched this issue before. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15544 **[Test build #81960 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81960/testReport)** for PR 15544 at commit [`cadb31b`](https://github.com/apache/spark/commit/cadb31b4fe9d2a5a061e35d37629dd5bd7b3b96e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user cenyuhai commented on a diff in the pull request: https://github.com/apache/spark/pull/18193#discussion_r139861601 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -140,6 +141,62 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } /** + * + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions( +session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { +case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned => + val predicates = splitConjunctivePredicates(condition) + val normalizedFilters = predicates.map { e => +e transform { + case a: AttributeReference => +a.withName(relation.output.find(_.semanticEquals(a)).get.name) +} + } + val partitionSet = AttributeSet(relation.partitionCols) + val pruningPredicates = normalizedFilters.filter { predicate => +!predicate.references.isEmpty && + predicate.references.subsetOf(partitionSet) + } + if (pruningPredicates.nonEmpty && session.sessionState.conf.fallBackToHdfsForStatsEnabled && +session.sessionState.conf.metastorePartitionPruning) { +val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + pruningPredicates, + session.sessionState.conf.sessionLocalTimeZone) +val sizeInBytes = try { + prunedPartitions.map { part => +val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) +val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) +if (totalSize.isDefined && totalSize.get > 0L) { --- End diff -- I think we should use rawDataSize first, because a 1MB ORC file can be equivalent to a 5MB text file... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
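A rough sketch of the fallback ordering being argued for: prefer the format-independent rawDataSize over the compression-dependent totalSize, and only then fall back to a default. This is illustrative only; the names mirror the diff above, and the `defaultSizeInBytes` fallback is an assumption rather than what the PR actually does.

```scala
// Sketch: rawDataSize (uncompressed logical size) first, then totalSize
// (on-disk, compressed size), then the configured default as a last resort.
val sizeInBytes = prunedPartitions.map { part =>
  val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
  val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
  rawDataSize.filter(_ > 0L)
    .orElse(totalSize.filter(_ > 0L))
    .getOrElse(session.sessionState.conf.defaultSizeInBytes)
}.sum
```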
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139861341 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -115,6 +115,7 @@ private[spark] class Executor( if (!isLocal) { env.metricsSystem.registerSource(executorSource) env.blockManager.initialize(conf.getAppId) +env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) --- End diff -- Sure, let me update it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139861303 --- Diff: core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala --- @@ -19,19 +19,19 @@ package org.apache.spark.deploy import javax.annotation.concurrent.ThreadSafe -import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.{MetricRegistry, MetricSet} import org.apache.spark.metrics.source.Source -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler /** * Provides metrics source for external shuffle service */ @ThreadSafe -private class ExternalShuffleServiceSource -(blockHandler: ExternalShuffleBlockHandler) extends Source { +private class ExternalShuffleServiceSource extends Source { --- End diff -- I'm not sure; maybe it should be a part of the regular shuffle server. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19246: [SPARK-22025][PySpark] Speeding up fromInternal f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19246#discussion_r139861057 --- Diff: python/pyspark/sql/types.py --- @@ -410,6 +410,24 @@ def __init__(self, name, dataType, nullable=True, metadata=None): self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} +self.needConversion = dataType.needConversion +self.toInternal = dataType.toInternal +self.fromInternal = dataType.fromInternal + +def __getstate__(self): +"""Return state values to be pickled.""" +return (self.name, self.dataType, self.nullable, self.metadata) + +def __setstate__(self, state): +"""Restore state from the unpickled state values.""" +name, dataType, nullable, metadata = state +self.name = name +self.dataType = dataType +self.nullable = nullable +self.metadata = metadata +self.needConversion = dataType.needConversion --- End diff -- WDYT @ueshin? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19246: [SPARK-22025][PySpark] Speeding up fromInternal f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19246#discussion_r139861001 --- Diff: python/pyspark/sql/types.py --- @@ -410,6 +410,24 @@ def __init__(self, name, dataType, nullable=True, metadata=None): self.dataType = dataType self.nullable = nullable self.metadata = metadata or {} +self.needConversion = dataType.needConversion +self.toInternal = dataType.toInternal +self.fromInternal = dataType.fromInternal + +def __getstate__(self): +"""Return state values to be pickled.""" +return (self.name, self.dataType, self.nullable, self.metadata) + +def __setstate__(self, state): +"""Restore state from the unpickled state values.""" +name, dataType, nullable, metadata = state +self.name = name +self.dataType = dataType +self.nullable = nullable +self.metadata = metadata +self.needConversion = dataType.needConversion --- End diff -- My main concern is that it replaces the reference of the bound method from `StructType` with a method bound to another instance. I don't really like monkey patching in Python because, IMHO, it confuses other developers, which might slow down the improvement iteration from the community. I just ran the Python profiler on top of the current master with this patch: **Before** ``` Profile of
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139860969 --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala --- @@ -18,11 +18,14 @@ package org.apache.spark.network.netty import java.nio.ByteBuffer +import java.util --- End diff -- OK, I will change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139860924 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -248,6 +251,16 @@ private[spark] class BlockManager( logInfo(s"Initialized BlockManager: $blockManagerId") } + def shuffleMetricsSource: Source = { +import BlockManager._ + +if (externalShuffleServiceEnabled) { + new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics()) +} else { + new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics()) --- End diff -- For the external shuffle service, we only have the transport client on the executor side, while for `NettyBlockTransfer` each executor acts as both a transport client and a server. That's why I explicitly distinguish these two cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
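A compact sketch of the naming split described here. It is illustrative only: the `ShuffleMetricsSource` constructor shape is a guess based on how the diff calls it, not necessarily its exact definition.

```scala
import com.codahale.metrics.{MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.Source

// Illustrative wrapper: expose the netty MetricSet under a source name that
// records which transport produced it.
private class ShuffleMetricsSource(
    override val sourceName: String,
    metricSet: MetricSet) extends Source {
  override val metricRegistry = new MetricRegistry
  metricRegistry.registerAll(metricSet)
}

def shuffleMetricsSource: Source = {
  // External shuffle service: executors only hold the transport client.
  // NettyBlockTransferService: each executor is both client and server.
  val name = if (externalShuffleServiceEnabled) "ExternalShuffle" else "NettyBlockTransfer"
  new ShuffleMetricsSource(name, shuffleClient.shuffleMetrics())
}
```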
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81955/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81955/testReport)** for PR 18659 at commit [`f451d65`](https://github.com/apache/spark/commit/f451d652a2656113cce1f0763e17c73ed2d03c44). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19259: [BACKPORT-2.1][SPARK-19318][SPARK-22041][SQL] Doc...
Github user wangyum closed the pull request at: https://github.com/apache/spark/pull/19259 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19284: [SPARK-22067][SQL] ArrowWriter should use positio...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19284 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19284 Thanks! merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r139859176 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateArray, Literal, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite { + + test("fails analysis if parameters are invalid") { +def assertEqual[T](left: T, right: T): Unit = { --- End diff -- oh, I'll remove this. Previously I put some other logic here, but we should remove it now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81954/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81954 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81954/testReport)** for PR 19196 at commit [`4eb7f4f`](https://github.com/apache/spark/commit/4eb7f4f6df3f2d5ae831bf15715651598e52c3e6). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class TestStatefulOperator(` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19278 **[Test build #81959 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81959/testReport)** for PR 19278 at commit [`cc30578`](https://github.com/apache/spark/commit/cc30578d2d25d3345821793fcf2ce030cf991a92). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19208: [SPARK-21087] [ML] CrossValidator, TrainValidationSplit ...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19208 @smurching I will update this PR after #19278 is merged, because this PR now depends on that one. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19284 LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19278 @BryanCutler The reason I added `skipParams` is that, if we don't use `DefaultParamsReader.getAndSetParams`, we have to hardcode all the params, which is very troublesome. And every time we add new params, we also have to update the hardcoded list, which is easy to forget and causes bugs. But if we use `DefaultParamsReader.getAndSetParams` without supporting `skipParams`, the `estimatorParamMaps` param is difficult to handle. So this design is a balance of these concerns, although it makes the code a little weird. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
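A simplified sketch of the trade-off described above, assuming a `getAndSetParams` variant that accepts a skip list. The types here are deliberately simplified and are not the real Spark API; it is only meant to show the shape of the design.

```scala
// Illustrative only: restore every persisted param generically, except those
// the caller reconstructs by hand.
def getAndSetParams(
    setParam: (String, String) => Unit,     // applies a decoded param to the instance
    persistedParams: Map[String, String],   // paramName -> encoded value
    skipParams: List[String] = Nil): Unit = {
  persistedParams.foreach { case (name, value) =>
    // Params such as estimatorParamMaps cannot be decoded generically, so the
    // loader lists them in skipParams and sets them explicitly afterwards.
    if (!skipParams.contains(name)) setParam(name, value)
  }
}
```

With this shape, params added later are picked up automatically, while `estimatorParamMaps` stays in `skipParams` and is set explicitly after the estimator and evaluator have been loaded.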
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139856165 --- Diff: python/pyspark/sql/functions.py --- @@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ -def _udf(f, returnType=StringType()): -udf_obj = UserDefinedFunction(f, returnType) -return udf_obj._wrapped() +return _create_udf(f, returnType=returnType, vectorized=False) -# decorator @udf, @udf() or @udf(dataType()) -if f is None or isinstance(f, (str, DataType)): -# If DataType has been passed as a positional argument -# for decorator use it as a returnType -return_type = f or returnType -return functools.partial(_udf, returnType=return_type) + +@since(2.3) +def pandas_udf(f=None, returnType=StringType()): +""" +Creates a :class:`Column` expression representing a user defined function (UDF) that accepts +`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. + +:param f: python function if used as a standalone function +:param returnType: a :class:`pyspark.sql.types.DataType` object + +# TODO: doctest +""" +import inspect +# If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder +if inspect.getargspec(f).keywords is None: +return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True) --- End diff -- Ah, I was thinking that disallowing 0-parameter pandas_udf could be an option ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139855188 --- Diff: python/pyspark/serializers.py --- @@ -199,6 +211,46 @@ def __repr__(self): return "ArrowSerializer" +class ArrowPandasSerializer(ArrowSerializer): +""" +Serializes Pandas.Series as Arrow data. +""" + +def __init__(self): +super(ArrowPandasSerializer, self).__init__() + +def dumps(self, series): +""" +Make an ArrowRecordBatch from a Pandas Series and serialize. Input is a single series or +a list of series accompanied by an optional pyarrow type to coerce the data to. +""" +import pyarrow as pa +# Make input conform to [(series1, type1), (series2, type2), ...] +if not isinstance(series, (list, tuple)) or \ +(len(series) == 2 and isinstance(series[1], pa.DataType)): +series = [series] +series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series] --- End diff -- This might not be a big deal, but I usually use a generator if it is iterated once and then discarded. It should consume less memory too, since a list comprehension is evaluated eagerly first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #81958 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81958/testReport)** for PR 19271 at commit [`8e13959`](https://github.com/apache/spark/commit/8e139594ce164a18fa54df680afbc213691da081). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139855087 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -303,16 +304,16 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] { val (metadata, estimator, evaluator, estimatorParamMaps) = ValidatorParams.loadImpl(path, sc, className) val numFolds = (metadata.params \ "numFolds").extract[Int] - val seed = (metadata.params \ "seed").extract[Long] val bestModelPath = new Path(path, "bestModel").toString val bestModel = DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc) val avgMetrics = (metadata.metadata \ "avgMetrics").extract[Seq[Double]].toArray + val model = new CrossValidatorModel(metadata.uid, bestModel, avgMetrics) model.set(model.estimator, estimator) .set(model.evaluator, evaluator) .set(model.estimatorParamMaps, estimatorParamMaps) -.set(model.numFolds, numFolds) -.set(model.seed, seed) + DefaultParamsReader.getAndSetParams(model, metadata, skipParams = List("estimatorParamMaps")) --- End diff -- No, because `estimator` and `evaluator` aren't included in the metadata. You can check `saveImpl`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139854984 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -212,14 +213,12 @@ object CrossValidator extends MLReadable[CrossValidator] { val (metadata, estimator, evaluator, estimatorParamMaps) = ValidatorParams.loadImpl(path, sc, className) - val numFolds = (metadata.params \ "numFolds").extract[Int] - val seed = (metadata.params \ "seed").extract[Long] - new CrossValidator(metadata.uid) + val cv = new CrossValidator(metadata.uid) .setEstimator(estimator) .setEvaluator(evaluator) .setEstimatorParamMaps(estimatorParamMaps) -.setNumFolds(numFolds) -.setSeed(seed) + DefaultParamsReader.getAndSetParams(cv, metadata, skipParams = List("estimatorParamMaps")) --- End diff -- No, because `estimator` and `evaluator` aren't included in the metadata. You can check the `saveImpl`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org