[GitHub] spark pull request #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader...
Github user xuanyuanking closed the pull request at: https://github.com/apache/spark/pull/19745 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader for Sp...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19745 No problem.
[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21839 Thanks for reviewing.
[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21839 @gatorsmile @maropu This is the follow-up PR for #21447; please have a look when you have time. Thanks.
[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/21839 [SPARK-24339][SQL] Prunes the unused columns from child of ScriptTransformation ## What changes were proposed in this pull request? Modify the ColumnPruning rule to add a Project between ScriptTransformation and its child. This can reduce scan time, especially when the table has many columns. ## How was this patch tested? Added a unit test in ColumnPruningSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-24339 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21839.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21839 commit 68869d9fb8cc0e2686fb1e01f4d4c3e7ac8a52fe Author: Yuanjian Li Date: 2018-07-22T14:46:31Z Prunes the unused columns from child of ScriptTransformation
[GitHub] spark issue #21447: [SPARK-24339][SQL]Add project for transform/map/reduce s...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21447 I'd like to submit a follow-up PR; cc @gatorsmile @maropu for review.
[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21533 @jiangxb1987 Thanks for the reminder; rephrased.
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202703948 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) +.get("col_name").getOrElse("").equals("Last Access")) + // Check whether lastAcessField key is exist + assert(!lastAcessField.isEmpty) + val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => ((r --- End diff -- Where is the val `validLastAcessFieldValue` used?
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202701870 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) --- End diff -- nit: lastAccessField
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202703129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala --- @@ -114,7 +114,10 @@ case class CatalogTablePartition( map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}") } map.put("Created Time", new Date(createTime).toString) -map.put("Last Access", new Date(lastAccessTime).toString) +val lastAccess = { + if (-1 == lastAccessTime) "UNKNOWN" else new Date(lastAccessTime).toString +} +map.put("Last Access", lastAccess) --- End diff -- No need for the val lastAccess? ``` map.put("Last Access", if (-1 == lastAccessTime) "UNKNOWN" else new Date(lastAccessTime).toString) ```
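The inlined form suggested above can be tried outside Spark. The following is a minimal sketch, assuming that -1 is the sentinel for "no access time recorded" as in the diff; `LastAccessFormat` and `format` are hypothetical names for illustration and are not part of `CatalogTablePartition`.

```scala
import java.util.Date

object LastAccessFormat {
  // Assumption taken from the diff: -1 means the access time was never recorded.
  // Any other value is rendered with java.util.Date, as the original code did.
  def format(lastAccessTime: Long): String =
    if (lastAccessTime == -1L) "UNKNOWN" else new Date(lastAccessTime).toString
}
```

With this shape the expression can be passed straight to `map.put("Last Access", ...)` without the intermediate val, which is what the review comment asks for.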
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202704259 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) +.get("col_name").getOrElse("").equals("Last Access")) + // Check whether lastAcessField key is exist + assert(!lastAcessField.isEmpty) + val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => ((r +.getValuesMap(Seq("data_type")) +.get("data_type").contains(new Date(-1).toString + assert(lastAcessField.size!=0) --- End diff -- Code style nit: add a space before and after `!=`.
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202701770 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) +.get("col_name").getOrElse("").equals("Last Access")) + // Check whether lastAcessField key is exist + assert(!lastAcessField.isEmpty) --- End diff -- Use `lastAccessField.nonEmpty` instead.
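Taken together, the nits on this test (the `lastAccessField` spelling, `nonEmpty`, and actually using the filtered value) point at a check along the following lines. This is only a sketch over plain Scala collections: `DescRow` is a hypothetical stand-in for a row of `DESC FORMATTED` output, and nothing here touches Spark's `Row` API.

```scala
import java.util.Date

object LastAccessCheck {
  // Hypothetical stand-in for one (col_name, data_type) row of DESC FORMATTED output.
  final case class DescRow(colName: String, dataType: String)

  // Rows whose col_name is "Last Access".
  def lastAccessRows(rows: Seq[DescRow]): Seq[DescRow] =
    rows.filter(_.colName == "Last Access")

  // True when a "Last Access" row exists and none of them shows the
  // -1 sentinel rendered as a date.
  def hasValidLastAccess(rows: Seq[DescRow]): Boolean = {
    val lastAccessField = lastAccessRows(rows)
    val invalid = new Date(-1L).toString
    lastAccessField.nonEmpty && !lastAccessField.exists(_.dataType.contains(invalid))
  }
}
```

Folding both conditions into one boolean avoids the unused `validLastAcessFieldValue` that the earlier comment asks about.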
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 @gatorsmile @maropu Please have a look at this; resolving the conflicts took me some time. Also cc @jiangxb1987, because the conflicts were mainly with #20696. Thanks for the work in #20696: the latest version of this PR no longer needs the extra work for changing partition column comments.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r204805474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) -// Throw an AnalysisException if the column name/dataType is changed. +// Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { throw new AnalysisException( "ALTER TABLE CHANGE COLUMN is not supported for changing column " + s"'${originColumn.name}' with type '${originColumn.dataType}' to " + s"'${newColumn.name}' with type '${newColumn.dataType}'") } +val typeChanged = originColumn.dataType != newColumn.dataType +val partitionColumnChanged = table.partitionColumnNames.contains(originColumn.name) + +// Throw an AnalysisException if the type of partition column is changed. +if (typeChanged && partitionColumnChanged) { --- End diff -- This just adds a check for when a user changes the type of a partition column.
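The guard described above can be sketched in isolation. The code below is an assumption-laden illustration, not the PR's implementation: `Column` is a hypothetical simplified model (not Spark's `StructField`), and the error is returned as a `Left` rather than thrown as an `AnalysisException`.

```scala
object ChangeColumnCheck {
  // Hypothetical, simplified column model; data types are plain strings here.
  final case class Column(name: String, dataType: String)

  // Rejects a type change on a partition column, mirroring the
  // `typeChanged && partitionColumnChanged` condition in the diff.
  def validate(
      origin: Column,
      updated: Column,
      partitionColumnNames: Set[String]): Either[String, Column] = {
    val typeChanged = origin.dataType != updated.dataType
    val isPartitionColumn = partitionColumnNames.contains(origin.name)
    if (typeChanged && isPartitionColumn) {
      Left(s"Cannot change type of partition column '${origin.name}' " +
        s"from '${origin.dataType}' to '${updated.dataType}'")
    } else {
      Right(updated)
    }
  }
}
```

A comment-only change on a partition column still passes, since `typeChanged` is false in that case.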
[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22282#discussion_r214075761 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java --- @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray( return result; } - public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) { -return fromPrimitiveArray(null, offset, length, elementSize); - } - - public static boolean shouldUseGenericArrayData(int elementSize, int length) { --- End diff -- I think `shouldUseGenericArrayData` is still used in generated code; see: https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633
[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22252#discussion_r213708177 --- Diff: docs/configuration.md --- @@ -152,7 +152,7 @@ of the most common options to set are: spark.driver.memory 1g -Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in MiB +Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in bytes --- End diff -- I think I see the point you want to report, @ivoson. IIUC, this is a bug in the code rather than in the doc: `spark.driver.memory=1024` should also be interpreted in MiB. This may change the existing behavior, so perhaps we can announce it in the migration guide? cc @srowen @HyukjinKwon.
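To make the unit question concrete, here is a small sketch of a parser in which a bare number such as `1024` defaults to MiB, which is the behavior the comment above argues for. `MemoryString.toMiB` is a hypothetical helper written for this illustration; it is not Spark's own parsing code, and the set of accepted suffixes is an assumption.

```scala
object MemoryString {
  private val Pattern = "([0-9]+)([a-z]*)".r

  // Parses strings such as "1g" or "512m" into MiB.
  // Assumption for this sketch: a value with no suffix is already in MiB.
  def toMiB(s: String): Long = s.trim.toLowerCase match {
    case Pattern(num, unit) =>
      val n = num.toLong
      unit match {
        case "" | "m" | "mb" => n
        case "k" | "kb"      => n / 1024
        case "g" | "gb"      => n * 1024
        case "t" | "tb"      => n * 1024 * 1024
        case other           => throw new IllegalArgumentException(s"Unknown unit: $other")
      }
    case _ =>
      throw new IllegalArgumentException(s"Cannot parse memory string: $s")
  }
}
```

Under this default, `1024` and `1g` describe the same amount of memory, so the documented default of `1g` and a bare numeric setting stay consistent.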
[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22282#discussion_r214084903 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java --- @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray( return result; } - public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) { -return fromPrimitiveArray(null, offset, length, elementSize); - } - - public static boolean shouldUseGenericArrayData(int elementSize, int length) { --- End diff -- Yep, the failed UT log confirms this: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22282 retest this please.
[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22221#discussion_r214508181 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl( accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) -val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { +val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) -taskIdToTaskSetManager.get(id).map { taskSetMgr => +Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => --- End diff -- Just a small concern here: the original code held the lock over the whole scope of ids in `accumUpdates`. After this change, an id that could be found before may no longer be found, because `taskIdToTaskSetManager` can be changed concurrently by `removeExecutor` or `statusUpdate`. It's not a big problem if the executor has been removed.
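The lookup pattern in the diff can be illustrated with a plain `ConcurrentHashMap`. This is a sketch under stated assumptions: the map's values are strings rather than `TaskSetManager` instances, and `managersFor` is a hypothetical helper. It shows that each `get` is individually safe but that the batch as a whole is not a snapshot, which is the concern raised above.

```scala
import java.util.concurrent.ConcurrentHashMap

object TaskLookup {
  // ConcurrentHashMap.get returns null for a missing key; Option(...) turns that into None.
  // Each id is looked up independently, so entries removed by another thread
  // between lookups are silently skipped.
  def managersFor(
      taskIdToManager: ConcurrentHashMap[Long, String],
      ids: Seq[Long]): Seq[(Long, String)] =
    ids.flatMap(id => Option(taskIdToManager.get(id)).map(m => (id, m)))
}
```

Without the outer `synchronized`, two ids in the same batch may observe different states of the map; the comment above accepts this because a missing entry typically means the executor is already gone.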
[GitHub] spark pull request #22205: [SPARK-25212][SQL] Support Filter in ConvertToLoc...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22205#discussion_r214505595 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] { case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) => LocalRelation(output, data.take(limit), isStreaming) + +case Filter(condition, LocalRelation(output, data, isStreaming)) --- End diff -- super nit: the comment at https://github.com/apache/spark/pull/22205/files#diff-a636a87d8843eeccca90140be91d4fafR1348 has not been updated.
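The idea behind the new `Filter` case is that a filter over in-memory rows can be evaluated at optimization time. Below is a toy sketch of that rewrite, assuming a hypothetical three-node plan model over `Int` rows; it does not use Catalyst's `LogicalPlan`, expressions, or `isStreaming` flag, and it only rewrites the top node.

```scala
object LocalFilterRule {
  // Hypothetical toy plan model for illustration only.
  sealed trait Plan
  final case class LocalRelation(data: Seq[Int]) extends Plan
  final case class Filter(condition: Int => Boolean, child: Plan) extends Plan
  final case class Limit(n: Int, child: Plan) extends Plan

  // Folds Limit or Filter over local data into a smaller LocalRelation.
  def convert(plan: Plan): Plan = plan match {
    case Limit(n, LocalRelation(data))     => LocalRelation(data.take(n))
    case Filter(cond, LocalRelation(data)) => LocalRelation(data.filter(cond))
    case other                             => other
  }
}
```

For example, `convert(Filter(_ > 1, LocalRelation(Seq(1, 2, 3))))` yields a `LocalRelation` holding only 2 and 3, so no filter operator is left to execute.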
[GitHub] spark pull request #22313: [SPARK-25306][SQL] Use cache to speed up `createF...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22313#discussion_r214528435 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala --- @@ -55,19 +59,52 @@ import org.apache.spark.sql.types._ * known to be convertible. */ private[orc] object OrcFilters extends Logging { + case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType]) + + private lazy val cacheExpireTimeout = + org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout + + private lazy val searchArgumentCache = CacheBuilder.newBuilder() +.expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS) +.build( + new CacheLoader[FilterWithTypeMap, Option[Builder]]() { +override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = { + buildSearchArgument( +typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder()) +} + }) + + private def getOrBuildSearchArgumentWithNewBuilder( --- End diff -- Just a small question: is it possible to reuse the code at https://github.com/apache/spark/pull/22313/files#diff-224b8cbedf286ecbfdd092d1e2e2f237R61?
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215271726 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- ``` I am a bit surprised that works, it would be great to understand why. Thanks. ``` Sorry for the bad test; it was too special a case and the result was right only by accident. The original implementation makes every semi join return `[]` in PySpark.
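The core of the diff above is splitting the common join conditions into those the join can evaluate and the rest. The following sketch shows only that split, with conditions modeled as strings; `Cond` and its `evaluableInJoin` flag are hypothetical stand-ins for Catalyst expressions and `canEvaluateWithinJoin`.

```scala
object JoinConditionSplit {
  // Hypothetical condition model: `evaluableInJoin` stands in for canEvaluateWithinJoin.
  final case class Cond(sql: String, evaluableInJoin: Boolean)

  // Returns (condition kept on the join, condition left for a Filter above the join).
  // Each side is None when no condition falls into it.
  def split(common: Seq[Cond]): (Option[String], Option[String]) = {
    val (inJoin, others) = common.partition(_.evaluableInJoin)
    def and(cs: Seq[Cond]): Option[String] =
      cs.map(_.sql).reduceLeftOption((a, b) => s"($a AND $b)")
    (and(inJoin), and(others))
  }
}
```

In the diff, the second component is applied as a `Filter` only when the join type is `InnerLike`; placing such a filter above a semi join is not equivalent, because the semi join outputs only left-side columns. That matches the semi-join problem acknowledged in the comment above.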
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 Gentle ping @maropu, could you help review this? I'll keep following up on it.
[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22341#discussion_r215285275 --- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala --- @@ -55,7 +55,7 @@ class RDDInfo( } private[spark] object RDDInfo { - private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) + private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) --- End diff -- Is this related to the problem?
[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22341#discussion_r215318105 --- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala --- @@ -55,7 +55,7 @@ class RDDInfo( } private[spark] object RDDInfo { - private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) + private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM) --- End diff -- Thanks for explaining.
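The explanation given in the thread is not part of this archive, but the general effect of switching a `val` to a `lazy val` inside an `object` can be shown on its own. In this sketch `loadSetting` is a hypothetical stand-in for a lookup such as `SparkEnv.get.conf.get(...)`, which may not be usable at the moment the object is first initialized.

```scala
object LazyInit {
  // Counts how many times the setting is actually loaded.
  var loads = 0

  // Hypothetical stand-in for an environment-dependent lookup.
  private def loadSetting(): String = { loads += 1; "short" }

  // With `lazy val`, loadSetting() runs on first access of callsiteForm,
  // not when LazyInit itself is initialized; the result is then cached.
  lazy val callsiteForm: String = loadSetting()
}
```

With a plain `val`, the lookup would run as soon as anything touches the object, whether or not `callsiteForm` is ever read.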
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214974819 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- Thanks, I'll do more testing on the SemiJoin here, but in my current tests over PySpark the result is not wrong. Maybe I misunderstand what you two mean by `wrong`: do you mean a correctness issue or just a benchmark regression? ![image](https://user-images.githubusercontent.com/4833765/45043269-4ba20c80-b09f-11e8-84dc-a1f3ff416303.png)
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 Thanks @BryanCutler @HyukjinKwon!
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22326 Gentle ping @mgaido91 @HyukjinKwon @dilipbiswal. Many thanks for the advice; please have a look when you have time.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215859764 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) -// Throw an AnalysisException if the column name/dataType is changed. +// Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { throw new AnalysisException( "ALTER TABLE CHANGE COLUMN is not supported for changing column " + s"'${originColumn.name}' with type '${originColumn.dataType}' to " + s"'${newColumn.name}' with type '${newColumn.dataType}'") --- End diff -- After adding the type check, maybe we also need the type information in the error message.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215859681 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) -// Throw an AnalysisException if the column name/dataType is changed. +// Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { --- End diff -- Thanks, that was not enough yet; added a type compatibility check in ef65c4d.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215860035 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- Thanks for the advice. I should also check type compatibility; added in ef65c4d.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r215859851 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1697,6 +1697,16 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is col1'") assert(getMetadata("col1").getString("key") == "value") assert(getMetadata("col1").getString("comment") == "this is col1") + +// Ensure that changing column type takes effect +sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING") +val column = catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1") +assert(column.get.dataType == StringType) + +// Ensure that changing partition column type throw exception +intercept[AnalysisException] { + sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING") +} --- End diff -- Thanks, done in ef65c4d. Also added a check for type compatibility.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216132926 --- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java --- @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * This is based on hadoop-common-2.7.2 + * {@link org.apache.hadoop.fs.Globber}. + * This class exposes globWithThreshold which can be used glob path in parallel. 
+ */ +public class SparkGlobber { + public static final Log LOG = LogFactory.getLog(SparkGlobber.class.getName()); + + private final FileSystem fs; + private final FileContext fc; + private final Path pathPattern; + + public SparkGlobber(FileSystem fs, Path pathPattern) { +this.fs = fs; +this.fc = null; +this.pathPattern = pathPattern; + } + + public SparkGlobber(FileContext fc, Path pathPattern) { +this.fs = null; +this.fc = fc; +this.pathPattern = pathPattern; + } + + private FileStatus getFileStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.getFileStatus(path); + } else { +return fc.getFileStatus(path); + } +} catch (FileNotFoundException e) { + return null; +} + } + + private FileStatus[] listStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.listStatus(path); + } else { +return fc.util().listStatus(path); + } +} catch (FileNotFoundException e) { + return new FileStatus[0]; +} + } + + private Path fixRelativePart(Path path) { +if (fs != null) { + return fs.fixRelativePart(path); +} else { + return fc.fixRelativePart(path); +} + } + + /** + * Convert a path component that contains backslash ecape sequences to a + * literal string. This is necessary when you want to explicitly refer to a + * path that contains globber metacharacters. + */ + private static String unescapePathComponent(String name) { +return name.replaceAll("\\\\(.)", "$1"); + } + + /** + * Translate an absolute path into a list of path components. + * We merge double slashes into a single slash here. + * POSIX root path, i.e. '/', does not get an entry in the list.
+ */ + private static List<String> getPathComponents(String path) + throws IOException { +ArrayList<String> ret = new ArrayList<String>(); +for (String component : path.split(Path.SEPARATOR)) { + if (!component.isEmpty()) { +ret.add(component); + } +} +return ret; + } + + private String schemeFromPath(Path path) throws IOException { +String scheme = path.toUri().getScheme(); +if (scheme == null) { + if (fs != null) { +scheme = fs.getUri().getScheme(); + } else { +scheme = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme(); + } +} +return scheme; + } + + private String authorityFromPath(Path path) throws IOException { +String authority = path.toUri().getAuthority(); +if (authority == null) { + if (fs != null) { +authority = fs.getUri().getAuthority(); + } else { +authority = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority(); + } +} +return authority ; + } + + public FileStatus[] globWithThreshold(int threshold) throws IOException { +
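The two small helpers documented in the diff above (unescaping a path component and splitting a path into components) can be sketched in Python. This is only an illustration of the behaviour the Javadoc describes, not the Hadoop or Spark code; the function names are hypothetical, and the separator is assumed to be `/`.

```python
import re


def unescape_path_component(name):
    # Drop the backslash in front of each escaped character,
    # so that r"a\*b" becomes the literal component "a*b".
    return re.sub(r"\\(.)", r"\1", name)


def get_path_components(path, sep="/"):
    # Splitting on the separator and dropping empty pieces merges
    # double slashes and leaves the root "/" without an entry.
    return [component for component in path.split(sep) if component]
```

For example, `get_path_components("/a//b/")` yields the components `a` and `b`, and the root path alone yields an empty list.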
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216132779 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- It also doesn't work in Hive; I tested with Hive 1.2.2 and Hadoop 2.7.3.
```
Logging initialized using configuration in jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT) stored as parquet;
OK
Time taken: 1.604 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);
Query ID = XuanYuan_20180908230549_3c8732ff-07e0-4a7a-95b4-260aed04a762
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-09-08 23:06:08,732 Stage-1 map = 0%, reduce = 0%
2018-09-08 23:06:09,743 Stage-1 map = 100%, reduce = 0%
Ended Job = job_local712899233_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-08_23-05-49_782_100109481692677607-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=343, rawDataSize=3]
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 20.294 seconds
hive> select * from t;
OK
1 a 3
Time taken: 0.154 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.18 seconds
hive> select * from t;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException: Cannot inspect org.apache.hadoop.io.IntWritable
Time taken: 0.116 seconds
hive>
```
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216133261 --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala --- @@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { }) } + test("test expanding glob path") { --- End diff --
```
IIUC, the new feature is disabled as default since spark.sql.sources.parallelGetGlobbedPath.numThreads is 0.
```
Yes, that's right.
```
I am afraid these test causes are executed only with disabling the new feature.
```
These tests mainly verify the correctness of `sparkHadoopUtil.expandGlobPath`, so it may still be worth keeping them.
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 @kiszk @maropu Many thanks for your review and advice! I'll address the comments and resolve the conflicts ASAP.
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22369 [SPARK-25072][DOC] Update migration guide for behavior change ## What changes were proposed in this pull request? Update the document for the behavior change in PySpark Row creation. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25072-DOC Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22369 commit d257a38c647b45a9e83a2bdbbd2814f1b3fc5d56 Author: Yuanjian Li Date: 2018-09-09T04:26:23Z Update doc for SPARK-25072 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147915 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -656,6 +656,25 @@ object SQLConf { .intConf .createWithDefault(1) + val PARALLEL_GET_GLOBBED_PATH_THRESHOLD = +buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold") + .doc("The maximum number of subfiles or directories allowed after a globbed path " + +"expansion.") + .intConf + .checkValue(threshold => threshold >= 0, "The maximum number of subfiles or directories " + --- End diff -- Maybe we should keep this public? The parallel expansion is only enabled when the thread number is > 0.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147921 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,37 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * This will be done in main thread by default while the value of config + * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread + * pool will expand the globbed paths. --- End diff -- Thanks, done in 1319cd3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147887 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1557,6 +1576,15 @@ class SQLConf extends Serializable with Logging { def parallelPartitionDiscoveryParallelism: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) + def parallelGetGlobbedPathThreshold: Int = +getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_THRESHOLD) + + def parallelGetGlobbedPathNumThreads: Int = +getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_NUM_THREADS) + + def parallelGetGlobbedPathEnabled: Boolean = --- End diff -- Thanks, done in 1319cd3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147889 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,37 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * This will be done in main thread by default while the value of config + * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread + * pool will expand the globbed paths. + */ + private def getGlobbedPaths( + sparkSession: SparkSession, --- End diff -- Thanks for the advice, done in 1319cd3.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216147919 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,37 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * This will be done in main thread by default while the value of config + * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread + * pool will expand the globbed paths. + */ + private def getGlobbedPaths( --- End diff -- Thanks, that's clearer, done in 1319cd3.
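The behaviour described in the `getGlobbedPaths` doc comment (expand in the calling thread by default, use a local thread pool only when the configured thread count is greater than 0) can be sketched as follows. This is a minimal local-filesystem analogy, not the Spark implementation: `expand_glob_paths` and its `num_threads` parameter are hypothetical names, and Python's `glob` stands in for the Hadoop globber.

```python
import glob
from concurrent.futures import ThreadPoolExecutor


def expand_glob_paths(patterns, num_threads=0):
    """Expand every pattern; use a thread pool only when num_threads > 0."""
    if num_threads > 0:
        # A pool local to this call, shut down when the block exits.
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            expanded = list(pool.map(glob.glob, patterns))
    else:
        # Default: expand in the calling thread.
        expanded = [glob.glob(pattern) for pattern in patterns]
    # Flatten and sort so that both modes return the same result.
    return sorted(path for group in expanded for path in group)
```

Keeping the serial branch as the default means a test that leaves the thread count at 0 still exercises the expansion logic itself, which is the point made in the review thread about the existing tests.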
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 ``` @xuanyuanking Could you please update the document? ``` #22369 Thanks for the reminder; I'll pay attention to this in future work.
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22369#discussion_r216189359 --- Diff: docs/sql-programming-guide.md --- @@ -1901,6 +1901,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above - As of version 2.3.1 Arrow functionality, including `pandas_udf` and `toPandas()`/`createDataFrame()` with `spark.sql.execution.arrow.enabled` set to `True`, has been marked as experimental. These are still evolving and not currently recommended for use in production. + - In version 2.3.1 and earlier, it is possible for PySpark to create a Row object by providing more value than column number through the customized Row class. Since Spark 2.3.3, Spark will confirm value length is less or equal than column length in PySpark. See [SPARK-25072](https://issues.apache.org/jira/browse/SPARK-25072) for details. --- End diff -- Thanks Bryan, I'll address this after discussion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22369 Got it, thanks @HyukjinKwon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22140#discussion_r215601543 --- Diff: python/pyspark/sql/tests.py --- @@ -269,6 +269,10 @@ def test_struct_field_type_name(self): struct_field = StructField("a", IntegerType()) self.assertRaises(TypeError, struct_field.typeName) +def test_invalid_create_row(slef): +rowClass = Row("c1", "c2") --- End diff -- Thanks, done in eb3f506. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22140#discussion_r215601350 --- Diff: python/pyspark/sql/tests.py --- @@ -269,6 +269,10 @@ def test_struct_field_type_name(self): struct_field = StructField("a", IntegerType()) self.assertRaises(TypeError, struct_field.typeName) +def test_invalid_create_row(slef): --- End diff -- Thanks, done in eb3f506. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22140#discussion_r215601486 --- Diff: python/pyspark/sql/types.py --- @@ -1397,6 +1397,8 @@ def _create_row_inbound_converter(dataType): def _create_row(fields, values): +if len(values) > len(fields): +raise ValueError("Can not create %s by %s" % (fields, values)) --- End diff -- Thanks, improved the message and moved this check to `__call__` in `Row`. Done in eb3f506.
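The length check discussed in this thread can be sketched without PySpark. `SketchRow` below is a hypothetical stand-in for a custom Row class, written only to show where such a check would sit in `__call__`; it is not the PySpark implementation, and the error message wording is an assumption.

```python
class SketchRow(tuple):
    """Hypothetical stand-in for a custom Row class (not PySpark's Row)."""

    def __new__(cls, *fields):
        # The tuple itself holds the field names, e.g. SketchRow("c1", "c2").
        return super().__new__(cls, fields)

    def __call__(self, *values):
        # Reject calls that pass more values than there are field names.
        if len(values) > len(self):
            raise ValueError(
                "Can not create Row with fields %s, expected %d values "
                "but got %s" % (self, len(self), values))
        return dict(zip(self, values))
```

With `row_class = SketchRow("c1", "c2")`, calling `row_class(1, 2)` succeeds, while `row_class(1, 2, 3)` raises `ValueError`.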
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214932266 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join.") --- End diff -- Thanks, done in a86a7d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214968900 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + --- End diff -- Thanks, done in 82e50d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214968794 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") --- End diff -- Makes sense; I also changed this in `CheckCartesianProducts`. Done in 82e50d5.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214969191 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Filter(others.reduceLeft(And), join) --- End diff -- Thanks, no need to add extra Filter in LeftSemi case. 
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214931484 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join.") + Cross +} else joinType --- End diff -- Thanks, done in a86a7d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
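The decision logic in the diffs above (split the common join conditions into those the join can evaluate and the rest, and fall back to a cross join only when none remain) can be sketched in plain Python. This is an illustration under stated assumptions, not Catalyst code: conditions are plain strings, `can_evaluate_within_join` is a caller-supplied predicate, and `plan_join` is a hypothetical name.

```python
def plan_join(common_conditions, can_evaluate_within_join, join_type,
              cross_join_enabled):
    """Return (join_type, join_conditions, leftover_filter_conditions)."""
    evaluable = [c for c in common_conditions if can_evaluate_within_join(c)]
    others = [c for c in common_conditions if not can_evaluate_within_join(c)]
    if common_conditions and not evaluable:
        # Every condition is unevaluable inside the join, so the join
        # itself would have no condition left.
        if not cross_join_enabled:
            raise ValueError(
                "all join conditions are unevaluable; enable cross joins "
                "to run this plan")
        join_type = "cross"
    # Conditions in `others` would be applied as a filter above the join.
    return join_type, evaluable, others
```

When at least one condition stays in the join, the join type is left unchanged and only the unevaluable conditions move into the leftover list.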
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215635071 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -65,7 +65,7 @@ private[spark] class BarrierCoordinator( // Record all active stage attempts that make barrier() call(s), and the corresponding internal // state. - private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + private[spark] val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] --- End diff -- No problem, done in ecf12bd. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215636283 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -187,6 +191,9 @@ private[spark] class BarrierCoordinator( requesters.clear() cancelTimerTask() } + +// Check for clearing internal data, visible for test only. +private[spark] def cleanCheck(): Boolean = requesters.isEmpty && timerTask == null --- End diff -- Added the internal data clean check per Xingbo's comment: https://github.com/apache/spark/pull/22165#issuecomment-415086679.
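The idea behind `cleanCheck()` (a test-only probe that the per-barrier state has dropped its pending requesters and its timer) can be sketched as below. `BarrierStateSketch` is a hypothetical class, and it assumes that cancelling the timer also resets the reference to `None`, which the diff implies but does not show.

```python
import threading


class BarrierStateSketch:
    """Hypothetical miniature of per-barrier state; not Spark's class."""

    def __init__(self):
        self.requesters = []
        self.timer_task = None

    def handle_request(self, requester, timeout_s=60.0):
        # Start a timeout timer on the first request of a round.
        if self.timer_task is None:
            self.timer_task = threading.Timer(timeout_s, self.clear)
            self.timer_task.daemon = True
            self.timer_task.start()
        self.requesters.append(requester)

    def clear(self):
        # Drop pending requesters and cancel the timer, resetting the
        # reference so clean_check() can observe it (an assumption).
        self.requesters.clear()
        if self.timer_task is not None:
            self.timer_task.cancel()
            self.timer_task = None

    def clean_check(self):
        # Test-only probe: True when no internal data is left behind.
        return not self.requesters and self.timer_task is None
```

A test can then assert `clean_check()` after a round completes instead of reaching into the private fields.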
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r215635587 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeoutException + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.apache.spark._ +import org.apache.spark.rpc.RpcTimeout + +class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext { + + /** + * Get the current barrierEpoch from barrierCoordinator.states by ContextBarrierId + */ + def getCurrentBarrierEpoch( + stageId: Int, stageAttemptId: Int, barrierCoordinator: BarrierCoordinator): Int = { +val barrierId = ContextBarrierId(stageId, stageAttemptId) +barrierCoordinator.states.get(barrierId).barrierEpoch + } + + test("normal test for single task") { +sc = new SparkContext("local", "test") +val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, sc.env.rpcEnv) +val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", barrierCoordinator) +val stageId = 0 +val stageAttemptNumber = 0 +rpcEndpointRef.askSync[Unit]( + message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, taskAttemptId = 0, +barrierEpoch = 0), + timeout = new RpcTimeout(5 seconds, "rpcTimeOut")) +// sleep for waiting barrierEpoch value change +Thread.sleep(500) --- End diff -- Thanks for the guidance, done in ecf12bd. I'll also pay attention to this in future work.
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215876606 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => +case LeftSemi => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. --- End diff -- No problem, done in 87440b0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215876550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // [[CheckCartesianProducts]], we throw firstly here for better readable --- End diff -- Thanks, done in 87440b0. I'll also pay attention to this in future work.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215877417 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. // TODO support nondeterministic expressions in join condition. -comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, - checkAnalysis = false) +withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, +checkAnalysis = false) +} + } + + test("join condition pushdown: deterministic and non-deterministic in left semi join") { --- End diff -- I didn't add SPARK-25314 to the test name because this test may be a supplement to test("join condition pushdown: deterministic and non-deterministic").
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377621 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} + } + else { --- End diff -- nit: } else {
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216378185 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- nit: `: AppStatusStore`?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377882 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ --- End diff -- nit: Source {
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377526 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging { setupAndStartListenerBus() postEnvironmentUpdate() +_env.metricsSystem.registerSource(appStatusSource) --- End diff -- Better to move this line to +574.
[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/2 Got it, I'll revert the file source changes in this commit. Thanks for your reply.
[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/2 @cloud-fan @rdblue I want to leave some comments and thoughts from looking into this again; I hope they help us decide on the next step. Currently every plan assumes its input is `RDD[InternalRow]`, and the whole framework treats columnar reads as a special case. Also, `inputRDDs` is called not only in `WholeStageCodegenExec` but also by every parent physical node, so nested plans easily turn into a mess while debugging this fix. So we have three choices. The first two remove the cast completely but may need many changes to `CodegenSupport`; the last one limits the changes but keeps the cast problem:
1. Erase the type of `inputRDDs`, because both RDD[InternalRow] and RDD[ColumnarBatch] must be allowed, mainly where a parent physical plan calls its child. This is implemented in the last commit of this PR: https://github.com/apache/spark/pull/2/files
2. Refactor the framework so that every plan deals with columnar batches.
3. Limit the changes to `ColumnarBatchScan` and leave `CodegenSupport` unchanged, which keeps the cast problem. This is implemented in the first two commits of this PR: https://github.com/apache/spark/pull/2/files/7e88599dfc2caf177d12e890d588be68bdd3bc8e
If none of these makes sense, I'll just close this. Thanks.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216122599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- Thanks for your question; that is also what I was considering while doing the compatibility check. Hive does the column type change in [HiveAlterHandler](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java#L175) and the detailed compatibility check is in [ColumnType](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ColumnType.java#L206). As you can see in the ColumnType check, it actually uses the `canCast` semantics to judge compatibility.
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 retest this please.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127605 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // CheckCartesianProducts, we throw firstly here for better readable information. + throw new AnalysisException("Detected the whole commonJoinCondition:" + +s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + +"join to cross join by setting the configuration variable " + +s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") +} + } + + /** + * Generate new left and right child of join by pushing down the side only join filter, + * split commonJoinCondition based on the expression can be evaluated within join or not. + * + * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin) --- End diff -- Got it, I just saw the demo here https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140; I'll remove it in the next commit.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127673 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // only need to add cross join when whole commonJoinCondition are unevaluable + val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { --- End diff -- Thanks. After checking in detail, I changed this to `others.nonEmpty`; my worry about the common join condition containing both unevaluable and evaluable conditions may have been unnecessary. I'll also add a test in the next commit to ensure this.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127710 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) --- End diff -- Could I try to answer this? The projection is only used in this scenario, where a left semi join has been turned into a cross join; it ensures the output contains only left-side attributes.
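To illustrate why the projection is needed, here is a minimal sketch in plain Python (no Spark). It models rows as dicts and emulates the shape `Project(left columns, Filter(condition, CrossJoin))` from the diff. The helper names `cross_join` and `left_semi_via_cross` are hypothetical and exist only for this illustration; this is not how Catalyst executes the plan.

```python
from itertools import product


def cross_join(left, right):
    """Cartesian product: each output row merges one left row and one right row."""
    return [{**l, **r} for l, r in product(left, right)]


def left_semi_via_cross(left, right, predicate):
    """Toy version of Project(left columns, Filter(predicate, CrossJoin(left, right)))."""
    left_columns = list(left[0].keys()) if left else []
    joined = cross_join(left, right)
    kept = [row for row in joined if predicate(row)]
    # Without this projection the result would also carry the right-side columns,
    # which a left semi join must not expose.
    return [{c: row[c] for c in left_columns} for row in kept]


left = [{"a": 1, "a1": 1}, {"a": 2, "a1": 2}]
right = [{"b": 1, "b1": 1}]
result = left_semi_via_cross(left, right, lambda row: row["a"] == row["b"])
print(result)
```

Note that this toy version emits one row per matching pair, so a left row matching several right rows would appear more than once, which a real semi join would not do.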
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22326 @holdenk Thanks, sorry for the typo.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216127880 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- Thanks for checking; it was my mistake not to describe the intention behind this feature. We want to support type changes simply to add the ability to change the column type in the metadata. The scenario we hit is that a user wants a type change (e.g. int is not enough and a long is needed). They have already changed the type in their data files, but we then have to hack the metastore or recreate the whole table.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127932 --- Diff: python/pyspark/sql/tests.py --- @@ -547,6 +547,74 @@ def test_udf_in_filter_on_top_of_join(self): df = left.crossJoin(right).filter(f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) +def test_udf_in_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(b=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b")) +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, b=1)]) + +def test_udf_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b"), "leftsemi") +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_and_filter_in_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf(non-deterministic) +# and normal filter(deterministic) +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)]) + +def test_udf_and_filter_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf(non-deterministic) +# 
and normal filter(deterministic) +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_and_common_filter_in_join_condition(self): --- End diff -- I added these two tests for the comment in https://github.com/apache/spark/pull/22326#discussion_r216127673.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216127904 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- So maybe we should also add a conf like Hive's `MetastoreConf.ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES` to wrap this behavior? WDYT? Thanks.
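The idea of guarding a metadata-only type change behind a compatibility check and a switch can be sketched as follows. This is a plain-Python toy under stated assumptions: the `SAFE_WIDENINGS` table, `can_change_type`, `change_column_type` and the `disallow_incompatible` flag are all hypothetical names invented for this illustration, and the widening table is not Hive's or Spark's actual rule set.

```python
# Hypothetical widening table for illustration only; NOT the real Hive or Spark rules.
SAFE_WIDENINGS = {
    "int": {"int", "long", "string"},
    "long": {"long", "string"},
    "string": {"string"},
}


def can_change_type(old_type, new_type):
    """True if old_type may be relabelled as new_type under the toy table above."""
    return new_type in SAFE_WIDENINGS.get(old_type, {old_type})


def change_column_type(schema, column, new_type, disallow_incompatible=True):
    """Return a new schema (dict of name -> type) with `column` retyped.

    Only the metadata changes; data files are not touched.
    """
    if column not in schema:
        raise KeyError(column)
    old_type = schema[column]
    if disallow_incompatible and not can_change_type(old_type, new_type):
        raise ValueError(
            f"cannot change column {column} from {old_type} to {new_type}")
    updated = dict(schema)
    updated[column] = new_type
    return updated


schema = {"a": "int", "b": "string", "c": "int"}
print(change_column_type(schema, "a", "long"))
```

With the flag left on, an incompatible change such as `string` to `int` is rejected; turning it off lets the caller take responsibility for the data files.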
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215261610 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- Thanks @mgaido91 and @dilipbiswal ! I fix this in 63fbcce. 
The main problem is a semi join with both deterministic and non-deterministic conditions: a filter after the semi join will fail. I also added more tests on both the Python and Scala sides, including semi join, inner join and the complex scenario described below. Considering left semi made the strategy difficult to read, so in 63fbcce I split the logic for semi join and inner join.
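The inner-join branch of the diff above can be summarized in a small plain-Python sketch: partition the common conditions by whether they can be evaluated within the join, fall back to a cross join only when none can, and keep the rest as a filter on top. `Cond`, `plan_inner_join` and the string join types are hypothetical stand-ins for this illustration, not Catalyst classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cond:
    name: str
    evaluable_within_join: bool  # stand-in for canEvaluateWithinJoin(expr)


def plan_inner_join(common_conditions, cross_join_enabled):
    """Return (join_type, join_conditions, post_join_filters) for an inner join."""
    in_join = [c for c in common_conditions if c.evaluable_within_join]
    others = [c for c in common_conditions if not c.evaluable_within_join]
    if common_conditions and not in_join:
        # Every common condition is unevaluable inside the join.
        if not cross_join_enabled:
            raise ValueError("whole join condition is unevaluable; enable cross joins")
        join_type = "cross"
    else:
        join_type = "inner"
    # `others` would be applied as a Filter on top of the join.
    return join_type, in_join, others


eq = Cond("x.a = y.a", True)
udf = Cond("py_udf(x.a, y.b)", False)
print(plan_inner_join([udf], cross_join_enabled=True))
print(plan_inner_join([eq, udf], cross_join_enabled=False))
```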
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 ``` Could I do the refactor of moving ContextBarrierState out of BarrierCoordinator? ``` gentle ping @jiangxb1987, I'm still following up on this. :)
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 gentle ping @HyukjinKwon @BryanCutler
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 gentle ping @cloud-fan @gatorsmile @kiszk, we still hit this in our internal fork; could you help review? I'll resolve the conflicts. Many thanks.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216588353 --- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java --- @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * This is based on hadoop-common-2.7.2 + * {@link org.apache.hadoop.fs.Globber}. + * This class exposes globWithThreshold which can be used glob path in parallel. 
+ */ +public class SparkGlobber { + public static final Log LOG = LogFactory.getLog(SparkGlobber.class.getName()); + + private final FileSystem fs; + private final FileContext fc; + private final Path pathPattern; + + public SparkGlobber(FileSystem fs, Path pathPattern) { +this.fs = fs; +this.fc = null; +this.pathPattern = pathPattern; + } + + public SparkGlobber(FileContext fc, Path pathPattern) { +this.fs = null; +this.fc = fc; +this.pathPattern = pathPattern; + } + + private FileStatus getFileStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.getFileStatus(path); + } else { +return fc.getFileStatus(path); + } +} catch (FileNotFoundException e) { + return null; +} + } + + private FileStatus[] listStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.listStatus(path); + } else { +return fc.util().listStatus(path); + } +} catch (FileNotFoundException e) { + return new FileStatus[0]; +} + } + + private Path fixRelativePart(Path path) { +if (fs != null) { + return fs.fixRelativePart(path); +} else { + return fc.fixRelativePart(path); +} + } + + /** + * Convert a path component that contains backslash ecape sequences to a + * literal string. This is necessary when you want to explicitly refer to a + * path that contains globber metacharacters. + */ + private static String unescapePathComponent(String name) { +return name.replaceAll("\\\\(.)", "$1"); + } + + /** + * Translate an absolute path into a list of path components. + * We merge double slashes into a single slash here. + * POSIX root path, i.e. '/', does not get an entry in the list. 
+ */ + private static List<String> getPathComponents(String path) + throws IOException { +ArrayList<String> ret = new ArrayList<String>(); +for (String component : path.split(Path.SEPARATOR)) { + if (!component.isEmpty()) { +ret.add(component); + } +} +return ret; + } + + private String schemeFromPath(Path path) throws IOException { +String scheme = path.toUri().getScheme(); +if (scheme == null) { + if (fs != null) { +scheme = fs.getUri().getScheme(); + } else { +scheme = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme(); + } +} +return scheme; + } + + private String authorityFromPath(Path path) throws IOException { +String authority = path.toUri().getAuthority(); +if (authority == null) { + if (fs != null) { +authority = fs.getUri().getAuthority(); + } else { +authority = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority(); + } +} +return authority ; + } + + public FileStatus[] globWithThreshold(int threshold) throws IOException { +
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r216600156 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { -comment.map(column.withComment(_)).getOrElse(column) - } - --- End diff -- ``` Although the query above doesn't work well, why do users change column types? ``` As in the scenario described above, a user first uses int but after some time finds that a Long is needed; they can write the new data as Long and load it into new partitions. If we do not support the type change, the user has to recreate the table to change the type. Yep, if the file is not a binary file, the query works OK.
```
Logging initialized using configuration in jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT);
OK
Time taken: 2.576 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);;
Query ID = XuanYuan_20180911164348_32238a6c-b0a4-4cfd-aa3d-00a7628031cf
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2018-09-11 16:43:51,684 Stage-1 map = 100%, reduce = 0%
Ended Job = job_local162423_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-11_16-43-48_117_2262603440504094412-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=6, rawDataSize=5]
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.025 seconds
hive> select * from t;;
OK
1 a 3
Time taken: 0.164 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.177 seconds
hive> select * from t;
OK
1 a 3
Time taken: 0.12 seconds, Fetched: 1 row(s)
hive> quit;
```
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216583509 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) --- End diff -- Yes, I changed this to make the test pass. My original thinking was that nondeterministic expressions in join conditions are not supported yet, so this was no big problem: https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L105 https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1158-L1159 But now I think I should be more careful about this and limit the cross join changes to the PythonUDF case only. WDYT, @mgaido91? Thanks.
[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22369 Per the comment in https://github.com/apache/spark/pull/22140#issuecomment-419997180, I think this doc change is no longer needed, so I'll just close this. Thanks @BryanCutler and @HyukjinKwon!
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
Github user xuanyuanking closed the pull request at: https://github.com/apache/spark/pull/22369
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216585000 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) --- End diff -- From the code in canEvaluateWithinJoin, we get the scope relation: (CannotEvaluateWithinJoin = nonDeterministic + Unevaluable) > Unevaluable > PythonUDF. So to be safe, maybe I should limit the change to the smallest scope, PythonUDF only. I'd appreciate your advice, thanks :) https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L104-L120
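The nesting of the three scopes named in the comment can be made concrete with a toy model in plain Python. The `Expr` flags and the three predicate functions are hypothetical, and the assumption that a Python UDF counts as unevaluable is taken from the relation stated in the comment, not from reading the Spark source.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Expr:
    name: str
    deterministic: bool = True
    unevaluable: bool = False
    python_udf: bool = False  # assumed to imply unevaluable, per the comment above


def in_python_udf_scope(e):
    return e.python_udf


def in_unevaluable_scope(e):
    return e.unevaluable or e.python_udf


def in_cannot_evaluate_scope(e):
    # Widest scope: non-deterministic expressions plus everything unevaluable.
    return (not e.deterministic) or in_unevaluable_scope(e)


exprs = [
    Expr("x.a = y.a"),
    Expr("x.a = rand(10)", deterministic=False),
    Expr("some_unevaluable_expr", unevaluable=True),
    Expr("py_udf(x.a, y.b)", unevaluable=True, python_udf=True),
]

widest = {e.name for e in exprs if in_cannot_evaluate_scope(e)}
middle = {e.name for e in exprs if in_unevaluable_scope(e)}
narrowest = {e.name for e in exprs if in_python_udf_scope(e)}
print(len(widest), len(middle), len(narrowest))
```

Limiting the cross-join rewrite to the narrowest scope means the `rand(10)` condition in the test above would keep its existing behavior.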
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r216587584 --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala --- @@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers { }) } + test("test expanding glob path") { --- End diff -- No problem, I'll fix this.
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216606555 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) --- End diff -- Thanks @mgaido91 for the detailed review and advice. For my part, I would limit the change scope to PythonUDF only, or at least to Unevaluable only. Waiting for others' advice.
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 gentle ping @cloud-fan @gatorsmile @kiszk
[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200989413 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -87,7 +87,7 @@ private[spark] class TaskSetManager( // Set the coresponding index of Boolean var when the task killed by other attempt tasks, --- End diff -- typo I made before, coresponding -> corresponding.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r201006275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,35 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * Use a local thread pool to do this while there's too many paths. + */ + private def getGlobbedPaths( + sparkSession: SparkSession, + fs: FileSystem, + hadoopConf: Configuration, + qualified: Path): Seq[Path] = { +val getGlobbedPathThreshold = sparkSession.sessionState.conf.parallelGetGlobbedPathThreshold +val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified, getGlobbedPathThreshold) --- End diff -- Thanks for your advice, I'll reuse the value of `spark.sql.sources.parallelGetGlobbedPath.numThreads` to control this.
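The idea discussed above, expanding serially for a few paths and switching to a local thread pool once there are too many, can be sketched as follows. This is only an illustration under stated assumptions: `ParallelExpand`, `expandAll` and the `listChildren` callback are hypothetical names, plain strings stand in for Hadoop `Path` objects, and the actual `expandGlobPath` in the PR may be structured quite differently.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelExpand {
  // Hypothetical helper: expands each directory with `listChildren`, serially when the
  // input is small (or threading is disabled), otherwise on a bounded local thread pool.
  def expandAll(dirs: Seq[String], numThreads: Int, threshold: Int)(
      listChildren: String => Seq[String]): Seq[String] = {
    if (numThreads <= 0 || dirs.size <= threshold) {
      // Serial path: no pool is created at all.
      dirs.flatMap(listChildren)
    } else {
      val pool = Executors.newFixedThreadPool(numThreads)
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
      try {
        // One task per directory; Future.sequence keeps the input order.
        val futures = dirs.map(d => Future(listChildren(d)))
        Await.result(Future.sequence(futures), Duration.Inf).flatten
      } finally {
        pool.shutdown()
      }
    }
  }
}
```

For example, with `val tree = Map("a" -> Seq("a/1", "a/2"), "b" -> Seq("b/1"))`, both `ParallelExpand.expandAll(Seq("a", "b"), 0, 0)(tree)` and `ParallelExpand.expandAll(Seq("a", "b"), 2, 0)(tree)` return the same paths in the same order; only the second call builds a pool.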
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r201007556 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -656,6 +656,25 @@ object SQLConf { .intConf .createWithDefault(1) + val PARALLEL_GET_GLOBBED_PATH_THRESHOLD = +buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold") + .doc("The maximum number of subfiles or directories allowed after a globbed path " + +"expansion. If the number of paths exceeds this value during expansion, it tries to " + +"expand the globbed in parallel with multi-thread.") + .intConf + .checkValue(threshlod => threshlod >= 0, "The maximum number of subfiles or directories " + +"must not be negative") + .createWithDefault(32) + + val PARALLEL_GET_GLOBBED_PATH_NUM_THREADS = +buildConf("spark.sql.sources.parallelGetGlobbedPath.numThreads") + .doc("The number of threads to get a collection of path in parallel. Set the " + +"number to avoid generating too many threads.") + .intConf + .checkValue(parallel => parallel >= 0, "The maximum number of threads allowed for getting " + --- End diff -- Thanks for catching this. When this value is set to 0, we get an IllegalArgumentException from new ThreadPoolExecutor. So I use 0 here as the default value for controlling this feature, as we discussed in https://github.com/apache/spark/pull/21618#discussion_r200465855
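The failure mentioned above is easy to reproduce with the JDK alone: `Executors.newFixedThreadPool(0)` builds a `ThreadPoolExecutor` with a maximum pool size of 0, which is rejected with an `IllegalArgumentException`. The sketch below shows one way to guard against it; `GlobThreads` and `poolFor` are hypothetical names, and reading 0 as "do not create a pool" is an assumption about how the setting is meant to be used, not something taken from the PR's code.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.Try

object GlobThreads {
  // Hypothetical guard: a non-positive thread count means "no pool",
  // so a ThreadPoolExecutor is never constructed with 0 threads.
  def poolFor(numThreads: Int): Option[ExecutorService] =
    if (numThreads > 0) Some(Executors.newFixedThreadPool(numThreads)) else None

  // Evidence for the comment above: a zero-sized fixed pool cannot be created.
  def zeroThreadsFails: Boolean =
    Try(Executors.newFixedThreadPool(0)).failed
      .map(_.isInstanceOf[IllegalArgumentException])
      .getOrElse(false)
}
```

A caller would then match on the result, falling back to serial expansion on `None` and calling `shutdown()` on the pool once it is done with a `Some`.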
[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200990424 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("SPARK-24755 Executor loss can cause task to not be resubmitted") { +val conf = new SparkConf().set("spark.speculation", "true") +sc = new SparkContext("local", "test", conf) +// Set the speculation multiplier to be 0 so speculative tasks are launched immediately +sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.5") +sc.conf.set("spark.speculation", "true") + +var killTaskCalled = false +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3")) +sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, +executorId: String, +interruptThread: Boolean, +reason: String): Unit = { +// Check the only one killTask event in this case, which triggered by +// task 2.1 completed. +assert(taskId === 2) +assert(executorId === "exec3") +assert(interruptThread) +assert(reason === "another attempt succeeded") +killTaskCalled = true + } +}) + +// Keep track of the index of tasks that are resubmitted, +// so that the test can check that task is resubmitted correctly +var resubmittedTasks = new mutable.HashSet[Int] +val dagScheduler = new FakeDAGScheduler(sc, sched) { + override def taskEnded(task: Task[_], + reason: TaskEndReason, --- End diff -- ditto
[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200990279 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("SPARK-24755 Executor loss can cause task to not be resubmitted") { +val conf = new SparkConf().set("spark.speculation", "true") +sc = new SparkContext("local", "test", conf) +// Set the speculation multiplier to be 0 so speculative tasks are launched immediately +sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.5") +sc.conf.set("spark.speculation", "true") + +var killTaskCalled = false +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3")) +sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, +executorId: String, --- End diff -- nit: indent
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r201006447 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -656,6 +656,25 @@ object SQLConf { .intConf .createWithDefault(1) + val PARALLEL_GET_GLOBBED_PATH_THRESHOLD = +buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold") + .doc("The maximum number of subfiles or directories allowed after a globbed path " + +"expansion. If the number of paths exceeds this value during expansion, it tries to " + +"expand the globbed in parallel with multi-thread.") + .intConf + .checkValue(threshlod => threshlod >= 0, "The maximum number of subfiles or directories " + --- End diff -- Thanks, done in the next commit.
[GitHub] spark issue #21729: SPARK-24755 Executor loss can cause task to not be resub...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21729 Please change the title to '[SPARK-24755][Core] Executor loss can cause task to not be resubmitted'
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200160518 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -73,6 +74,10 @@ private[spark] class AppStatusListener( // around liveExecutors. @volatile private var activeExecutorCount = 0 + private val inputDataSetId = new AtomicLong(0) + private val outputDataSetId = new AtomicLong(0) + private val maxRecords = conf.getInt("spark.data.maxRecords", 1000) --- End diff -- What's this `spark.data.maxRecords` for? Maybe you should follow the config in core/src/main/scala/org/apache/spark/status/config.scala
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200159852 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent @DeveloperApi case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent +/** + * An internal class that describes the input data of an event log. + */ +@DeveloperApi +case class SparkListenerInputUpdate(format: String, +options: Map[String, String], --- End diff -- nit: indent, see: https://github.com/apache/spark/pull/21642/files#diff-fbe8f967070627c8dc155237e77c7314R172
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200160022 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -19,6 +19,7 @@ package org.apache.spark.status import java.util.Date import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong --- End diff -- import order error here.
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200159949 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent @DeveloperApi case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent +/** + * An internal class that describes the input data of an event log. + */ +@DeveloperApi +case class SparkListenerInputUpdate(format: String, +options: Map[String, String], +locations: Seq[String] = Seq.empty[String]) + extends SparkListenerEvent + +/** + * An internal class that describes the non-table output of an event log. + */ +@DeveloperApi +case class SparkListenerOutputUpdate(format: String, + mode: String, --- End diff -- ditto
[GitHub] spark issue #17702: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/17702 retest this please