[GitHub] spark pull request #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/19745


---




[GitHub] spark issue #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader for Sp...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19745
  
No problem.


---




[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21839
  
Thanks for reviewing.


---




[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...

2018-07-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21839
  
@gatorsmile @maropu This is the follow-up PR for #21447; please have a look 
when you have time, thanks.


---




[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...

2018-07-22 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/21839

[SPARK-24339][SQL] Prunes the unused columns from child of 
ScriptTransformation

## What changes were proposed in this pull request?

Modify the strategy in ColumnPruning to add a Project between 
ScriptTransformation and its child. This can reduce scan time, 
especially when the table has many columns.
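
A rough sketch of the idea (illustrative only; the actual change lives inside the existing ColumnPruning rule, and the object name below is made up): when the child outputs columns the script never references, wrap the child in a Project so the scan only materializes the referenced columns.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative standalone rule: insert a Project when the child outputs
// columns the script transformation never uses, so the underlying scan only
// materializes the referenced columns.
object PruneScriptTransformationChild extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case st @ ScriptTransformation(_, _, _, child, _)
        if (child.outputSet -- st.references).nonEmpty =>
      st.withNewChildren(Seq(Project(st.references.toSeq, child)))
  }
}
```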

## How was this patch tested?

Add UT in ColumnPruningSuite.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-24339

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21839.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21839


commit 68869d9fb8cc0e2686fb1e01f4d4c3e7ac8a52fe
Author: Yuanjian Li 
Date:   2018-07-22T14:46:31Z

Prunes the unused columns from child of ScriptTransformation




---




[GitHub] spark issue #21447: [SPARK-24339][SQL]Add project for transform/map/reduce s...

2018-07-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21447
  
I'd like to open a follow-up PR and cc @gatorsmile @maropu for a review.


---




[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...

2018-07-19 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21533
  
@jiangxb1987 Thanks for the reminder, rephrasing done.


---




[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202703948
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
+.get("col_name").getOrElse("").equals("Last Access"))
+  // Check whether lastAcessField key is exist
+  assert(!lastAcessField.isEmpty)
+  val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => 
((r
--- End diff --

where is the val `validLastAcessFieldValue` used? 


---




[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202701870
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
--- End diff --

nit: lastAccessField


---




[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202703129
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -114,7 +114,10 @@ case class CatalogTablePartition(
   map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" 
+ p._2).mkString(", ")}}")
 }
 map.put("Created Time", new Date(createTime).toString)
-map.put("Last Access", new Date(lastAccessTime).toString)
+val lastAccess = {
+  if (-1 == lastAccessTime) "UNKNOWN" else new 
Date(lastAccessTime).toString
+}
+map.put("Last Access", lastAccess)
--- End diff --

No need for the val lastAccess?
```
map.put("Last Access",
  if (-1 == lastAccessTime) "UNKNOWN" else new Date(lastAccessTime).toString)
```


---




[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202704259
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
+.get("col_name").getOrElse("").equals("Last Access"))
+  // Check whether lastAcessField key is exist
+  assert(!lastAcessField.isEmpty)
+  val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => 
((r
+.getValuesMap(Seq("data_type"))
+.get("data_type").contains(new Date(-1).toString
+  assert(lastAcessField.size!=0)
--- End diff --

code style nit: blank before and after '!='


---




[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202701770
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
+.get("col_name").getOrElse("").equals("Last Access"))
+  // Check whether lastAcessField key is exist
+  assert(!lastAcessField.isEmpty)
--- End diff --

lastAccessField.nonEmpty


---




[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-07-24 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
@gatorsmile @maropu Please have a look at this; resolving the conflicts 
took me some time.
Also cc @jiangxb1987 because the conflicts are mainly with #20696. Thanks also 
for the work in #20696; the latest PR no longer needs the extra handling for 
partition column comment changes.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-07-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r204805474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand(
 
 // Find the origin column from dataSchema by column name.
 val originColumn = findColumnByName(table.dataSchema, columnName, 
resolver)
-// Throw an AnalysisException if the column name/dataType is changed.
+// Throw an AnalysisException if the column name is changed.
 if (!columnEqual(originColumn, newColumn, resolver)) {
   throw new AnalysisException(
 "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
   s"'${originColumn.name}' with type '${originColumn.dataType}' to 
" +
   s"'${newColumn.name}' with type '${newColumn.dataType}'")
 }
 
+val typeChanged = originColumn.dataType != newColumn.dataType
+val partitionColumnChanged = 
table.partitionColumnNames.contains(originColumn.name)
+
+// Throw an AnalysisException if the type of partition column is 
changed.
+if (typeChanged && partitionColumnChanged) {
--- End diff --

Just adding a check here for when the user changes the type of a partition column.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214075761
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
 return result;
   }
 
-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, 
int elementSize) {
-return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int 
length) {
--- End diff --

I think `shouldUseGenericArrayData` is still used in generated code; see the 
code here:

https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633
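
For context, an illustrative Scala snippet (variable names made up) of the codegen pattern at the linked line: the generated Java is built as a string that refers to the static helper by name, so removing the method breaks compilation of generated code even when no direct Scala or Java caller remains.

```scala
object CodegenCallSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical codegen fragment: generated Java refers to the helper by
    // name, so the method must remain on UnsafeArrayData.
    val elementSize = "elemSize"
    val numElements = "numElems"
    val code =
      s"""
         |if (UnsafeArrayData.shouldUseGenericArrayData($elementSize, $numElements)) {
         |  // fall back to GenericArrayData instead of UnsafeArrayData
         |}
       """.stripMargin
    println(code)
  }
}
```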


---




[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...

2018-08-29 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22252#discussion_r213708177
  
--- Diff: docs/configuration.md ---
@@ -152,7 +152,7 @@ of the most common options to set are:
   spark.driver.memory
   1g
   
-Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in MiB 
+Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in bytes 
--- End diff --

I think I got the point you want to report, @ivoson. IIUC, this is a bug in 
the code, not in the doc: we should also treat `spark.driver.memory=1024` as 
MiB. Maybe this changes the original behavior, and we can announce it in the 
migration guide? cc @srowen @HyukjinKwon.
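
For reference, a small sketch of the unit behavior under discussion, using Spark's `JavaUtils` byte-string parsers; this is illustrative and not the exact code path that reads `spark.driver.memory`.

```scala
import org.apache.spark.network.util.JavaUtils

object DriverMemoryUnitSketch {
  def main(args: Array[String]): Unit = {
    // A unit-less value is interpreted in the caller's default unit, which is
    // the crux of the MiB-vs-bytes question above.
    println(JavaUtils.byteStringAsMb("1024"))    // 1024 when the default unit is MiB
    println(JavaUtils.byteStringAsMb("1g"))      // 1024
    println(JavaUtils.byteStringAsBytes("1024")) // 1024 when the default unit is bytes
  }
}
```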


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214084903
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
 return result;
   }
 
-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, 
int elementSize) {
-return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int 
length) {
--- End diff --

Yep, the failed UT log proves this:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22282
  
retest this please.


---




[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

2018-09-01 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22221#discussion_r214508181
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
   accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
   blockManagerId: BlockManagerId): Boolean = {
 // (taskId, stageId, stageAttemptId, accumUpdates)
-val accumUpdatesWithTaskIds: Array[(Long, Int, Int, 
Seq[AccumulableInfo])] = synchronized {
+val accumUpdatesWithTaskIds: Array[(Long, Int, Int, 
Seq[AccumulableInfo])] = {
   accumUpdates.flatMap { case (id, updates) =>
 val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), 
None))
-taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
--- End diff --

Just leaving a small concern here: the original code locked the whole scope of 
ids in `accumUpdates`. After this change, an id that could be found before 
might not be found anymore, because `taskIdToTaskSetManager` can be changed by 
`removeExecutor` or `statusUpdate`. It's not a big problem if the executor has 
been removed.
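
A self-contained sketch of the concern (not code from the PR): once the lookup is no longer inside `synchronized`, the entry can be removed between operations, and wrapping `get` in `Option(...)` turns the resulting `null` into a `None`.

```scala
import java.util.concurrent.ConcurrentHashMap

object TaskIdLookupSketch {
  def main(args: Array[String]): Unit = {
    val taskIdToTaskSetManager = new ConcurrentHashMap[Long, String]()
    taskIdToTaskSetManager.put(1L, "taskSetManager-1")

    // Another thread (e.g. removeExecutor or statusUpdate) may remove the
    // entry concurrently now that the enclosing synchronized block is gone.
    taskIdToTaskSetManager.remove(1L)

    // ConcurrentHashMap.get returns null for a missing key; Option(...) makes
    // the race degrade to None instead of a null dereference downstream.
    val mgr: Option[String] = Option(taskIdToTaskSetManager.get(1L))
    println(mgr) // None
  }
}
```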


---




[GitHub] spark pull request #22205: [SPARK-25212][SQL] Support Filter in ConvertToLoc...

2018-08-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22205#discussion_r214505595
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends 
Rule[LogicalPlan] {
 
 case Limit(IntegerLiteral(limit), LocalRelation(output, data, 
isStreaming)) =>
   LocalRelation(output, data.take(limit), isStreaming)
+
+case Filter(condition, LocalRelation(output, data, isStreaming))
--- End diff --

super nit: the comment at 
https://github.com/apache/spark/pull/22205/files#diff-a636a87d8843eeccca90140be91d4fafR1348
was not updated.
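
The quoted diff is cut off before the rewrite body; a rough sketch of what such a Filter-over-LocalRelation case presumably does, assuming Catalyst's `InterpretedPredicate` helper (illustrative, not necessarily the exact code in the PR):

```scala
object FoldFilterIntoLocalRelation {
  import org.apache.spark.sql.catalyst.expressions.InterpretedPredicate
  import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}

  // Evaluate a deterministic, evaluable predicate eagerly against the local
  // rows and keep only the matching ones.
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, LocalRelation(output, data, isStreaming)) if condition.deterministic =>
      val predicate = InterpretedPredicate.create(condition, output)
      LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
  }
}
```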


---




[GitHub] spark pull request #22313: [SPARK-25306][SQL] Use cache to speed up `createF...

2018-09-01 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22313#discussion_r214528435
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
@@ -55,19 +59,52 @@ import org.apache.spark.sql.types._
  * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
+  case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, 
DataType])
+
+  private lazy val cacheExpireTimeout =
+
org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout
+
+  private lazy val searchArgumentCache = CacheBuilder.newBuilder()
+.expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
+.build(
+  new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
+override def load(typeMapAndFilter: FilterWithTypeMap): 
Option[Builder] = {
+  buildSearchArgument(
+typeMapAndFilter.typeMap, typeMapAndFilter.filter, 
SearchArgumentFactory.newBuilder())
+}
+  })
+
+  private def getOrBuildSearchArgumentWithNewBuilder(
--- End diff --

Just a small question: is it possible to reuse code with 
https://github.com/apache/spark/pull/22313/files#diff-224b8cbedf286ecbfdd092d1e2e2f237R61?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215271726
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  s"$commonJoinCondition of the join plan is unevaluable, 
we need to cast the " +
+  "join to cross join by setting the configuration 
variable " +
+  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+Filter(others.reduceLeft(And), join)
+  } else {
+join
--- End diff --

```
 I am a bit surprised that works, it would be great to understand why. 
Thanks.
```
Sorry for the bad test; it was too special a case and the result was right 
only by accident. The original implementation makes all semi joins return `[]` in PySpark.


---




[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
Gentle ping @maropu, could you help review this? I'll keep following up on it.


---




[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22341#discussion_r215285275
  
--- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---
@@ -55,7 +55,7 @@ class RDDInfo(
 }
 
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = 
SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
--- End diff --

Is this related to the problem?


---




[GitHub] spark pull request #22341: [SPARK-24889][Core] Update block info when unpers...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22341#discussion_r215318105
  
--- Diff: core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---
@@ -55,7 +55,7 @@ class RDDInfo(
 }
 
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = 
SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
--- End diff --

Thanks for explaining.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214974819
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  s"$commonJoinCondition of the join plan is unevaluable, 
we need to cast the " +
+  "join to cross join by setting the configuration 
variable " +
+  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+Filter(others.reduceLeft(And), join)
+  } else {
+join
--- End diff --

Thanks, I'll do more testing on the semi join here, but as currently tested in 
PySpark this is not wrong. Maybe I misunderstand what you two mean by `wrong`: 
do you mean correctness, or just a benchmark regression?

![image](https://user-images.githubusercontent.com/4833765/45043269-4ba20c80-b09f-11e8-84dc-a1f3ff416303.png)



---




[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
Thanks @BryanCutler @HyukjinKwon !


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22326
  
Gentle ping @mgaido91 @HyukjinKwon @dilipbiswal, many thanks for the advice; 
please have a look when you have time.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215859764
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand(
 
 // Find the origin column from dataSchema by column name.
 val originColumn = findColumnByName(table.dataSchema, columnName, 
resolver)
-// Throw an AnalysisException if the column name/dataType is changed.
+// Throw an AnalysisException if the column name is changed.
 if (!columnEqual(originColumn, newColumn, resolver)) {
   throw new AnalysisException(
 "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
   s"'${originColumn.name}' with type '${originColumn.dataType}' to 
" +
   s"'${newColumn.name}' with type '${newColumn.dataType}'")
--- End diff --

After adding the type check, maybe we also need the type information in the 
error message.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215859681
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand(
 
 // Find the origin column from dataSchema by column name.
 val originColumn = findColumnByName(table.dataSchema, columnName, 
resolver)
-// Throw an AnalysisException if the column name/dataType is changed.
+// Throw an AnalysisException if the column name is changed.
 if (!columnEqual(originColumn, newColumn, resolver)) {
--- End diff --

Thanks, that was not enough yet; added a type-compatibility check in ef65c4d.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215860035
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for the advice; I should also check type compatibility, added in ef65c4d.


---




[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r215859851
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -1697,6 +1697,16 @@ abstract class DDLSuite extends QueryTest with 
SQLTestUtils {
 sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is 
col1'")
 assert(getMetadata("col1").getString("key") == "value")
 assert(getMetadata("col1").getString("comment") == "this is col1")
+
+// Ensure that changing column type takes effect
+sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING")
+val column = 
catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1")
+assert(column.get.dataType == StringType)
+
+// Ensure that changing partition column type throw exception
+intercept[AnalysisException] {
+  sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING")
+}
--- End diff --

Thanks, done in ef65c4d. Also added a check for type compatibility.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216132926
  
--- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * This is based on hadoop-common-2.7.2
+ * {@link org.apache.hadoop.fs.Globber}.
+ * This class exposes globWithThreshold which can be used glob path in 
parallel.
+ */
+public class SparkGlobber {
+  public static final Log LOG = 
LogFactory.getLog(SparkGlobber.class.getName());
+
+  private final FileSystem fs;
+  private final FileContext fc;
+  private final Path pathPattern;
+
+  public SparkGlobber(FileSystem fs, Path pathPattern) {
+this.fs = fs;
+this.fc = null;
+this.pathPattern = pathPattern;
+  }
+
+  public SparkGlobber(FileContext fc, Path pathPattern) {
+this.fs = null;
+this.fc = fc;
+this.pathPattern = pathPattern;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.getFileStatus(path);
+  } else {
+return fc.getFileStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return null;
+}
+  }
+
+  private FileStatus[] listStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.listStatus(path);
+  } else {
+return fc.util().listStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return new FileStatus[0];
+}
+  }
+
+  private Path fixRelativePart(Path path) {
+if (fs != null) {
+  return fs.fixRelativePart(path);
+} else {
+  return fc.fixRelativePart(path);
+}
+  }
+
+  /**
+   * Convert a path component that contains backslash ecape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer 
to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+return name.replaceAll("(.)", "$1");
+  }
+
+  /**
+   * Translate an absolute path into a list of path components.
+   * We merge double slashes into a single slash here.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
+   */
+  private static List<String> getPathComponents(String path)
+  throws IOException {
+ArrayList<String> ret = new ArrayList<String>();
+for (String component : path.split(Path.SEPARATOR)) {
+  if (!component.isEmpty()) {
+ret.add(component);
+  }
+}
+return ret;
+  }
+
+  private String schemeFromPath(Path path) throws IOException {
+String scheme = path.toUri().getScheme();
+if (scheme == null) {
+  if (fs != null) {
+scheme = fs.getUri().getScheme();
+  } else {
+scheme = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
+  }
+}
+return scheme;
+  }
+
+  private String authorityFromPath(Path path) throws IOException {
+String authority = path.toUri().getAuthority();
+if (authority == null) {
+  if (fs != null) {
+authority = fs.getUri().getAuthority();
+  } else {
+authority = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
+  }
+}
+return authority ;
+  }
+
+  public FileStatus[] globWithThreshold(int threshold) throws IOException {
+

[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216132779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

It also doesn't work in Hive; I tested with Hive 1.2.2 and Hadoop 2.7.3.
```
Logging initialized using configuration in 
jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT) stored as parquet;
OK
Time taken: 1.604 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);
Query ID = XuanYuan_20180908230549_3c8732ff-07e0-4a7a-95b4-260aed04a762
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
2018-09-08 23:06:08,732 Stage-1 map = 0%,  reduce = 0%
2018-09-08 23:06:09,743 Stage-1 map = 100%,  reduce = 0%
Ended Job = job_local712899233_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: 
file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-08_23-05-49_782_100109481692677607-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=343, rawDataSize=3]
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 20.294 seconds
hive> select * from t;
OK
1   a   3
Time taken: 0.154 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.18 seconds
hive> select * from t;
OK
Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.UnsupportedOperationException: Cannot inspect 
org.apache.hadoop.io.IntWritable
Time taken: 0.116 seconds
hive>
```


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216133261
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala ---
@@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with 
Matchers {
 })
   }
 
+  test("test expanding glob path") {
--- End diff --

```
IIUC, the new feature is disabled as default since 
spark.sql.sources.parallelGetGlobbedPath.numThreads is 0.
```
Yes that's right.

```
I am afraid these test causes are executed only with disabling the new 
feature.
```
These mainly test the correctness of `sparkHadoopUtil.expandGlobPath`, so 
maybe it's necessary to keep them.


---




[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
@kiszk @maropu Many thanks for your review and advice! I'll address the 
comments and resolve the conflicts ASAP.


---




[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...

2018-09-08 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22369

[SPARK-25072][DOC] Update migration guide for behavior change

## What changes were proposed in this pull request?

Update the document for the behavior change in PySpark Row creation.

## How was this patch tested?

Existing UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25072-DOC

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22369


commit d257a38c647b45a9e83a2bdbbd2814f1b3fc5d56
Author: Yuanjian Li 
Date:   2018-09-09T04:26:23Z

Update doc for SPARK-25072




---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147915
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -656,6 +656,25 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PARALLEL_GET_GLOBBED_PATH_THRESHOLD =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold")
+  .doc("The maximum number of subfiles or directories allowed after a 
globbed path " +
+"expansion.")
+  .intConf
+  .checkValue(threshold => threshold >= 0, "The maximum number of 
subfiles or directories " +
--- End diff --

Maybe we should keep this config public? The parallel expansion is only 
enabled when the thread number is > 0 anyway.
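
A tiny illustrative sketch (hypothetical helper, not code from the PR) of how the two settings interact: parallel expansion only kicks in when the thread-number config is positive and the expanded paths reach the threshold, so the feature stays off by default regardless of the threshold value.

```scala
// Hypothetical gate, for illustration only.
object ParallelGlobGate {
  def shouldExpandInParallel(numExpandedPaths: Int, numThreads: Int, threshold: Int): Boolean =
    numThreads > 0 && numExpandedPaths >= threshold
}
```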


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147921
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,37 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * This will be done in main thread by default while the value of config
+   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local 
thread
+   * pool will expand the globbed paths.
--- End diff --

Thanks, done in 1319cd3.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147887
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1557,6 +1576,15 @@ class SQLConf extends Serializable with Logging {
   def parallelPartitionDiscoveryParallelism: Int =
 getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
 
+  def parallelGetGlobbedPathThreshold: Int =
+getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_THRESHOLD)
+
+  def parallelGetGlobbedPathNumThreads: Int =
+getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_NUM_THREADS)
+
+  def parallelGetGlobbedPathEnabled: Boolean =
--- End diff --

Thanks, done in 1319cd3.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147889
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,37 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * This will be done in main thread by default while the value of config
+   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local 
thread
+   * pool will expand the globbed paths.
+   */
+  private def getGlobbedPaths(
+  sparkSession: SparkSession,
--- End diff --

Thanks for the advice, done in 1319cd3.


---




[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216147919
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,37 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * This will be done in main thread by default while the value of config
+   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local 
thread
+   * pool will expand the globbed paths.
+   */
+  private def getGlobbedPaths(
--- End diff --

Thanks, that's clearer; done in 1319cd3.


---




[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
```
@xuanyuanking Could you please update the document?
```
#22369. Thanks for the reminder; I'll pay attention to this in future work.


---




[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...

2018-09-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22369#discussion_r216189359
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1901,6 +1901,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above
 
   - As of version 2.3.1 Arrow functionality, including `pandas_udf` and 
`toPandas()`/`createDataFrame()` with `spark.sql.execution.arrow.enabled` set 
to `True`, has been marked as experimental. These are still evolving and not 
currently recommended for use in production.
+  - In version 2.3.1 and earlier, it is possible for PySpark to create a 
Row object by providing more value than column number through the customized 
Row class. Since Spark 2.3.3, Spark will confirm value length is less or equal 
than column length in PySpark. See 
[SPARK-25072](https://issues.apache.org/jira/browse/SPARK-25072) for details.
--- End diff --

Thanks Bryan, I'll address this after discussion.


---




[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...

2018-09-09 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22369
  
Got it, thanks @HyukjinKwon.


---




[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22140#discussion_r215601543
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -269,6 +269,10 @@ def test_struct_field_type_name(self):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(slef):
+rowClass = Row("c1", "c2")
--- End diff --

Thanks, done in eb3f506.


---




[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22140#discussion_r215601350
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -269,6 +269,10 @@ def test_struct_field_type_name(self):
 struct_field = StructField("a", IntegerType())
 self.assertRaises(TypeError, struct_field.typeName)
 
+def test_invalid_create_row(slef):
--- End diff --

Thanks, done in eb3f506.


---




[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22140#discussion_r215601486
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1397,6 +1397,8 @@ def _create_row_inbound_converter(dataType):
 
 
 def _create_row(fields, values):
+if len(values) > len(fields):
+raise ValueError("Can not create %s by %s" % (fields, values))
--- End diff --

Thanks, improvement done and the check moved to `__call__` in `Row`: eb3f506.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214932266
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
+s"turned to cross join.")
--- End diff --

Thanks, done in a86a7d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214968900
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
--- End diff --

Thanks, done in 82e50d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214968794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
--- End diff --

Makes sense; also changed this in `CheckCartesianProducts`. Done in 82e50d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214969191
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Filter(others.reduceLeft(And), join)
--- End diff --

Thanks, there's no need to add an extra Filter in the LeftSemi case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214931484
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
+s"turned to cross join.")
+  Cross
+} else joinType
--- End diff --

Thanks, done in a86a7d5.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215635071
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -65,7 +65,7 @@ private[spark] class BarrierCoordinator(
 
   // Record all active stage attempts that make barrier() call(s), and the 
corresponding internal
   // state.
-  private val states = new ConcurrentHashMap[ContextBarrierId, 
ContextBarrierState]
+  private[spark] val states = new ConcurrentHashMap[ContextBarrierId, 
ContextBarrierState]
--- End diff --

No problem, done in ecf12bd.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215636283
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -187,6 +191,9 @@ private[spark] class BarrierCoordinator(
   requesters.clear()
   cancelTimerTask()
 }
+
+// Check for clearing internal data, visible for test only.
+private[spark] def cleanCheck(): Boolean = requesters.isEmpty && 
timerTask == null
--- End diff --

Added the internal data clean check requested in Xingbo's comment: 
https://github.com/apache/spark/pull/22165#issuecomment-415086679.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r215635587
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcTimeout
+
+class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext 
{
+
+  /**
+   * Get the current barrierEpoch from barrierCoordinator.states by 
ContextBarrierId
+   */
+  def getCurrentBarrierEpoch(
+  stageId: Int, stageAttemptId: Int, barrierCoordinator: 
BarrierCoordinator): Int = {
+val barrierId = ContextBarrierId(stageId, stageAttemptId)
+barrierCoordinator.states.get(barrierId).barrierEpoch
+  }
+
+  test("normal test for single task") {
+sc = new SparkContext("local", "test")
+val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, 
sc.env.rpcEnv)
+val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", 
barrierCoordinator)
+val stageId = 0
+val stageAttemptNumber = 0
+rpcEndpointRef.askSync[Unit](
+  message = RequestToSync(numTasks = 1, stageId, stageAttemptNumber, 
taskAttemptId = 0,
+barrierEpoch = 0),
+  timeout = new RpcTimeout(5 seconds, "rpcTimeOut"))
+// sleep for waiting barrierEpoch value change
+Thread.sleep(500)
--- End diff --

Thanks for the guidance, done in ecf12bd. I'll also pay attention to this in future 
work. For reference, a sketch of the polling pattern that replaces the fixed sleep is below.
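
A minimal sketch only (my illustration, not the actual commit): it assumes ScalaTest's `Eventually`, the `getCurrentBarrierEpoch` helper from this suite, and the values already in scope in the quoted test; the timeout, interval, and expected epoch value are illustrative.

```
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll for the state change instead of sleeping a fixed 500 ms.
eventually(timeout(5.seconds), interval(100.milliseconds)) {
  assert(getCurrentBarrierEpoch(stageId, stageAttemptNumber, barrierCoordinator) === 1)
}
```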


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215876606
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case _: InnerLike | LeftSemi =>
+case LeftSemi =>
   // push down the single side only join filter for both sides sub 
queries
   val newLeft = leftJoinConditions.
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // need to add cross join when unevaluable condition exists
+  val newJoinType = if (others.nonEmpty) {
+tryToGetCrossType(commonJoinCondition, j)
+  } else {
+joinType
+  }
 
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Project(newLeft.output.map(_.toAttribute), 
Filter(others.reduceLeft(And), join))
+  } else {
+join
+  }
+case _: InnerLike =>
+  // push down the single side only join filter for both sides sub 
queries
+  val newLeft = leftJoinConditions.
--- End diff --

No problem, done in 87440b0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215876550
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ 
nonDeterministic)
   }
 
+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: 
LogicalPlan) = {
+if (SQLConf.get.crossJoinEnabled) {
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of 
the join " +
+"plan is unevaluable, it will be ignored and the join plan will be 
" +
+s"turned to cross join. This plan shows below:\n $j")
+  Cross
+} else {
+  // if the crossJoinEnabled is false, an AnalysisException will throw 
by
+  // [[CheckCartesianProducts]], we throw firstly here for better 
readable
--- End diff --

Thanks, done in 87440b0. I'll also pay attention to this in future work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215877417
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
 
 // CheckAnalysis will ensure nondeterministic expressions not appear 
in join condition.
 // TODO support nondeterministic expressions in join condition.
-comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
-  checkAnalysis = false)
+withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+  comparePlans(Optimize.execute(originalQuery.analyze), 
correctAnswer.analyze,
+checkAnalysis = false)
+}
+  }
+
+  test("join condition pushdown: deterministic and non-deterministic in 
left semi join") {
--- End diff --

I didn't add SPARK-25314 cause it maybe a supplement for test("join 
condition pushdown: deterministic and non-deterministic").


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377621
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,12 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): 
Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)}
+  }
+  else {
--- End diff --

nit: } else {


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216378185
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

nit: `: AppStatusStore`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377882
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala 
---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class AppStatusSource extends Source{
--- End diff --

nit: Source {


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377526
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
 setupAndStartListenerBus()
 postEnvironmentUpdate()
+_env.metricsSystem.registerSource(appStatusSource)
--- End diff --

Better to move this line to line +574.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-29 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/2
  
Got it, I'll revert the file source changes in this commit. Thanks for your reply.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/2
  
@cloud-fan @rdblue 
I want to leave some comments and thoughts from looking into this again; I hope they 
can help us decide on the next step.
Currently every plan assumes the input rows are `RDD[InternalRow]`, and the whole 
framework treats columnar reads as a special case. Also, the `inputRDDs` function is 
called not only in `WholeStageCodegenExec` but also by every parent physical node, so 
it's very easy to get into a mess with nested plans while debugging this fix. So we 
have three choices; the first two can remove the cast entirely but may require many 
changes to `CodegenSupport`, while the last one limits the changes but still has the 
cast problem:
1. Erase the element type of `inputRDDs`, so that both RDD[InternalRow] and 
RDD[ColumnarBatch] can be passed through, mainly for the parent physical plan calling 
its child (a minimal sketch is shown after this list). This is implemented as the last 
commit in this PR: 
https://github.com/apache/spark/pull/2/files
2. Refactor the framework so that all plans deal with columnar batches.
3. Limit the changes to `ColumnarBatchScan` and don't change `CodegenSupport`, which 
still leaves the cast problem. This is implemented as the first two commits in this PR: 
https://github.com/apache/spark/pull/2/files/7e88599dfc2caf177d12e890d588be68bdd3bc8e

If none of these make sense, I'll just close this. Thanks.
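
A minimal sketch of what option 1 could look like, assuming the current `CodegenSupport.inputRDDs()` returns `Seq[RDD[InternalRow]]`; the trait name is illustrative and this is not the actual patch:

```
import org.apache.spark.rdd.RDD

// Illustrative only: erase the element type so a columnar scan can hand its
// RDD[ColumnarBatch] through the same hook that row-based plans use.
trait CodegenSupportSketch {
  // Parent plans just thread these RDDs through to WholeStageCodegenExec,
  // which already knows whether the leaf scan produces rows or ColumnarBatches.
  def inputRDDs(): Seq[RDD[_]]
}
```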


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216122599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for your question; that's also what I considered while doing the compatibility 
check. Hive does this column type change work in 
[HiveAlterHandler](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java#L175) 
and the detailed compatibility check is in 
[ColumnType](https://github.com/apache/hive/blob/3287a097e31063cc805ca55c2ca7defffe761b6f/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ColumnType.java#L206). 
As you can see in the ColumnType check, it actually uses the `canCast` semantic to 
judge compatibility. A sketch of mirroring that on the Spark side is below.
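
A minimal sketch of how a Spark-side check could mirror that semantic, assuming we reuse `Cast.canCast` between the old and new column types (my illustration, not code from this PR):

```
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

// int -> bigint is a widening change and is cast-compatible.
assert(Cast.canCast(IntegerType, LongType))
// string -> int is also castable, though it can produce nulls at read time,
// which is why a stricter policy on top of canCast may still be needed.
assert(Cast.canCast(StringType, IntegerType))
```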


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127605
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ 
nonDeterministic)
   }
 
+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: 
LogicalPlan) = {
+if (SQLConf.get.crossJoinEnabled) {
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of 
the join " +
+"plan is unevaluable, it will be ignored and the join plan will be 
" +
+s"turned to cross join. This plan shows below:\n $j")
+  Cross
+} else {
+  // if the crossJoinEnabled is false, an AnalysisException will throw 
by
+  // CheckCartesianProducts, we throw firstly here for better readable 
information.
+  throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+s"$commonJoinCondition of the join plan is unevaluable, we need to 
cast the " +
+"join to cross join by setting the configuration variable " +
+s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+}
+  }
+
+  /**
+   * Generate new left and right child of join by pushing down the side 
only join filter,
+   * split commonJoinCondition based on the expression can be evaluated 
within join or not.
+   *
+   * @return (newLeftChild, newRightChild, newJoinCondition, 
conditionCannotEvaluateWithinJoin)
--- End diff --

Got it, I just saw the demo here: 
https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140. 
I'll remove it in the next commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127673
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case _: InnerLike | LeftSemi =>
-  // push down the single side only join filter for both sides sub 
queries
-  val newLeft = leftJoinConditions.
-reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-  val newRight = rightJoinConditions.
-reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+case LeftSemi =>
+  val (newLeft, newRight, newJoinCond, others) = 
getNewChildAndSplitCondForJoin(
+j, leftJoinConditions, rightJoinConditions, 
commonJoinCondition)
+  // need to add cross join when unevaluable condition exists
+  val newJoinType = if (others.nonEmpty) {
+tryToGetCrossType(commonJoinCondition, j)
+  } else {
+joinType
+  }
 
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Project(newLeft.output.map(_.toAttribute), 
Filter(others.reduceLeft(And), join))
+  } else {
+join
+  }
+case _: InnerLike =>
+  val (newLeft, newRight, newJoinCond, others) = 
getNewChildAndSplitCondForJoin(
+j, leftJoinConditions, rightJoinConditions, 
commonJoinCondition)
+  // only need to add cross join when whole commonJoinCondition 
are unevaluable
+  val newJoinType = if (commonJoinCondition.nonEmpty && 
newJoinCond.isEmpty) {
--- End diff --

Thanks. After checking in detail, I changed this to `others.nonEmpty`; the original 
check may have been an unnecessary worry about the common join condition containing 
both unevaluable and evaluable conditions. I also added a test in the next commit to 
cover this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127710
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
   joinType match {
-case _: InnerLike | LeftSemi =>
-  // push down the single side only join filter for both sides sub 
queries
-  val newLeft = leftJoinConditions.
-reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-  val newRight = rightJoinConditions.
-reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+case LeftSemi =>
+  val (newLeft, newRight, newJoinCond, others) = 
getNewChildAndSplitCondForJoin(
+j, leftJoinConditions, rightJoinConditions, 
commonJoinCondition)
+  // need to add cross join when unevaluable condition exists
+  val newJoinType = if (others.nonEmpty) {
+tryToGetCrossType(commonJoinCondition, j)
+  } else {
+joinType
+  }
 
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Project(newLeft.output.map(_.toAttribute), 
Filter(others.reduceLeft(And), join))
--- End diff --

Could I try to answer this? The projection is only used for a left semi join that has 
been turned into a cross join in this scenario, to ensure the output contains only the 
left side attributes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22326
  
@holdenk Thanks, sorry for the typo.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216127880
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

Thanks for checking; my mistake for not describing the intention of this feature. By 
supporting type changes we just want to add the ability to change the column type 
metadata. The scenario we hit is that a user wants a type change (e.g. int is no longer 
enough and a long type is needed); they have already changed the type in their data 
files, but without this support we have to hack the metastore or recreate the whole 
table (a usage sketch is below).
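
A usage sketch only; the table and column names are made up, and this assumes a `SparkSession` named `spark` with this PR applied:

```
// The column was originally INT; new partitions are already written with
// LONG values, so only the catalog metadata needs to change.
spark.sql("ALTER TABLE events CHANGE COLUMN user_id user_id BIGINT")
```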


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127932
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -547,6 +547,74 @@ def test_udf_in_filter_on_top_of_join(self):
 df = left.crossJoin(right).filter(f("a", "b"))
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"))
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+def test_udf_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"), "leftsemi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_filter_in_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, 
b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, 
b1=1, b2=2)])
+
+def test_udf_and_filter_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, 
a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, 
b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], 
"left_semi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_common_filter_in_join_condition(self):
--- End diff --

Added these two tests for the comment in 
https://github.com/apache/spark/pull/22326#discussion_r216127673.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216127904
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

So maybe we should also add a conf, like Hive's 
`MetastoreConf.ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES`, to wrap this behavior 
(a hypothetical sketch is below)? WDYT? Thanks.
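
A purely hypothetical sketch of what such a flag could look like on the Spark side, following the SQLConf pattern quoted elsewhere in this thread; the key name and default are my assumptions, not part of the PR:

```
// Inside object SQLConf (sketch only):
val ALTER_TABLE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES =
  buildConf("spark.sql.alterTable.disallowIncompatibleColTypeChanges")
    .doc("When true, ALTER TABLE CHANGE COLUMN only allows type changes that " +
      "are cast-compatible with the existing column type.")
    .booleanConf
    .createWithDefault(true)
```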


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215261610
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  s"$commonJoinCondition of the join plan is unevaluable, 
we need to cast the " +
+  "join to cross join by setting the configuration 
variable " +
+  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+Filter(others.reduceLeft(And), join)
+  } else {
+join
--- End diff --

Thanks @mgaido91 and @dilipbiswal !
I fixed this in 63fbcce. The main problem is a semi join with both deterministic and 
non-deterministic conditions: a filter placed after the semi join fails because the 
right-side attributes are no longer available. I also added more tests on both the 
Python and Scala side, including semi join, inner join, and the complex scenarios 
described below.
Considering left semi makes the strategy difficult to read, so in 63fbcce I split the 
logic for semi join and inner join. A rough sketch of the resulting left-semi plan 
shape is below.
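
For reference, with `spark.sql.crossJoin.enabled=true` the left-semi branch in 63fbcce produces roughly this optimized plan shape for `left.join(right, f("a", "b"), "leftsemi")`; this is simplified, and the attribute names follow the Python tests quoted elsewhere in this thread:

```
Project [a, a1, a2]
+- Filter pythonUDF(a, b)
   +- Join Cross
      :- LocalRelation [a, a1, a2]
      +- LocalRelation [b, b1, b2]
```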


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
```
Could I do the refactor of moving ContextBarrierState out of 
BarrierCoordinator?
```
gentle ping @jiangxb1987, I'm still following up on this. :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
gentle ping @HyukjinKwon @BryanCutler 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
gentle ping @cloud-fan @gatorsmile @kiszk, we still hit this in our internal fork; 
could you help review? I'll resolve the conflict, many thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216588353
  
--- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * This is based on hadoop-common-2.7.2
+ * {@link org.apache.hadoop.fs.Globber}.
+ * This class exposes globWithThreshold which can be used glob path in 
parallel.
+ */
+public class SparkGlobber {
+  public static final Log LOG = 
LogFactory.getLog(SparkGlobber.class.getName());
+
+  private final FileSystem fs;
+  private final FileContext fc;
+  private final Path pathPattern;
+
+  public SparkGlobber(FileSystem fs, Path pathPattern) {
+this.fs = fs;
+this.fc = null;
+this.pathPattern = pathPattern;
+  }
+
+  public SparkGlobber(FileContext fc, Path pathPattern) {
+this.fs = null;
+this.fc = fc;
+this.pathPattern = pathPattern;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.getFileStatus(path);
+  } else {
+return fc.getFileStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return null;
+}
+  }
+
+  private FileStatus[] listStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.listStatus(path);
+  } else {
+return fc.util().listStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return new FileStatus[0];
+}
+  }
+
+  private Path fixRelativePart(Path path) {
+if (fs != null) {
+  return fs.fixRelativePart(path);
+} else {
+  return fc.fixRelativePart(path);
+}
+  }
+
+  /**
+   * Convert a path component that contains backslash ecape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer 
to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+return name.replaceAll("(.)", "$1");
+  }
+
+  /**
+   * Translate an absolute path into a list of path components.
+   * We merge double slashes into a single slash here.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
+   */
+  private static List getPathComponents(String path)
+  throws IOException {
+ArrayList ret = new ArrayList();
+for (String component : path.split(Path.SEPARATOR)) {
+  if (!component.isEmpty()) {
+ret.add(component);
+  }
+}
+return ret;
+  }
+
+  private String schemeFromPath(Path path) throws IOException {
+String scheme = path.toUri().getScheme();
+if (scheme == null) {
+  if (fs != null) {
+scheme = fs.getUri().getScheme();
+  } else {
+scheme = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
+  }
+}
+return scheme;
+  }
+
+  private String authorityFromPath(Path path) throws IOException {
+String authority = path.toUri().getAuthority();
+if (authority == null) {
+  if (fs != null) {
+authority = fs.getUri().getAuthority();
+  } else {
+authority = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
+  }
+}
+return authority ;
+  }
+
+  public FileStatus[] globWithThreshold(int threshold) throws IOException {
+

[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r216600156
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -350,16 +366,11 @@ case class AlterTableChangeColumnCommand(
 s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original 
column.
-  private def addComment(column: StructField, comment: Option[String]): 
StructField = {
-comment.map(column.withComment(_)).getOrElse(column)
-  }
-
--- End diff --

```
Although the query above doesn't work well, why do users change column 
types?
```
As in the scenario described above, a user starts with int but later finds that a long 
is needed; they can rewrite the new data as long and load it into new partitions. If we 
don't support the type change, the user has to recreate the whole table just for this.

Yep, if the data is not in a binary file format, the query works OK.
```
Logging initialized using configuration in 
jar:file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/lib/hive-common-1.2.2.jar!/hive-log4j.properties
hive> CREATE TABLE t(a INT, b STRING, c INT);
OK
Time taken: 2.576 seconds
hive> INSERT INTO t VALUES (1, 'a', 3);;
Query ID = XuanYuan_20180911164348_32238a6c-b0a4-4cfd-aa3d-00a7628031cf
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
2018-09-11 16:43:51,684 Stage-1 map = 100%,  reduce = 0%
Ended Job = job_local162423_0001
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to: 
file:/Users/XuanYuan/Source/hive/apache-hive-1.2.2-bin/warehouse/t/.hive-staging_hive_2018-09-11_16-43-48_117_2262603440504094412-1/-ext-1
Loading data to table default.t
Table default.t stats: [numFiles=1, numRows=1, totalSize=6, rawDataSize=5]
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.025 seconds
hive> select * from t;;
OK
1   a   3
Time taken: 0.164 seconds, Fetched: 1 row(s)
hive> ALTER TABLE t CHANGE a a STRING;
OK
Time taken: 0.177 seconds
hive> select * from t;
OK
1   a   3
Time taken: 0.12 seconds, Fetched: 1 row(s)
hive> quit;
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216583509
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Yes, I changed this to make the test pass. The original thought was that 
nondeterministic expressions in join conditions are not supported yet, so that's not a 
big problem:
https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L105

https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1158-L1159
But now I think I should be more careful about this and limit the cross join change to 
the PythonUDF case only. WDYT? @mgaido91 Thanks.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22369: [SPARK-25072][DOC] Update migration guide for behavior c...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22369
  
As per the comment in 
https://github.com/apache/spark/pull/22140#issuecomment-419997180, I think this doc 
change is no longer needed, so I'm closing this. Thanks @BryanCutler and @HyukjinKwon !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/22369


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216585000
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

As the code in canEvaluateWithinJoin shows, the scope relation is: 
(cannot evaluate within join = non-deterministic + Unevaluable) > Unevaluable > PythonUDF. 
So to be safe, maybe I should limit the change scope to the smallest set, PythonUDF 
only (a hypothetical sketch is below the link). I need some advice from you, thanks :)



https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L104-L120
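
A hypothetical sketch of the narrower check (not the actual patch); the `PythonUDF` import path is an assumption and has moved between Spark versions:

```
import org.apache.spark.sql.catalyst.expressions.{Expression, PythonUDF}

object JoinConditionHelper {
  // Pull a condition out of the join only when it really contains a PythonUDF,
  // instead of doing so for any unevaluable expression.
  def hasPythonUDF(condition: Expression): Boolean =
    condition.find(_.isInstanceOf[PythonUDF]).isDefined
}
```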


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r216587584
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala ---
@@ -77,6 +80,51 @@ class SparkHadoopUtilSuite extends SparkFunSuite with 
Matchers {
 })
   }
 
+  test("test expanding glob path") {
--- End diff --

No problem, I'll fix this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216606555
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Thanks @mgaido91 for the detailed review and advice. For my part, I would probably 
limit the change scope to PythonUDF only, or at least to Unevaluable only. Waiting for 
others' advice.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-07-12 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
gentle ping @cloud-fan @gatorsmile @kiszk 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21729#discussion_r200989413
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
   // Set the coresponding index of Boolean var when the task killed by 
other attempt tasks,
--- End diff --

typo I made before, coresponding -> corresponding.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r201006275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,35 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * Use a local thread pool to do this while there's too many paths.
+   */
+  private def getGlobbedPaths(
+  sparkSession: SparkSession,
+  fs: FileSystem,
+  hadoopConf: Configuration,
+  qualified: Path): Seq[Path] = {
+val getGlobbedPathThreshold = 
sparkSession.sessionState.conf.parallelGetGlobbedPathThreshold
+val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified, 
getGlobbedPathThreshold)
--- End diff --

Thanks for your advice, I'll reuse the value of 
`spark.sql.sources.parallelGetGlobbedPath.numThreads` to control this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r201007556
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -656,6 +656,25 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PARALLEL_GET_GLOBBED_PATH_THRESHOLD =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold")
+  .doc("The maximum number of subfiles or directories allowed after a 
globbed path " +
+"expansion. If the number of paths exceeds this value during 
expansion, it tries to " +
+"expand the globbed in parallel with multi-thread.")
+  .intConf
+  .checkValue(threshlod => threshlod >= 0, "The maximum number of 
subfiles or directories " +
+"must not be negative")
+  .createWithDefault(32)
+
+  val PARALLEL_GET_GLOBBED_PATH_NUM_THREADS =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.numThreads")
+  .doc("The number of threads to get a collection of path in parallel. 
Set the " +
+"number to avoid generating too many threads.")
+  .intConf
+  .checkValue(parallel => parallel >= 0, "The maximum number of 
threads allowed for getting " +
--- End diff --

Thanks for the catch. When this value is set to 0 we get an IllegalArgumentException 
when constructing the ThreadPoolExecutor, so I use 0 as the default value to gate this 
feature, as we discussed in 
https://github.com/apache/spark/pull/21618#discussion_r200465855 (a minimal 
illustration is below).
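
A minimal illustration (not PR code) of why 0 works as a natural "feature disabled" default: `java.util.concurrent.ThreadPoolExecutor` rejects a non-positive maximum pool size, so the parallel path must not build a pool at all when numThreads is 0. The pool sizes and keep-alive time below are illustrative:

```
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

def maybeParallelPool(numThreads: Int): Option[ThreadPoolExecutor] = {
  if (numThreads > 0) {
    Some(new ThreadPoolExecutor(
      numThreads, numThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]()))
  } else {
    // numThreads == 0: fall back to the sequential code path; constructing a
    // ThreadPoolExecutor with maximumPoolSize <= 0 throws IllegalArgumentException.
    None
  }
}
```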


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21729#discussion_r200990424
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite 
with LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+val conf = new SparkConf().set("spark.speculation", "true")
+sc = new SparkContext("local", "test", conf)
+// Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.5")
+sc.conf.set("spark.speculation", "true")
+
+var killTaskCalled = false
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+  ("exec2", "host2"), ("exec3", "host3"))
+sched.initialize(new FakeSchedulerBackend() {
+  override def killTask(taskId: Long,
+executorId: String,
+interruptThread: Boolean,
+reason: String): Unit = {
+// Check the only one killTask event in this case, which triggered 
by
+// task 2.1 completed.
+assert(taskId === 2)
+assert(executorId === "exec3")
+assert(interruptThread)
+assert(reason === "another attempt succeeded")
+killTaskCalled = true
+  }
+})
+
+// Keep track of the index of tasks that are resubmitted,
+// so that the test can check that task is resubmitted correctly
+var resubmittedTasks = new mutable.HashSet[Int]
+val dagScheduler = new FakeDAGScheduler(sc, sched) {
+  override def taskEnded(task: Task[_],
+ reason: TaskEndReason,
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21729#discussion_r200990279
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite 
with LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+val conf = new SparkConf().set("spark.speculation", "true")
+sc = new SparkContext("local", "test", conf)
+// Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.5")
+sc.conf.set("spark.speculation", "true")
+
+var killTaskCalled = false
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+  ("exec2", "host2"), ("exec3", "host3"))
+sched.initialize(new FakeSchedulerBackend() {
+  override def killTask(taskId: Long,
+executorId: String,
--- End diff --

nit: indent


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r201006447
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -656,6 +656,25 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PARALLEL_GET_GLOBBED_PATH_THRESHOLD =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold")
+  .doc("The maximum number of subfiles or directories allowed after a 
globbed path " +
+"expansion. If the number of paths exceeds this value during 
expansion, it tries to " +
+"expand the globbed in parallel with multi-thread.")
+  .intConf
+  .checkValue(threshlod => threshlod >= 0, "The maximum number of 
subfiles or directories " +
--- End diff --

Thanks, done in the next commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21729: SPARK-24755 Executor loss can cause task to not be resub...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21729
  
Please change the title to '[SPARK-24755][Core] Executor loss can cause 
task to not be resubmitted'


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200160518
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -73,6 +74,10 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  private val inputDataSetId = new AtomicLong(0)
+  private val outputDataSetId = new AtomicLong(0)
+  private val maxRecords = conf.getInt("spark.data.maxRecords", 1000)
--- End diff --

What's this `spark.data.maxRecords` for? Maybe you should follow the config 
in core/src/main/scala/org/apache/spark/status/config.scala


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200159852
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) 
extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
+/**
+ * An internal class that describes the input data of an event log.
+ */
+@DeveloperApi
+case class SparkListenerInputUpdate(format: String,
+options: Map[String, String],
--- End diff --

indent. See: 
https://github.com/apache/spark/pull/21642/files#diff-fbe8f967070627c8dc155237e77c7314R172


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200160022
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -19,6 +19,7 @@ package org.apache.spark.status
 
 import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
--- End diff --

import order error here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200159949
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) 
extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
+/**
+ * An internal class that describes the input data of an event log.
+ */
+@DeveloperApi
+case class SparkListenerInputUpdate(format: String,
+options: Map[String, String],
+locations: Seq[String] = 
Seq.empty[String])
+  extends SparkListenerEvent
+
+/**
+ * An internal class that describes the non-table output of an event log.
+ */
+@DeveloperApi
+case class SparkListenerOutputUpdate(format: String,
+ mode: String,
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17702: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-01-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/17702
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


